diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 1f8deace..2d9bb1dc 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -62,6 +62,7 @@ const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish"; const char kBroadcastFlowReport[] = "kBroadcastFlowReport"; const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig"; const char kBroadcastShellLogin[] = "kBroadcastShellLogin"; +const char kBroadcastNotFoundStream[] = "kBroadcastNotFoundStream"; const char kFlowThreshold[] = "broadcast.flowThreshold"; diff --git a/src/Common/config.h b/src/Common/config.h index a5a351d3..76017fba 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -110,6 +110,10 @@ extern const char kBroadcastShellLogin[]; extern const char kBroadcastFlowReport[]; #define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes,const uint64_t &totalDuration,TcpSession &sender +//未找到流后会广播该事件,请在监听该事件后去拉流或其他方式产生流,这样就能按需拉流了 +extern const char kBroadcastNotFoundStream[]; +#define BroadcastNotFoundStreamArgs const MediaInfo &args,TcpSession &sender + //流量汇报事件流量阈值,单位KB,默认1MB extern const char kFlowThreshold[]; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 0c7866c5..a498a4a1 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -209,77 +209,73 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) { throw std::runtime_error(StrPrinter << "Stop publishing." << endl); } -void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shared_ptr &pToken) { - //获取流对象 +void RtmpSession::findStream(const function &cb,bool retry) { auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA, _mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid, true)); - //是否鉴权成功 - bool authSuccess = err.empty(); - if(authSuccess && !src && tryDelay ){ - //校验成功,但是流不存在而导致的不能播放 - //所以我们注册rtmp注册事件,等rtmp推流端推流成功后再告知播放器开始播放 - auto task_id = this; - auto media_info = _mediaInfo; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + if(src || !retry){ + cb(src); + return; + } - NoticeCenter::Instance().addListener(task_id,Broadcast::kBroadcastMediaChanged, - [task_id,weakSelf,media_info,pToken](BroadcastMediaChangedArgs){ + //广播未找到流 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this); - if(bRegist && - schema == media_info._schema && - vhost == media_info._vhost && - app == media_info._app && - stream == media_info._streamid){ - //播发器请求的rtmp流终于注册上了 - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - //切换到自己的线程再回复 - //如果触发 kBroadcastMediaChanged 事件的线程与本RtmpSession绑定的线程相同, - //那么strongSelf->async操作可能是同步操作, - //通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行, - //以便防止遍历事件监听对象map时做删除操作 - strongSelf->async([task_id,weakSelf,pToken,media_info](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - DebugL << "收到rtmp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/" << media_info._app << "/" << media_info._streamid; - //回复播放器 - strongSelf->doPlayResponse("",false,pToken); - //取消延时任务,防止多次回复 - strongSelf->cancelDelyaTask(); - - //取消事件监听 - //在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃 - NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); - }, false); - } - }); - - //5秒后执行延时任务 - doDelay(5,[task_id,weakSelf,pToken](){ - //取消监听该事件,该延时任务可以在本对象析构时或到达指定延时后调用 - //所以该对象在销毁前一定会被取消事件监听 - NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + auto task_id = this; + auto media_info = _mediaInfo; + auto onRegist = [task_id, weakSelf, media_info, cb](BroadcastMediaChangedArgs) { + if(bRegist && + schema == media_info._schema && + vhost == media_info._vhost && + app == media_info._app && + stream == media_info._streamid){ + //播发器请求的rtmp流终于注册上了 auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - //5秒后,我们不管流有没有注册上都回复播放器 - strongSelf->doPlayResponse("",false,pToken); - }); - return; - } + //切换到自己的线程再回复 + //如果触发 kBroadcastMediaChanged 事件的线程与本RtmpSession绑定的线程相同, + //那么strongSelf->async操作可能是同步操作, + //通过指定参数may_sync为false确保 NoticeCenter::delListener操作延后执行, + //以便防止遍历事件监听对象map时做删除操作 + strongSelf->async([task_id,weakSelf,media_info,cb](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + DebugL << "收到rtmp注册事件,回复播放器:" << media_info._schema << "/" << media_info._vhost << "/" << media_info._app << "/" << media_info._streamid; - ///////回复流程/////// + //再找一遍媒体源,一般能找到 + strongSelf->findStream(cb,false); - //是否播放成功 + //取消延时任务,防止多次回复 + strongSelf->cancelDelyaTask(); + + //取消事件监听 + //在事件触发时不能在当前线程移除事件监听,否则会导致遍历map时做删除操作导致程序崩溃 + NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); + }, false); + } + }; + + NoticeCenter::Instance().addListener(task_id, Broadcast::kBroadcastMediaChanged, onRegist); + //5秒后执行失败回调 + doDelay(5, [cb,task_id]() { + //取消监听该事件,该延时任务可以在本对象析构时或到达指定延时后调用 + //所以该对象在销毁前一定会被取消事件监听 + NoticeCenter::Instance().delListener(task_id,Broadcast::kBroadcastMediaChanged); + cb(nullptr); + }); + +} + +void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){ + bool authSuccess = err.empty(); bool ok = (src.operator bool() && authSuccess); if (ok) { //stream begin @@ -294,8 +290,7 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - WarnL << "onPlayed:" - << (authSuccess ? "No such stream:" : err.data()) << " " + WarnL << (authSuccess ? "No such stream:" : err.data()) << " " << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid @@ -377,6 +372,25 @@ void RtmpSession::doPlayResponse(const string &err,bool tryDelay,const std::shar SockUtil::setNoDelay(_sock->rawFD(),false); } +void RtmpSession::doPlayResponse(const string &err,const std::function &cb){ + if(!err.empty()){ + //鉴权失败,直接返回播放失败 + sendPlayResponse(err, nullptr); + cb(false); + return; + } + + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + //鉴权成功,查找媒体源并回复 + findStream([weakSelf,cb](const RtmpMediaSource::Ptr &src){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + strongSelf->sendPlayResponse("", src); + } + cb(src.operator bool()); + }); +} + void RtmpSession::doPlay(AMFDecoder &dec){ std::shared_ptr pTicker(new Ticker); std::shared_ptr pToken(new onceToken(nullptr,[pTicker](){ @@ -393,13 +407,13 @@ void RtmpSession::doPlay(AMFDecoder &dec){ if(!strongSelf){ return; } - strongSelf->doPlayResponse(err,true,pToken); + strongSelf->doPlayResponse(err,[pToken](bool){}); }); }; auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this); if(!flag){ //该事件无人监听,默认不鉴权 - doPlayResponse("",true,pToken); + doPlayResponse("",[pToken](bool){}); } } void RtmpSession::onCmd_play2(AMFDecoder &dec) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index dc5e8122..5091d300 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -60,7 +60,9 @@ private: void onCmd_play(AMFDecoder &dec); void onCmd_play2(AMFDecoder &dec); void doPlay(AMFDecoder &dec); - void doPlayResponse(const string &err,bool tryDelay,const std::shared_ptr &token); + void doPlayResponse(const string &err,const std::function &cb); + void sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src); + void onCmd_seek(AMFDecoder &dec); void onCmd_pause(AMFDecoder &dec); void setMetaData(AMFDecoder &dec); @@ -87,6 +89,7 @@ private: void doDelay(int delaySec,const std::function &fun); void cancelDelyaTask(); + void findStream(const function &cb ,bool retry = true); private: std::string _strTcUrl; MediaInfo _mediaInfo; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index c06b9b2a..9feb90f8 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -247,6 +247,7 @@ bool RtspSession::handleReq_Describe() { if(!success){ //未找到相应的MediaSource + WarnL << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; strongSelf->send_StreamNotFound(); strongSelf->shutdown(); return; @@ -933,6 +934,9 @@ void RtspSession::findStream(const function &cb) { return; } + //广播未找到流 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream,_mediaInfo,*this); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); auto task_id = this; auto media_info = _mediaInfo; @@ -943,7 +947,7 @@ void RtspSession::findStream(const function &cb) { vhost == media_info._vhost && app == media_info._app && stream == media_info._streamid) { - //播发器请求的rtmp流终于注册上了 + //播发器请求的rtsp流终于注册上了 auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; @@ -983,7 +987,6 @@ inline bool RtspSession::findStream() { RtspMediaSource::Ptr pMediaSrc = dynamic_pointer_cast( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) ); if (!pMediaSrc) { - WarnL << "No such stream:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; return false; } _strSdp = pMediaSrc->getSdp();