优化rtsp服务器代码

修复一个rtp over http的bug
This commit is contained in:
xiongziliang 2018-12-14 14:59:12 +08:00
parent 91d8888784
commit f411ddc23c
6 changed files with 301 additions and 342 deletions

View File

@ -57,14 +57,12 @@ bool loadIniConfig(const char *ini_path = nullptr);
#define CLEAR_ARR(arr) for(auto &item : arr){ item = 0;}
#endif //CLEAR_ARR
#define SERVER_NAME "ZLMediaKit"
#define SERVER_NAME "ZLMediaKit-3.0"
#define VHOST_KEY "vhost"
#define HTTP_SCHEMA "http"
#define RTSP_SCHEMA "rtsp"
#define RTMP_SCHEMA "rtmp"
#define DEFAULT_VHOST "__defaultVhost__"
#define RTSP_VERSION 1.30
#define RTSP_BUILDTIME __DATE__" CST"
////////////广播名称///////////
namespace Broadcast {

View File

@ -54,12 +54,16 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) {
//数据按照请求头处理
const char *index = nullptr;
uint64_t remain = len;
while (_content_len == 0 && remain > 0 && (index = onSearchPacketTail(ptr,remain)) != nullptr) {
_remain_data_size = len;
while (_content_len == 0 && _remain_data_size > 0 && (index = onSearchPacketTail(ptr,_remain_data_size)) != nullptr) {
//_content_len == 0这是请求头
_content_len = onRecvHeader(ptr, index - ptr);
const char *header_ptr = ptr;
int64_t header_size = index - ptr;
ptr = index;
remain = len - (ptr - data);
_remain_data_size = len - (ptr - data);
_content_len = onRecvHeader(header_ptr, header_size);
}
/*
@ -67,7 +71,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) {
*/
tail_ref = tail_tmp;
if(remain <= 0){
if(_remain_data_size <= 0){
//没有剩余数据,清空缓存
_remain_data.clear();
return;
@ -75,7 +79,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) {
if(_content_len == 0){
//尚未找到http头缓存定位到剩余数据部分
string str(ptr,remain);
string str(ptr,_remain_data_size);
_remain_data = str;
return;
}
@ -83,23 +87,23 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) {
//已经找到http头了
if(_content_len > 0){
//数据按照固定长度content处理
if(remain < _content_len){
if(_remain_data_size < _content_len){
//数据不够,缓存定位到剩余数据部分
string str(ptr,remain);
string str(ptr,_remain_data_size);
_remain_data = str;
return;
}
//收到content数据并且接受content完毕
onRecvContent(ptr,_content_len);
remain -= _content_len;
_remain_data_size -= _content_len;
ptr += _content_len;
//content处理完毕,后面数据当做请求头处理
_content_len = 0;
if(remain > 0){
if(_remain_data_size > 0){
//还有数据没有处理完毕
string str(ptr,remain);
string str(ptr,_remain_data_size);
_remain_data = str;
data = ptr = (char *)_remain_data.data();
@ -112,7 +116,7 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) {
//_content_len < 0;数据按照不固定长度content处理
onRecvContent(ptr,remain);//消费掉所有剩余数据
onRecvContent(ptr,_remain_data_size);//消费掉所有剩余数据
_remain_data.clear();
}
@ -133,6 +137,10 @@ const char *HttpRequestSplitter::onSearchPacketTail(const char *data,int len) {
return pos + 4;
}
int64_t HttpRequestSplitter::remainDataSize() {
return _remain_data_size;
}
} /* namespace mediakit */

View File

@ -81,9 +81,15 @@ protected:
*
*/
void reset();
/**
*
*/
int64_t remainDataSize();
private:
string _remain_data;
int64_t _content_len = 0;
int64_t _remain_data_size = 0;
};
} /* namespace mediakit */

View File

@ -87,6 +87,7 @@ class PlayerBase : public DemuxerBase, public mINI{
public:
typedef std::shared_ptr<PlayerBase> Ptr;
typedef enum {
RTP_Invalid = -1,
RTP_TCP = 0,
RTP_UDP = 1,
RTP_MULTICAST = 2,

View File

@ -25,6 +25,7 @@
*/
#include <atomic>
#include <iomanip>
#include "Common/config.h"
#include "UDPServer.h"
#include "RtspSession.h"
@ -43,17 +44,11 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
string dateHeader() {
char buf[200];
time_t tt = time(NULL);
strftime(buf, sizeof buf, "Date: %a, %b %d %Y %H:%M:%S GMT\r\n", gmtime(&tt));
return buf;
}
unordered_map<string, weak_ptr<RtspSession> > RtspSession::g_mapGetter;
unordered_map<void *, std::shared_ptr<RtspSession> > RtspSession::g_mapPostter;
recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护
recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
TcpSession(pTh, pSock), _pSender(pSock) {
//设置10秒发送缓存
@ -152,41 +147,40 @@ void RtspSession::onManager() {
int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
char tmp[2 * 1024];
_pcBuf = tmp;
_parser.Parse(header); //rtsp请求解析
string strCmd = _parser.Method(); //提取出请求命令字
_iCseq = atoi(_parser["CSeq"].data());
typedef bool (RtspSession::*rtspCMDHandle)();
static unordered_map<string, rtspCMDHandle> g_mapCmd;
typedef int (RtspSession::*rtsp_request_handler)();
static unordered_map<string, rtsp_request_handler> s_handler_map;
static onceToken token( []() {
g_mapCmd.emplace("OPTIONS",&RtspSession::handleReq_Options);
g_mapCmd.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
g_mapCmd.emplace("SETUP",&RtspSession::handleReq_Setup);
g_mapCmd.emplace("PLAY",&RtspSession::handleReq_Play);
g_mapCmd.emplace("PAUSE",&RtspSession::handleReq_Pause);
g_mapCmd.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
g_mapCmd.emplace("GET",&RtspSession::handleReq_Get);
g_mapCmd.emplace("POST",&RtspSession::handleReq_Post);
g_mapCmd.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
g_mapCmd.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options);
s_handler_map.emplace("DESCRIBE",&RtspSession::handleReq_Describe);
s_handler_map.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE);
s_handler_map.emplace("SETUP",&RtspSession::handleReq_Setup);
s_handler_map.emplace("PLAY",&RtspSession::handleReq_Play);
s_handler_map.emplace("PAUSE",&RtspSession::handleReq_Pause);
s_handler_map.emplace("TEARDOWN",&RtspSession::handleReq_Teardown);
s_handler_map.emplace("GET",&RtspSession::handleReq_Get);
s_handler_map.emplace("POST",&RtspSession::handleReq_Post);
s_handler_map.emplace("SET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
s_handler_map.emplace("GET_PARAMETER",&RtspSession::handleReq_SET_PARAMETER);
}, []() {});
auto it = g_mapCmd.find(strCmd);
if (it != g_mapCmd.end()) {
auto it = s_handler_map.find(strCmd);
int ret = 0;
if (it != s_handler_map.end()) {
auto fun = it->second;
if(!(this->*fun)()){
shutdown();
ret = (this->*fun)();
if(ret == -1){
shutdown();
}
} else{
shutdown();
WarnL << "cmd=" << strCmd;
WarnL << "不支持的rtsp命令:" << strCmd;
}
_parser.Clear();
return 0;
return ret;
}
@ -203,6 +197,7 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
}
void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
// DebugL << data;
if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){
//这是rtcp
return;
@ -210,23 +205,28 @@ void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
input(data,len);
}
bool RtspSession::handleReq_Options() {
//支持这些命令
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY,"
" PAUSE, SET_PARAMETER, GET_PARAMETER\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
return true;
int RtspSession::handleReq_Options() {
//支持这些命令
sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, SET_PARAMETER, GET_PARAMETER"});
return 0;
}
bool RtspSession::handleReq_Describe() {
void RtspSession::onRecvContent(const char *data, uint64_t len) {
// DebugL << data;
if(_onContent){
_onContent(data,len);
_onContent = nullptr;
}
}
int RtspSession::handleReq_ANNOUNCE() {
sendRtspResponse("200 OK");
_onContent = [this](const char *data, uint64_t len){
};
return atoi(_parser["Content-Length"].data());
}
int RtspSession::handleReq_Describe() {
{
//解析url获取媒体名称
_strUrl = _parser.Url();
@ -242,8 +242,6 @@ bool RtspSession::handleReq_Describe() {
}
//恢复现场
strongSelf->_parser = parserCopy;
char tmp[2 * 1024];
strongSelf->_pcBuf = tmp;
if(!success){
//未找到相应的MediaSource
@ -273,7 +271,7 @@ bool RtspSession::handleReq_Describe() {
invoker("");
}
});
return true;
return 0;
}
void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
auto strongSelf = weakSelf.lock();
@ -287,22 +285,11 @@ void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
//本对象已销毁
return;
}
char response[2 * 1024];
int n = sprintf(response,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"x-Accept-Retransmit: our-retransmit\r\n"
"x-Accept-Dynamic-Rate: 1\r\n"
"Content-Base: %s/\r\n"
"Content-Type: application/sdp\r\n"
"Content-Length: %d\r\n\r\n%s",
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strongSelf->_strUrl.data(),
(int) strongSelf->_strSdp.length(), strongSelf->_strSdp.data());
strongSelf->SocketHelper::send(response, n);
strongSelf->sendRtspResponse("200 OK",
{"Content-Base",strongSelf->_strUrl,
"x-Accept-Retransmit","our-retransmit",
"x-Accept-Dynamic-Rate","1"
},strongSelf->_strSdp);
});
}
void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const string &realm) {
@ -318,34 +305,21 @@ void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const strin
return;
}
int n;
char response[2 * 1024];
GET_CONFIG_AND_REGISTER(bool,authBasic,Rtsp::kAuthBasic);
if (!authBasic) {
//我们需要客户端优先以md5方式认证
strongSelf->_strNonce = makeRandStr(32);
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Digest realm=\"%s\",nonce=\"%s\"\r\n\r\n",
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data(), strongSelf->_strNonce.data());
strongSelf->_strNonce = makeRandStr(32);
strongSelf->sendRtspResponse("401 Unauthorized",
{"WWW-Authenticate",
StrPrinter << "Digest realm=\"" << realm << "\",nonce=\"" << strongSelf->_strNonce << "\"" },
strongSelf->_strSdp);
}else {
//当然我们也支持base64认证,但是我们不建议这样做
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Basic realm=\"%s\"\r\n\r\n",
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data());
strongSelf->sendRtspResponse("401 Unauthorized",
{"WWW-Authenticate",
StrPrinter << "Basic realm=\"" << realm << "\"" },
strongSelf->_strSdp);
}
strongSelf->SocketHelper::send(response, n);
});
}
@ -488,62 +462,31 @@ void RtspSession::onAuthUser(const weak_ptr<RtspSession> &weakSelf,const string
}
}
inline void RtspSession::send_StreamNotFound() {
int n = sprintf(_pcBuf,
"RTSP/1.0 404 Stream Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
}
inline void RtspSession::send_UnsupportedTransport() {
int n = sprintf(_pcBuf,
"RTSP/1.0 461 Unsupported Transport\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
}
inline void RtspSession::send_SessionNotFound() {
int n = sprintf(_pcBuf,
"RTSP/1.0 454 Session Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
/*40 Method Not Allowed*/
sendRtspResponse("454 Session Not Found",{"Connection","Close"});
}
bool RtspSession::handleReq_Setup() {
int RtspSession::handleReq_Setup() {
//处理setup命令该函数可能进入多次
auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/'));
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
if (trackIdx == -1) {
//未找到相应track
return false;
return -1;
}
SdpTrack::Ptr &trackRef = _aTrackInfo[trackIdx];
if (trackRef->_inited) {
//已经初始化过该Track
return false;
return -1;
}
trackRef->_inited = true; //现在初始化
if(!_bSetUped){
_bSetUped = true;
if(_rtpType == PlayerBase::RTP_Invalid){
auto strTransport = _parser["Transport"];
if(strTransport.find("TCP") != string::npos){
_rtpType = PlayerBase::RTP_TCP;
@ -556,23 +499,13 @@ bool RtspSession::handleReq_Setup() {
switch (_rtpType) {
case PlayerBase::RTP_TCP: {
int iLen = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/TCP;unicast;"
"interleaved=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n"
"x-Transport-Options: late-tolerance=1.400000\r\n"
"x-Dynamic-Rate: 1\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), trackRef->_type * 2,
trackRef->_type * 2 + 1,
printSSRC(trackRef->_ssrc).data(),
_strSession.data());
SocketHelper::send(_pcBuf, iLen);
sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
<< "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play",
"x-Transport-Options" , "late-tolerance=1.400000",
"x-Dynamic-Rate" , "1"
});
}
break;
case PlayerBase::RTP_UDP: {
@ -582,14 +515,14 @@ bool RtspSession::handleReq_Setup() {
//分配端口失败
WarnL << "分配rtp端口失败";
send_NotAcceptable();
return false;
return -1;
}
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1);
if (!pSockRtcp) {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
return -1;
}
_apUdpSock[trackIdx] = pSockRtp;
//设置客户端内网端口信息
@ -604,21 +537,13 @@ bool RtspSession::handleReq_Setup() {
//尝试获取客户端nat映射地址
startListenPeerUdpData();
//InfoL << "分配端口:" << srv_port;
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/UDP;unicast;"
"client_port=%s;server_port=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strClientPort.data(),
pSockRtp->get_local_port(), pSockRtcp->get_local_port(),
printSSRC(trackRef->_ssrc).data(),
_strSession.data());
SocketHelper::send(_pcBuf, n);
sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/UDP;unicast;"
<< "client_port=" << strClientPort << ";"
<< "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play"
});
}
break;
case PlayerBase::RTP_MULTICAST: {
@ -626,7 +551,7 @@ bool RtspSession::handleReq_Setup() {
_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) {
send_NotAcceptable();
return false;
return -1;
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pBrdcaster->setDetachCB(this, [weakSelf]() {
@ -644,56 +569,38 @@ bool RtspSession::handleReq_Setup() {
//分配端口失败
WarnL << "分配rtcp端口失败";
send_NotAcceptable();
return false;
return -1;
}
startListenPeerUdpData();
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP;multicast;destination=%s;"
"source=%s;port=%d-%d;ttl=%d;ssrc=%s\r\n"
"Session: %s\r\n\r\n",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _pBrdcaster->getIP().data(),
get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(),
udpTTL, printSSRC(trackRef->_ssrc).data(),
_strSession.data());
SocketHelper::send(_pcBuf, n);
sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP;multicast;"
<< "destination=" << _pBrdcaster->getIP() << ";"
<< "source=" << get_local_ip() << ";"
<< "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
<< "ttl=" << udpTTL << ";"
<< "ssrc=" << printSSRC(trackRef->_ssrc)
});
}
break;
default:
break;
}
return true;
return 0;
}
bool RtspSession::handleReq_Play() {
int RtspSession::handleReq_Play() {
if (_aTrackInfo.empty() || _parser["Session"] != _strSession) {
send_SessionNotFound();
return false;
return -1;
}
auto strRange = _parser["Range"];
auto onRes = [this,strRange](const string &err){
bool authSuccess = err.empty();
char response[2 * 1024];
_pcBuf = response;
if(!authSuccess){
//第一次play是播放否则是恢复播放。只对播放鉴权
int n = sprintf(_pcBuf,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Content-Type: text/plain\r\n"
"Content-Length: %d\r\n\r\n%s",
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(),(int)err.size(),err.data());
SocketHelper::send(_pcBuf,n);
sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err);
shutdown();
return;
}
@ -722,16 +629,8 @@ bool RtspSession::handleReq_Play() {
pMediaSrc->seekTo(0);
}
_bFirstPlay = false;
int iLen = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n"
"Range: npt=%.2f-\r\n"
"RTP-Info: ", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data(), pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0);
_StrPrinter rtp_info;
for(auto &track : _aTrackInfo){
if (track->_inited == false) {
//还有track没有setup
@ -742,17 +641,17 @@ bool RtspSession::handleReq_Play() {
track->_seq = pMediaSrc->getSeqence(track->_type);
track->_time_stamp = pMediaSrc->getTimeStamp(track->_type);
iLen += sprintf(_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,",
_strUrl.data(),
track->_control_surffix.data(),
track->_seq,
track->_time_stamp * (track->_samplerate / 1000));
rtp_info << "url=" << _strUrl << "/" << track->_control_surffix << ";"
<< "seq=" << track->_seq << ";"
<< "rtptime=" << (int)(track->_time_stamp * (track->_samplerate / 1000)) << ",";
}
iLen -= 1;
(_pcBuf)[iLen] = '\0';
iLen += sprintf(_pcBuf + iLen, "\r\n\r\n");
SocketHelper::send(_pcBuf, iLen);
rtp_info.pop_back();
sendRtspResponse("200 OK",
{"Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << pMediaSrc->getTimeStamp(TrackInvalid) / 1000.0,
"RTP-Info",rtp_info
});
_enableSendRtp = true;
@ -816,66 +715,51 @@ bool RtspSession::handleReq_Play() {
//后面是seek或恢复命令不需要鉴权
onRes("");
}
return true;
return 0;
}
bool RtspSession::handleReq_Pause() {
int RtspSession::handleReq_Pause() {
if (_parser["Session"] != _strSession) {
send_SessionNotFound();
return false;
return -1;
}
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
sendRtspResponse("200 OK");
_enableSendRtp = false;
return true;
return 0;
}
bool RtspSession::handleReq_Teardown() {
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
int RtspSession::handleReq_Teardown() {
sendRtspResponse("200 OK");
TraceL << "播放器断开连接!";
return false;
return 0;
}
bool RtspSession::handleReq_Get() {
int RtspSession::handleReq_Get() {
_strSessionCookie = _parser["x-sessioncookie"];
int n = sprintf(_pcBuf,
"HTTP/1.0 200 OK\r\n"
"%s"
"Connection: close\r\n"
"Cache-Control: no-store\r\n"
"Pragma: no-cache\r\n"
"Content-Type: application/x-rtsp-tunnelled\r\n\r\n",
dateHeader().data());
//注册GET
sendRtspResponse("200 OK",
{"Connection","Close",
"Cache-Control","no-store",
"Pragma","no-store",
"Content-Type","application/x-rtsp-tunnelled",
},"","HTTP/1.0");
//注册GET
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter[_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << _strSessionCookie;
SocketHelper::send(_pcBuf, n);
return true;
return 0;
}
bool RtspSession::handleReq_Post() {
int RtspSession::handleReq_Post() {
lock_guard<recursive_mutex> lock(g_mtxGetter);
string sessioncookie = _parser["x-sessioncookie"];
//Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) {
//WarnL << sessioncookie;
return false;
return -1;
}
_bBase64need = true;
//Poster 找到Getter的SOCK
@ -884,35 +768,34 @@ bool RtspSession::handleReq_Post() {
if (!strongSession) {
send_SessionNotFound();
//WarnL;
return false;
return -1;
}
initSender(strongSession);
return true;
auto nextPacketSize = remainDataSize();
if(nextPacketSize > 0){
_onContent = [this](const char *data,uint64_t len){
BufferRaw::Ptr buffer = std::make_shared<BufferRaw>();
buffer->assign(data,len);
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
async([weakSelf,buffer](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->onRecv(buffer);
}
},false);
};
}
return nextPacketSize;
}
bool RtspSession::handleReq_SET_PARAMETER() {
int RtspSession::handleReq_SET_PARAMETER() {
//TraceL<<endl;
int n = sprintf(_pcBuf,
"RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
return true;
sendRtspResponse("200 OK");
return 0;
}
inline void RtspSession::send_NotAcceptable() {
int n = sprintf(_pcBuf,
"RTSP/1.0 406 Not Acceptable\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(_pcBuf, n);
sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
}
void RtspSession::doDelay(int delaySec, const std::function<void()> &fun) {
@ -990,8 +873,8 @@ inline bool RtspSession::findStream() {
return false;
}
_strSdp = pMediaSrc->getSdp();
_sdpAttr.load(_strSdp);
_aTrackInfo = _sdpAttr.getAvailableTrack();
SdpAttr sdpAttr(_strSdp);
_aTrackInfo = sdpAttr.getAvailableTrack();
if (_aTrackInfo.empty()) {
return false;
@ -1121,6 +1004,95 @@ inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session)
session->shutdown_l(false);
}
static string dateStr(){
char buf[64];
time_t tt = time(NULL);
strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
return buf;
}
bool RtspSession::sendRtspResponse(const string &res_code,
const StrCaseMap &header_const,
const string &sdp,
const char *protocol){
auto header = header_const;
header.emplace("CSeq",StrPrinter << _iCseq);
if(!_strSession.empty()){
header.emplace("Session",_strSession);
}
header.emplace("Server",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
header.emplace("Date",dateStr());
if(!sdp.empty()){
header.emplace("Content-Length",StrPrinter << sdp.size());
header.emplace("Content-Type","application/sdp");
}
_StrPrinter printer;
printer << protocol << " " << res_code << "\r\n";
for (auto &pr : header){
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n";
if(!sdp.empty()){
printer << sdp;
}
// DebugL << printer;
return send(std::make_shared<BufferString>(printer)) > 0 ;
}
int RtspSession::send(const Buffer::Ptr &pkt){
_ui64TotalBytes += pkt->size();
return _pSender->send(pkt,_flags);
}
bool RtspSession::sendRtspResponse(const string &res_code,
const std::initializer_list<string> &header,
const string &sdp,
const char *protocol) {
string key;
StrCaseMap header_map;
int i = 0;
for(auto &val : header){
if(++i % 2 == 0){
header_map.emplace(key,val);
}else{
key = val;
}
}
return sendRtspResponse(res_code,header_map,sdp,protocol);
}
inline string RtspSession::printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc);
uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
for (int i = 0; i < 4; i++) {
sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
}
return tmp;
}
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (type == _aTrackInfo[i]->_type) {
return i;
}
}
return -1;
}
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (controlSuffix == _aTrackInfo[i]->_control_surffix) {
return i;
}
}
return -1;
}
#ifdef RTSP_SEND_RTCP
inline void RtspSession::sendRTCP() {
//DebugL;

View File

@ -77,27 +77,24 @@ public:
void onRecv(const Buffer::Ptr &pBuf) override;
void onError(const SockException &err) override;
void onManager() override;
protected:
//HttpRequestSplitter override
int64_t onRecvHeader(const char *data,uint64_t len) override ;
void onRecvContent(const char *data,uint64_t len) override;
private:
void inputRtspOrRtcp(const char *data,uint64_t len);
int send(const Buffer::Ptr &pkt) override{
_ui64TotalBytes += pkt->size();
return _pSender->send(pkt,_flags);
}
void shutdown() override ;
void shutdown_l(bool close);
bool handleReq_Options(); //处理options方法
bool handleReq_Describe(); //处理describe方法
bool handleReq_Setup(); //处理setup方法
bool handleReq_Play(); //处理play方法
bool handleReq_Pause(); //处理pause方法
bool handleReq_Teardown(); //处理teardown方法
bool handleReq_Get(); //处理Get方法
bool handleReq_Post(); //处理Post方法
bool handleReq_SET_PARAMETER(); //处理SET_PARAMETER方法
int handleReq_Options(); //处理options方法
int handleReq_Describe(); //处理describe方法
int handleReq_ANNOUNCE(); //处理options方法
int handleReq_Setup(); //处理setup方法
int handleReq_Play(); //处理play方法
int handleReq_Pause(); //处理pause方法
int handleReq_Teardown(); //处理teardown方法
int handleReq_Get(); //处理Get方法
int handleReq_Post(); //处理Post方法
int handleReq_SET_PARAMETER(); //处理SET_PARAMETER方法
void inline send_StreamNotFound(); //rtsp资源未找到
void inline send_UnsupportedTransport(); //不支持的传输模式
@ -106,33 +103,9 @@ private:
inline bool findStream(); //根据rtsp url查找 MediaSource实例
inline void findStream(const function<void(bool)> &cb); //根据rtsp url查找 MediaSource实例
inline void initSender(const std::shared_ptr<RtspSession> &pSession); //处理rtsp over httpquicktime使用的
inline void sendRtpPacket(const RtpPacket::Ptr &pkt);
inline string printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc);
uint8_t *pSsrc = (uint8_t *) &ui32Ssrc;
for (int i = 0; i < 4; i++) {
sprintf(tmp + 2 * i, "%02X", pSsrc[i]);
}
return tmp;
}
inline int getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (type == _aTrackInfo[i]->_type) {
return i;
}
}
return -1;
}
inline int getTrackIndexByControlSuffix(const string &controlSuffix) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (controlSuffix == _aTrackInfo[i]->_control_surffix) {
return i;
}
}
return -1;
}
inline string printSSRC(uint32_t ui32Ssrc);
inline int getTrackIndexByTrackType(TrackType type);
inline int getTrackIndexByControlSuffix(const string &controlSuffix);
inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr);
inline void startListenPeerUdpData();
@ -146,10 +119,16 @@ private:
void doDelay(int delaySec,const std::function<void()> &fun);
void cancelDelyaTask();
inline void sendRtpPacket(const RtpPacket::Ptr &pkt);
bool sendRtspResponse(const string &res_code,const std::initializer_list<string> &header, const string &sdp = "" , const char *protocol = "RTSP/1.0");
bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0");
int send(const Buffer::Ptr &pkt) override;
inline void initSender(const std::shared_ptr<RtspSession> &pSession); //处理rtsp over httpquicktime使用的
private:
char *_pcBuf = nullptr;
Ticker _ticker;
Parser _parser; //rtsp解析类
int _iCseq = 0;
string _strUrl;
string _strSdp;
string _strSession;
@ -157,31 +136,23 @@ private:
MediaInfo _mediaInfo;
std::weak_ptr<RtspMediaSource> _pMediaSrc;
RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader;
PlayerBase::eRtpType _rtpType = PlayerBase::RTP_UDP;
bool _bSetUped = false;
int _iCseq = 0;
SdpAttr _sdpAttr;
PlayerBase::eRtpType _rtpType = PlayerBase::RTP_Invalid;
vector<SdpTrack::Ptr> _aTrackInfo;
//RTP over udp
bool _bGotAllPeerUdp = false;
#ifdef RTSP_SEND_RTCP
RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标
Ticker _aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标
inline void sendRTCP();
#endif
//RTP over UDP
bool _abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数
weak_ptr<Socket> _apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标
std::shared_ptr<struct sockaddr> _apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标
bool _bListenPeerUdpData = false;
//RTP over udp_multicast
RtpBroadCaster::Ptr _pBrdcaster;
//登录认证
string _strNonce;
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
//RTSP over HTTP
function<void(void)> _onDestory;
@ -190,18 +161,21 @@ private:
//quicktime 请求rtsp会产生两次tcp连接
//一次发送 get 一次发送post需要通过sessioncookie关联起来
string _strSessionCookie;
//消耗的总流量
uint64_t _ui64TotalBytes = 0;
static recursive_mutex g_mtxGetter; //对quicktime上锁保护
static recursive_mutex g_mtxPostter; //对quicktime上锁保护
static unordered_map<string, weak_ptr<RtspSession> > g_mapGetter;
static unordered_map<void *, std::shared_ptr<RtspSession> > g_mapPostter;
function<void(const char *data,uint64_t len)> _onContent;
std::function<void()> _delayTask;
uint32_t _iTaskTimeLine = 0;
atomic<bool> _enableSendRtp;
#ifdef RTSP_SEND_RTCP
RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标
Ticker _aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标
inline void sendRTCP();
#endif
};
} /* namespace mediakit */