diff --git a/src/Common/config.h b/src/Common/config.h index 76017fba..9ae406c0 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -57,14 +57,12 @@ bool loadIniConfig(const char *ini_path = nullptr); #define CLEAR_ARR(arr) for(auto &item : arr){ item = 0;} #endif //CLEAR_ARR -#define SERVER_NAME "ZLMediaKit" +#define SERVER_NAME "ZLMediaKit-3.0" #define VHOST_KEY "vhost" #define HTTP_SCHEMA "http" #define RTSP_SCHEMA "rtsp" #define RTMP_SCHEMA "rtmp" #define DEFAULT_VHOST "__defaultVhost__" -#define RTSP_VERSION 1.30 -#define RTSP_BUILDTIME __DATE__" CST" ////////////广播名称/////////// namespace Broadcast { diff --git a/src/Http/HttpRequestSplitter.cpp b/src/Http/HttpRequestSplitter.cpp index 5f7353be..59acb05b 100644 --- a/src/Http/HttpRequestSplitter.cpp +++ b/src/Http/HttpRequestSplitter.cpp @@ -54,12 +54,16 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { //数据按照请求头处理 const char *index = nullptr; - uint64_t remain = len; - while (_content_len == 0 && remain > 0 && (index = onSearchPacketTail(ptr,remain)) != nullptr) { + _remain_data_size = len; + while (_content_len == 0 && _remain_data_size > 0 && (index = onSearchPacketTail(ptr,_remain_data_size)) != nullptr) { //_content_len == 0,这是请求头 - _content_len = onRecvHeader(ptr, index - ptr); + const char *header_ptr = ptr; + int64_t header_size = index - ptr; + ptr = index; - remain = len - (ptr - data); + _remain_data_size = len - (ptr - data); + + _content_len = onRecvHeader(header_ptr, header_size); } /* @@ -67,7 +71,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { */ tail_ref = tail_tmp; - if(remain <= 0){ + if(_remain_data_size <= 0){ //没有剩余数据,清空缓存 _remain_data.clear(); return; @@ -75,7 +79,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { if(_content_len == 0){ //尚未找到http头,缓存定位到剩余数据部分 - string str(ptr,remain); + string str(ptr,_remain_data_size); _remain_data = str; return; } @@ -83,23 +87,23 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { //已经找到http头了 if(_content_len > 0){ //数据按照固定长度content处理 - if(remain < _content_len){ + if(_remain_data_size < _content_len){ //数据不够,缓存定位到剩余数据部分 - string str(ptr,remain); + string str(ptr,_remain_data_size); _remain_data = str; return; } //收到content数据,并且接受content完毕 onRecvContent(ptr,_content_len); - remain -= _content_len; + _remain_data_size -= _content_len; ptr += _content_len; //content处理完毕,后面数据当做请求头处理 _content_len = 0; - if(remain > 0){ + if(_remain_data_size > 0){ //还有数据没有处理完毕 - string str(ptr,remain); + string str(ptr,_remain_data_size); _remain_data = str; data = ptr = (char *)_remain_data.data(); @@ -112,7 +116,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { //_content_len < 0;数据按照不固定长度content处理 - onRecvContent(ptr,remain);//消费掉所有剩余数据 + onRecvContent(ptr,_remain_data_size);//消费掉所有剩余数据 _remain_data.clear(); } @@ -133,6 +137,10 @@ const char *HttpRequestSplitter::onSearchPacketTail(const char *data,int len) { return pos + 4; } +int64_t HttpRequestSplitter::remainDataSize() { + return _remain_data_size; +} + } /* namespace mediakit */ diff --git a/src/Http/HttpRequestSplitter.h b/src/Http/HttpRequestSplitter.h index 0e143f49..8aedcdcc 100644 --- a/src/Http/HttpRequestSplitter.h +++ b/src/Http/HttpRequestSplitter.h @@ -81,9 +81,15 @@ protected: * 恢复初始设置 */ void reset(); + + /** + * 剩余数据大小 + */ + int64_t remainDataSize(); private: string _remain_data; int64_t _content_len = 0; + int64_t _remain_data_size = 0; }; } /* namespace mediakit */ diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index f547adeb..7fce8563 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -87,6 +87,7 @@ class PlayerBase : public DemuxerBase, public mINI{ public: typedef std::shared_ptr Ptr; typedef enum { + RTP_Invalid = -1, RTP_TCP = 0, RTP_UDP = 1, RTP_MULTICAST = 2, diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 79bbd1d8..6b24c3f8 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -25,6 +25,7 @@ */ #include +#include #include "Common/config.h" #include "UDPServer.h" #include "RtspSession.h" @@ -43,17 +44,11 @@ namespace mediakit { static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; -string dateHeader() { - char buf[200]; - time_t tt = time(NULL); - strftime(buf, sizeof buf, "Date: %a, %b %d %Y %H:%M:%S GMT\r\n", gmtime(&tt)); - return buf; -} - unordered_map > RtspSession::g_mapGetter; unordered_map > RtspSession::g_mapPostter; recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护 recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护 + RtspSession::RtspSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : TcpSession(pTh, pSock), _pSender(pSock) { //设置10秒发送缓存 @@ -152,41 +147,40 @@ void RtspSession::onManager() { int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) { - char tmp[2 * 1024]; - _pcBuf = tmp; - _parser.Parse(header); //rtsp请求解析 string strCmd = _parser.Method(); //提取出请求命令字 _iCseq = atoi(_parser["CSeq"].data()); - typedef bool (RtspSession::*rtspCMDHandle)(); - static unordered_map g_mapCmd; + typedef int (RtspSession::*rtsp_request_handler)(); + static unordered_map s_handler_map; static onceToken token( []() { - g_mapCmd.emplace("OPTIONS",&RtspSession::handleReq_Options); - g_mapCmd.emplace("DESCRIBE",&RtspSession::handleReq_Describe); - g_mapCmd.emplace("SETUP",&RtspSession::handleReq_Setup); - g_mapCmd.emplace("PLAY",&RtspSession::handleReq_Play); - g_mapCmd.emplace("PAUSE",&RtspSession::handleReq_Pause); - g_mapCmd.emplace("TEARDOWN",&RtspSession::handleReq_Teardown); - g_mapCmd.emplace("GET",&RtspSession::handleReq_Get); - g_mapCmd.emplace("POST",&RtspSession::handleReq_Post); - g_mapCmd.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); - g_mapCmd.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); + s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options); + s_handler_map.emplace("DESCRIBE",&RtspSession::handleReq_Describe); + s_handler_map.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE); + s_handler_map.emplace("SETUP",&RtspSession::handleReq_Setup); + s_handler_map.emplace("PLAY",&RtspSession::handleReq_Play); + s_handler_map.emplace("PAUSE",&RtspSession::handleReq_Pause); + s_handler_map.emplace("TEARDOWN",&RtspSession::handleReq_Teardown); + s_handler_map.emplace("GET",&RtspSession::handleReq_Get); + s_handler_map.emplace("POST",&RtspSession::handleReq_Post); + s_handler_map.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); + s_handler_map.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER); }, []() {}); - auto it = g_mapCmd.find(strCmd); - if (it != g_mapCmd.end()) { + auto it = s_handler_map.find(strCmd); + int ret = 0; + if (it != s_handler_map.end()) { auto fun = it->second; - if(!(this->*fun)()){ - shutdown(); + ret = (this->*fun)(); + if(ret == -1){ + shutdown(); } } else{ shutdown(); - WarnL << "cmd=" << strCmd; + WarnL << "不支持的rtsp命令:" << strCmd; } - _parser.Clear(); - return 0; + return ret; } @@ -203,6 +197,7 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) { } void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) { +// DebugL << data; if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){ //这是rtcp return; @@ -210,23 +205,28 @@ void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) { input(data,len); } -bool RtspSession::handleReq_Options() { -//支持这些命令 - int n = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY," - " PAUSE, SET_PARAMETER, GET_PARAMETER\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data()); - SocketHelper::send(_pcBuf, n); - return true; +int RtspSession::handleReq_Options() { + //支持这些命令 + sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, SET_PARAMETER, GET_PARAMETER"}); + return 0; } -bool RtspSession::handleReq_Describe() { +void RtspSession::onRecvContent(const char *data, uint64_t len) { +// DebugL << data; + if(_onContent){ + _onContent(data,len); + _onContent = nullptr; + } +} +int RtspSession::handleReq_ANNOUNCE() { + sendRtspResponse("200 OK"); + _onContent = [this](const char *data, uint64_t len){ + + }; + return atoi(_parser["Content-Length"].data()); +} + +int RtspSession::handleReq_Describe() { { //解析url获取媒体名称 _strUrl = _parser.Url(); @@ -242,8 +242,6 @@ bool RtspSession::handleReq_Describe() { } //恢复现场 strongSelf->_parser = parserCopy; - char tmp[2 * 1024]; - strongSelf->_pcBuf = tmp; if(!success){ //未找到相应的MediaSource @@ -273,7 +271,7 @@ bool RtspSession::handleReq_Describe() { invoker(""); } }); - return true; + return 0; } void RtspSession::onAuthSuccess(const weak_ptr &weakSelf) { auto strongSelf = weakSelf.lock(); @@ -287,22 +285,11 @@ void RtspSession::onAuthSuccess(const weak_ptr &weakSelf) { //本对象已销毁 return; } - char response[2 * 1024]; - int n = sprintf(response, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "x-Accept-Retransmit: our-retransmit\r\n" - "x-Accept-Dynamic-Rate: 1\r\n" - "Content-Base: %s/\r\n" - "Content-Type: application/sdp\r\n" - "Content-Length: %d\r\n\r\n%s", - strongSelf->_iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), strongSelf->_strUrl.data(), - (int) strongSelf->_strSdp.length(), strongSelf->_strSdp.data()); - strongSelf->SocketHelper::send(response, n); + strongSelf->sendRtspResponse("200 OK", + {"Content-Base",strongSelf->_strUrl, + "x-Accept-Retransmit","our-retransmit", + "x-Accept-Dynamic-Rate","1" + },strongSelf->_strSdp); }); } void RtspSession::onAuthFailed(const weak_ptr &weakSelf,const string &realm) { @@ -318,34 +305,21 @@ void RtspSession::onAuthFailed(const weak_ptr &weakSelf,const strin return; } - int n; - char response[2 * 1024]; GET_CONFIG_AND_REGISTER(bool,authBasic,Rtsp::kAuthBasic); if (!authBasic) { //我们需要客户端优先以md5方式认证 - strongSelf->_strNonce = makeRandStr(32); - n = sprintf(response, - "RTSP/1.0 401 Unauthorized\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "WWW-Authenticate: Digest realm=\"%s\",nonce=\"%s\"\r\n\r\n", - strongSelf->_iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), realm.data(), strongSelf->_strNonce.data()); + strongSelf->_strNonce = makeRandStr(32); + strongSelf->sendRtspResponse("401 Unauthorized", + {"WWW-Authenticate", + StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << strongSelf->_strNonce << "\"" }, + strongSelf->_strSdp); }else { //当然我们也支持base64认证,但是我们不建议这样做 - n = sprintf(response, - "RTSP/1.0 401 Unauthorized\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "WWW-Authenticate: Basic realm=\"%s\"\r\n\r\n", - strongSelf->_iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), realm.data()); + strongSelf->sendRtspResponse("401 Unauthorized", + {"WWW-Authenticate", + StrPrinter << "Basic realm=\"" << realm << "\"" }, + strongSelf->_strSdp); } - strongSelf->SocketHelper::send(response, n); }); } @@ -488,62 +462,31 @@ void RtspSession::onAuthUser(const weak_ptr &weakSelf,const string } } inline void RtspSession::send_StreamNotFound() { - int n = sprintf(_pcBuf, - "RTSP/1.0 404 Stream Not Found\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Connection: Close\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data()); - SocketHelper::send(_pcBuf, n); + sendRtspResponse("404 Stream Not Found",{"Connection","Close"}); } inline void RtspSession::send_UnsupportedTransport() { - int n = sprintf(_pcBuf, - "RTSP/1.0 461 Unsupported Transport\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Connection: Close\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data()); - SocketHelper::send(_pcBuf, n); + sendRtspResponse("461 Unsupported Transport",{"Connection","Close"}); } inline void RtspSession::send_SessionNotFound() { - int n = sprintf(_pcBuf, - "RTSP/1.0 454 Session Not Found\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Connection: Close\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data()); - SocketHelper::send(_pcBuf, n); - - /*40 Method Not Allowed*/ - + sendRtspResponse("454 Session Not Found",{"Connection","Close"}); } -bool RtspSession::handleReq_Setup() { +int RtspSession::handleReq_Setup() { //处理setup命令,该函数可能进入多次 auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/')); int trackIdx = getTrackIndexByControlSuffix(controlSuffix); if (trackIdx == -1) { //未找到相应track - return false; + return -1; } SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx]; if (trackRef->_inited) { //已经初始化过该Track - return false; + return -1; } trackRef->_inited = true; //现在初始化 - if(!_bSetUped){ - _bSetUped = true; + if(_rtpType == PlayerBase::RTP_Invalid){ auto strTransport = _parser["Transport"]; if(strTransport.find("TCP") != string::npos){ _rtpType = PlayerBase::RTP_TCP; @@ -556,23 +499,13 @@ bool RtspSession::handleReq_Setup() { switch (_rtpType) { case PlayerBase::RTP_TCP: { - int iLen = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Transport: RTP/AVP/TCP;unicast;" - "interleaved=%d-%d;ssrc=%s;mode=play\r\n" - "Session: %s\r\n" - "x-Transport-Options: late-tolerance=1.400000\r\n" - "x-Dynamic-Rate: 1\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), trackRef->_type * 2, - trackRef->_type * 2 + 1, - printSSRC(trackRef->_ssrc).data(), - _strSession.data()); - SocketHelper::send(_pcBuf, iLen); + sendRtspResponse("200 OK", + {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" + << "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";" + << "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play", + "x-Transport-Options" , "late-tolerance=1.400000", + "x-Dynamic-Rate" , "1" + }); } break; case PlayerBase::RTP_UDP: { @@ -582,14 +515,14 @@ bool RtspSession::handleReq_Setup() { //分配端口失败 WarnL << "分配rtp端口失败"; send_NotAcceptable(); - return false; + return -1; } auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1); if (!pSockRtcp) { //分配端口失败 WarnL << "分配rtcp端口失败"; send_NotAcceptable(); - return false; + return -1; } _apUdpSock[trackIdx] = pSockRtp; //设置客户端内网端口信息 @@ -604,21 +537,13 @@ bool RtspSession::handleReq_Setup() { //尝试获取客户端nat映射地址 startListenPeerUdpData(); //InfoL << "分配端口:" << srv_port; - int n = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Transport: RTP/AVP/UDP;unicast;" - "client_port=%s;server_port=%d-%d;ssrc=%s;mode=play\r\n" - "Session: %s\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), strClientPort.data(), - pSockRtp->get_local_port(), pSockRtcp->get_local_port(), - printSSRC(trackRef->_ssrc).data(), - _strSession.data()); - SocketHelper::send(_pcBuf, n); + + sendRtspResponse("200 OK", + {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;" + << "client_port=" << strClientPort << ";" + << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";" + << "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play" + }); } break; case PlayerBase::RTP_MULTICAST: { @@ -626,7 +551,7 @@ bool RtspSession::handleReq_Setup() { _pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid); if (!_pBrdcaster) { send_NotAcceptable(); - return false; + return -1; } weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); _pBrdcaster->setDetachCB(this, [weakSelf]() { @@ -644,56 +569,38 @@ bool RtspSession::handleReq_Setup() { //分配端口失败 WarnL << "分配rtcp端口失败"; send_NotAcceptable(); - return false; + return -1; } startListenPeerUdpData(); GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL); - int n = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Transport: RTP/AVP;multicast;destination=%s;" - "source=%s;port=%d-%d;ttl=%d;ssrc=%s\r\n" - "Session: %s\r\n\r\n", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), _pBrdcaster->getIP().data(), - get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(), - udpTTL, printSSRC(trackRef->_ssrc).data(), - _strSession.data()); - SocketHelper::send(_pcBuf, n); + + sendRtspResponse("200 OK", + {"Transport",StrPrinter << "RTP/AVP;multicast;" + << "destination=" << _pBrdcaster->getIP() << ";" + << "source=" << get_local_ip() << ";" + << "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";" + << "ttl=" << udpTTL << ";" + << "ssrc=" << printSSRC(trackRef->_ssrc) + }); } break; default: break; } - return true; + return 0; } -bool RtspSession::handleReq_Play() { +int RtspSession::handleReq_Play() { if (_aTrackInfo.empty() || _parser["Session"] != _strSession) { send_SessionNotFound(); - return false; + return -1; } auto strRange = _parser["Range"]; auto onRes = [this,strRange](const string &err){ bool authSuccess = err.empty(); - char response[2 * 1024]; - _pcBuf = response; if(!authSuccess){ //第一次play是播放,否则是恢复播放。只对播放鉴权 - int n = sprintf(_pcBuf, - "RTSP/1.0 401 Unauthorized\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Content-Type: text/plain\r\n" - "Content-Length: %d\r\n\r\n%s", - _iCseq, SERVER_NAME, - RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(),(int)err.size(),err.data()); - SocketHelper::send(_pcBuf,n); + sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); shutdown(); return; } @@ -722,16 +629,8 @@ bool RtspSession::handleReq_Play() { pMediaSrc->seekTo(0); } _bFirstPlay = false; - int iLen = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Session: %s\r\n" - "Range: npt=%.2f-\r\n" - "RTP-Info: ", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), _strSession.data(), pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0); + _StrPrinter rtp_info; for(auto &track : _aTrackInfo){ if (track->_inited == false) { //还有track没有setup @@ -742,17 +641,17 @@ bool RtspSession::handleReq_Play() { track->_seq = pMediaSrc->getSeqence(track->_type); track->_time_stamp = pMediaSrc->getTimeStamp(track->_type); - iLen += sprintf(_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,", - _strUrl.data(), - track->_control_surffix.data(), - track->_seq, - track->_time_stamp * (track->_samplerate / 1000)); + rtp_info << "url=" << _strUrl << "/" << track->_control_surffix << ";" + << "seq=" << track->_seq << ";" + << "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ","; } - iLen -= 1; - (_pcBuf)[iLen] = '\0'; - iLen += sprintf(_pcBuf + iLen, "\r\n\r\n"); - SocketHelper::send(_pcBuf, iLen); + rtp_info.pop_back(); + + sendRtspResponse("200 OK", + {"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0, + "RTP-Info",rtp_info + }); _enableSendRtp = true; @@ -816,66 +715,51 @@ bool RtspSession::handleReq_Play() { //后面是seek或恢复命令,不需要鉴权 onRes(""); } - return true; + return 0; } -bool RtspSession::handleReq_Pause() { +int RtspSession::handleReq_Pause() { if (_parser["Session"] != _strSession) { send_SessionNotFound(); - return false; + return -1; } - int n = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), _strSession.data()); - SocketHelper::send(_pcBuf, n); + + sendRtspResponse("200 OK"); _enableSendRtp = false; - return true; + return 0; } -bool RtspSession::handleReq_Teardown() { - int n = sprintf(_pcBuf, - "RTSP/1.0 200 OK\r\n" - "CSeq: %d\r\n" - "Server: %s-%0.2f(build in %s)\r\n" - "%s" - "Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME, - dateHeader().data(), _strSession.data()); - SocketHelper::send(_pcBuf, n); +int RtspSession::handleReq_Teardown() { + sendRtspResponse("200 OK"); TraceL << "播放器断开连接!"; - return false; + return 0; } -bool RtspSession::handleReq_Get() { +int RtspSession::handleReq_Get() { _strSessionCookie = _parser["x-sessioncookie"]; - int n = sprintf(_pcBuf, - "HTTP/1.0 200 OK\r\n" - "%s" - "Connection: close\r\n" - "Cache-Control: no-store\r\n" - "Pragma: no-cache\r\n" - "Content-Type: application/x-rtsp-tunnelled\r\n\r\n", - dateHeader().data()); -//注册GET + sendRtspResponse("200 OK", + {"Connection","Close", + "Cache-Control","no-store", + "Pragma","no-store", + "Content-Type","application/x-rtsp-tunnelled", + },"","HTTP/1.0"); + + //注册GET lock_guard lock(g_mtxGetter); g_mapGetter[_strSessionCookie] = dynamic_pointer_cast(shared_from_this()); //InfoL << _strSessionCookie; - SocketHelper::send(_pcBuf, n); - return true; + return 0; } -bool RtspSession::handleReq_Post() { +int RtspSession::handleReq_Post() { lock_guard lock(g_mtxGetter); string sessioncookie = _parser["x-sessioncookie"]; //Poster 找到 Getter auto it = g_mapGetter.find(sessioncookie); if (it == g_mapGetter.end()) { //WarnL << sessioncookie; - return false; + return -1; } _bBase64need = true; //Poster 找到Getter的SOCK @@ -884,35 +768,34 @@ bool RtspSession::handleReq_Post() { if (!strongSession) { send_SessionNotFound(); //WarnL; - return false; + return -1; } initSender(strongSession); - return true; + auto nextPacketSize = remainDataSize(); + if(nextPacketSize > 0){ + _onContent = [this](const char *data,uint64_t len){ + BufferRaw::Ptr buffer = std::make_shared(); + buffer->assign(data,len); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + async([weakSelf,buffer](){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + strongSelf->onRecv(buffer); + } + },false); + }; + } + return nextPacketSize; } -bool RtspSession::handleReq_SET_PARAMETER() { +int RtspSession::handleReq_SET_PARAMETER() { //TraceL< &fun) { @@ -990,8 +873,8 @@ inline bool RtspSession::findStream() { return false; } _strSdp = pMediaSrc->getSdp(); - _sdpAttr.load(_strSdp); - _aTrackInfo = _sdpAttr.getAvailableTrack(); + SdpAttr sdpAttr(_strSdp); + _aTrackInfo = sdpAttr.getAvailableTrack(); if (_aTrackInfo.empty()) { return false; @@ -1121,6 +1004,95 @@ inline void RtspSession::initSender(const std::shared_ptr& session) session->shutdown_l(false); } +static string dateStr(){ + char buf[64]; + time_t tt = time(NULL); + strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt)); + return buf; +} + +bool RtspSession::sendRtspResponse(const string &res_code, + const StrCaseMap &header_const, + const string &sdp, + const char *protocol){ + auto header = header_const; + header.emplace("CSeq",StrPrinter << _iCseq); + if(!_strSession.empty()){ + header.emplace("Session",_strSession); + } + + header.emplace("Server",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")"); + header.emplace("Date",dateStr()); + + if(!sdp.empty()){ + header.emplace("Content-Length",StrPrinter << sdp.size()); + header.emplace("Content-Type","application/sdp"); + } + + _StrPrinter printer; + printer << protocol << " " << res_code << "\r\n"; + for (auto &pr : header){ + printer << pr.first << ": " << pr.second << "\r\n"; + } + + printer << "\r\n"; + + if(!sdp.empty()){ + printer << sdp; + } +// DebugL << printer; + return send(std::make_shared(printer)) > 0 ; +} + +int RtspSession::send(const Buffer::Ptr &pkt){ + _ui64TotalBytes += pkt->size(); + return _pSender->send(pkt,_flags); +} + +bool RtspSession::sendRtspResponse(const string &res_code, + const std::initializer_list &header, + const string &sdp, + const char *protocol) { + string key; + StrCaseMap header_map; + int i = 0; + for(auto &val : header){ + if(++i % 2 == 0){ + header_map.emplace(key,val); + }else{ + key = val; + } + } + return sendRtspResponse(res_code,header_map,sdp,protocol); +} + +inline string RtspSession::printSSRC(uint32_t ui32Ssrc) { + char tmp[9] = { 0 }; + ui32Ssrc = htonl(ui32Ssrc); + uint8_t *pSsrc = (uint8_t *) &ui32Ssrc; + for (int i = 0; i < 4; i++) { + sprintf(tmp + 2 * i, "%02X", pSsrc[i]); + } + return tmp; +} +inline int RtspSession::getTrackIndexByTrackType(TrackType type) { + for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { + if (type == _aTrackInfo[i]->_type) { + return i; + } + } + return -1; +} +inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) { + for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { + if (controlSuffix == _aTrackInfo[i]->_control_surffix) { + return i; + } + } + return -1; +} + + #ifdef RTSP_SEND_RTCP inline void RtspSession::sendRTCP() { //DebugL; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index fa728f1d..af3387ab 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -77,27 +77,24 @@ public: void onRecv(const Buffer::Ptr &pBuf) override; void onError(const SockException &err) override; void onManager() override; - protected: //HttpRequestSplitter override int64_t onRecvHeader(const char *data,uint64_t len) override ; + void onRecvContent(const char *data,uint64_t len) override; private: void inputRtspOrRtcp(const char *data,uint64_t len); - int send(const Buffer::Ptr &pkt) override{ - _ui64TotalBytes += pkt->size(); - return _pSender->send(pkt,_flags); - } void shutdown() override ; void shutdown_l(bool close); - bool handleReq_Options(); //处理options方法 - bool handleReq_Describe(); //处理describe方法 - bool handleReq_Setup(); //处理setup方法 - bool handleReq_Play(); //处理play方法 - bool handleReq_Pause(); //处理pause方法 - bool handleReq_Teardown(); //处理teardown方法 - bool handleReq_Get(); //处理Get方法 - bool handleReq_Post(); //处理Post方法 - bool handleReq_SET_PARAMETER(); //处理SET_PARAMETER方法 + int handleReq_Options(); //处理options方法 + int handleReq_Describe(); //处理describe方法 + int handleReq_ANNOUNCE(); //处理options方法 + int handleReq_Setup(); //处理setup方法 + int handleReq_Play(); //处理play方法 + int handleReq_Pause(); //处理pause方法 + int handleReq_Teardown(); //处理teardown方法 + int handleReq_Get(); //处理Get方法 + int handleReq_Post(); //处理Post方法 + int handleReq_SET_PARAMETER(); //处理SET_PARAMETER方法 void inline send_StreamNotFound(); //rtsp资源未找到 void inline send_UnsupportedTransport(); //不支持的传输模式 @@ -106,33 +103,9 @@ private: inline bool findStream(); //根据rtsp url查找 MediaSource实例 inline void findStream(const function &cb); //根据rtsp url查找 MediaSource实例 - inline void initSender(const std::shared_ptr &pSession); //处理rtsp over http,quicktime使用的 - inline void sendRtpPacket(const RtpPacket::Ptr &pkt); - inline string printSSRC(uint32_t ui32Ssrc) { - char tmp[9] = { 0 }; - ui32Ssrc = htonl(ui32Ssrc); - uint8_t *pSsrc = (uint8_t *) &ui32Ssrc; - for (int i = 0; i < 4; i++) { - sprintf(tmp + 2 * i, "%02X", pSsrc[i]); - } - return tmp; - } - inline int getTrackIndexByTrackType(TrackType type) { - for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { - if (type == _aTrackInfo[i]->_type) { - return i; - } - } - return -1; - } - inline int getTrackIndexByControlSuffix(const string &controlSuffix) { - for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { - if (controlSuffix == _aTrackInfo[i]->_control_surffix) { - return i; - } - } - return -1; - } + inline string printSSRC(uint32_t ui32Ssrc); + inline int getTrackIndexByTrackType(TrackType type); + inline int getTrackIndexByControlSuffix(const string &controlSuffix); inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr); inline void startListenPeerUdpData(); @@ -146,10 +119,16 @@ private: void doDelay(int delaySec,const std::function &fun); void cancelDelyaTask(); + + inline void sendRtpPacket(const RtpPacket::Ptr &pkt); + bool sendRtspResponse(const string &res_code,const std::initializer_list &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); + bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); + int send(const Buffer::Ptr &pkt) override; + inline void initSender(const std::shared_ptr &pSession); //处理rtsp over http,quicktime使用的 private: - char *_pcBuf = nullptr; Ticker _ticker; Parser _parser; //rtsp解析类 + int _iCseq = 0; string _strUrl; string _strSdp; string _strSession; @@ -157,31 +136,23 @@ private: MediaInfo _mediaInfo; std::weak_ptr _pMediaSrc; RingBuffer::RingReader::Ptr _pRtpReader; - - PlayerBase::eRtpType _rtpType = PlayerBase::RTP_UDP; - bool _bSetUped = false; - int _iCseq = 0; - - SdpAttr _sdpAttr; + PlayerBase::eRtpType _rtpType = PlayerBase::RTP_Invalid; vector _aTrackInfo; + //RTP over udp bool _bGotAllPeerUdp = false; - -#ifdef RTSP_SEND_RTCP - RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标 - Ticker _aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标 - inline void sendRTCP(); -#endif - - //RTP over UDP bool _abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数 weak_ptr _apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标 std::shared_ptr _apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标 bool _bListenPeerUdpData = false; + + //RTP over udp_multicast RtpBroadCaster::Ptr _pBrdcaster; //登录认证 string _strNonce; + //消耗的总流量 + uint64_t _ui64TotalBytes = 0; //RTSP over HTTP function _onDestory; @@ -190,18 +161,21 @@ private: //quicktime 请求rtsp会产生两次tcp连接, //一次发送 get 一次发送post,需要通过sessioncookie关联起来 string _strSessionCookie; - - //消耗的总流量 - uint64_t _ui64TotalBytes = 0; - static recursive_mutex g_mtxGetter; //对quicktime上锁保护 static recursive_mutex g_mtxPostter; //对quicktime上锁保护 static unordered_map > g_mapGetter; static unordered_map > g_mapPostter; + function _onContent; std::function _delayTask; uint32_t _iTaskTimeLine = 0; atomic _enableSendRtp; + +#ifdef RTSP_SEND_RTCP + RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标 + Ticker _aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标 + inline void sendRTCP(); +#endif }; } /* namespace mediakit */