diff --git a/src/Common/Device.cpp b/src/Common/Device.cpp index 67ab696e..f47bffd4 100644 --- a/src/Common/Device.cpp +++ b/src/Common/Device.cpp @@ -35,9 +35,9 @@ using namespace toolkit; namespace mediakit { -DevChannel::DevChannel(const char *strVhost, - const char *strApp, - const char *strId, +DevChannel::DevChannel(const string &strVhost, + const string &strApp, + const string &strId, float fDuration, bool bEanbleHls, bool bEnableMp4) : diff --git a/src/Common/Device.h b/src/Common/Device.h index 5fc933cc..c1d6339f 100644 --- a/src/Common/Device.h +++ b/src/Common/Device.h @@ -70,9 +70,9 @@ class DevChannel : public MultiMediaSourceMuxer{ public: typedef std::shared_ptr Ptr; //fDuration<=0为直播,否则为点播 - DevChannel(const char *strVhost, - const char *strApp, - const char *strId, + DevChannel(const string &strVhost, + const string &strApp, + const string &strId, float fDuration = 0, bool bEanbleHls = true, bool bEnableMp4 = false); diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index f5346c96..3e1fe431 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -82,9 +82,9 @@ Sdp::Ptr Factory::getSdpByTrack(const Track::Ptr &track) { Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { if (strcasecmp(track->_codec.data(), "mpeg4-generic") == 0) { - string aac_cfg_str = FindField(track->_fmtp.c_str(), "config=", nullptr); + string aac_cfg_str = FindField(track->_fmtp.data(), "config=", nullptr); if (aac_cfg_str.size() != 4) { - aac_cfg_str = FindField(track->_fmtp.c_str(), "config=", ";"); + aac_cfg_str = FindField(track->_fmtp.data(), "config=", ";"); } if (aac_cfg_str.size() != 4) { //延后获取adts头 @@ -93,12 +93,12 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { string aac_cfg; unsigned int cfg1; - sscanf(aac_cfg_str.substr(0, 2).c_str(), "%02X", &cfg1); + sscanf(aac_cfg_str.substr(0, 2).data(), "%02X", &cfg1); cfg1 &= 0x00FF; aac_cfg.push_back(cfg1); unsigned int cfg2; - sscanf(aac_cfg_str.substr(2, 2).c_str(), "%02X", &cfg2); + sscanf(aac_cfg_str.substr(2, 2).data(), "%02X", &cfg2); cfg2 &= 0x00FF; aac_cfg.push_back(cfg2); @@ -106,12 +106,12 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { } if (strcasecmp(track->_codec.data(), "h264") == 0) { - string sps_pps = FindField(track->_fmtp.c_str(), "sprop-parameter-sets=", nullptr); + string sps_pps = FindField(track->_fmtp.data(), "sprop-parameter-sets=", nullptr); if(sps_pps.empty()){ return std::make_shared(); } - string base64_SPS = FindField(sps_pps.c_str(), NULL, ","); - string base64_PPS = FindField(sps_pps.c_str(), ",", NULL); + string base64_SPS = FindField(sps_pps.data(), NULL, ","); + string base64_PPS = FindField(sps_pps.data(), ",", NULL); if(base64_PPS.back() == ';'){ base64_PPS.pop_back(); } @@ -125,13 +125,13 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { //a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA= int pt; char sprop_vps[128] = {0},sprop_sps[128] = {0},sprop_pps[128] = {0}; - if (4 == sscanf(track->_fmtp.c_str(), "%d sprop-vps=%127[^;]; sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt, sprop_vps,sprop_sps, sprop_pps)) { + if (4 == sscanf(track->_fmtp.data(), "%d sprop-vps=%127[^;]; sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt, sprop_vps,sprop_sps, sprop_pps)) { auto vps = decodeBase64(sprop_vps); auto sps = decodeBase64(sprop_sps); auto pps = decodeBase64(sprop_pps); return std::make_shared(vps,sps,pps,0,0,0); } - if (3 == sscanf(track->_fmtp.c_str(), "%d sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt,sprop_sps, sprop_pps)) { + if (3 == sscanf(track->_fmtp.data(), "%d sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt,sprop_sps, sprop_pps)) { auto sps = decodeBase64(sprop_sps); auto pps = decodeBase64(sprop_pps); return std::make_shared("",sps,pps,0,0,0); diff --git a/src/Http/HttpDownloader.cpp b/src/Http/HttpDownloader.cpp index 4d96f2df..219c0cfb 100644 --- a/src/Http/HttpDownloader.cpp +++ b/src/Http/HttpDownloader.cpp @@ -71,7 +71,7 @@ int64_t HttpDownloader::onResponseHeader(const string& status,const HttpHeader& File::delete_file(_filePath.data()); if(_onResult){ auto errMsg = StrPrinter << "Http Status:" << status << endl; - _onResult(Err_other,errMsg.data(),_filePath.data()); + _onResult(Err_other,errMsg,_filePath); _onResult = nullptr; } } @@ -90,7 +90,7 @@ void HttpDownloader::onResponseCompleted() { //InfoL << "md5Sum:" << getMd5Sum(_filePath); _bDownloadSuccess = true; if(_onResult){ - _onResult(Err_success,"success",_filePath.data()); + _onResult(Err_success,"success",_filePath); _onResult = nullptr; } } @@ -101,7 +101,7 @@ void HttpDownloader::onDisconnect(const SockException &ex) { File::delete_file(_filePath.data()); } if(_onResult){ - _onResult(ex.getErrCode(),ex.what(),_filePath.data()); + _onResult(ex.getErrCode(),ex.what(),_filePath); _onResult = nullptr; } } diff --git a/src/Http/HttpDownloader.h b/src/Http/HttpDownloader.h index 5f8d0d56..fa35c300 100644 --- a/src/Http/HttpDownloader.h +++ b/src/Http/HttpDownloader.h @@ -34,7 +34,7 @@ namespace mediakit { class HttpDownloader: public HttpClientImp { public: typedef std::shared_ptr Ptr; - typedef std::function onDownloadResult; + typedef std::function onDownloadResult; HttpDownloader(); virtual ~HttpDownloader(); //开始下载文件,默认断点续传方式下载 diff --git a/src/MediaFile/HLSMaker.cpp b/src/MediaFile/HLSMaker.cpp index f5d2ee5f..00a5ed44 100644 --- a/src/MediaFile/HLSMaker.cpp +++ b/src/MediaFile/HLSMaker.cpp @@ -114,7 +114,7 @@ bool HLSMaker::write_index_file(int iFirstSegment, unsigned int uiLastSegment, i sizeof(acWriteBuf), "#EXTINF:%.3f,\r\n%s-%u.ts\r\n", _iDurations[i-iFirstSegment]/1000.0, - _strFileName.c_str(), + _strFileName.data(), i); if (fwrite(acWriteBuf, strlen(acWriteBuf), 1, pM3u8File.get()) != 1) { WarnL << "Could not write to m3u8 index file, will not continue writing to index file"; diff --git a/src/MediaFile/MediaReader.cpp b/src/MediaFile/MediaReader.cpp index 9aa72345..19a86536 100644 --- a/src/MediaFile/MediaReader.cpp +++ b/src/MediaFile/MediaReader.cpp @@ -127,7 +127,7 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri } _iDuration = MAX(_video_ms,_audio_ms); - _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost.data(),strApp.data(),strId.data(),_iDuration/1000.0,false, false)); + _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost,strApp,strId,_iDuration/1000.0,false, false)); if (_audio_trId != MP4_INVALID_TRACK_ID) { AACTrack::Ptr track = std::make_shared(_strAacCfg); _mediaMuxer->addTrack(track); diff --git a/src/MediaFile/TSMaker.cpp b/src/MediaFile/TSMaker.cpp index 52ea0ae4..cfc18cd4 100644 --- a/src/MediaFile/TSMaker.cpp +++ b/src/MediaFile/TSMaker.cpp @@ -93,7 +93,7 @@ void TSMaker::flush() { bool TSMaker::init(const string& filename, uint32_t bufsize) { m_strFilename = filename; if (m_pOutVideoTs == NULL) { - m_pOutVideoTs = File::createfile_file(filename.c_str(), "wb"); + m_pOutVideoTs = File::createfile_file(filename.data(), "wb"); if (m_pOutVideoTs == NULL) { return false; } diff --git a/src/Player/MediaPlayer.cpp b/src/Player/MediaPlayer.cpp index f54c51a5..41ab3d08 100644 --- a/src/Player/MediaPlayer.cpp +++ b/src/Player/MediaPlayer.cpp @@ -37,7 +37,7 @@ MediaPlayer::MediaPlayer() { MediaPlayer::~MediaPlayer() { } -void MediaPlayer::play(const char* strUrl) { +void MediaPlayer::play(const string &strUrl) { _parser = PlayerBase::createPlayer(strUrl); _parser->setOnShutdown(_shutdownCB); _parser->setOnPlayResult(_playResultCB); diff --git a/src/Player/MediaPlayer.h b/src/Player/MediaPlayer.h index 4f0b00a1..d889ce21 100644 --- a/src/Player/MediaPlayer.h +++ b/src/Player/MediaPlayer.h @@ -43,7 +43,7 @@ public: MediaPlayer(); virtual ~MediaPlayer(); - void play(const char* strUrl) override; + void play(const string &strUrl) override; void pause(bool bPause) override; void teardown() override; EventPoller::Ptr getPoller(); diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index ef891bad..9276d0ca 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -45,14 +45,14 @@ const char PlayerBase::kBeatIntervalMS[] = "beat_interval_ms"; const char PlayerBase::kMaxAnalysisMS[] = "max_analysis_ms"; - PlayerBase::Ptr PlayerBase::createPlayer(const char* strUrl) { +PlayerBase::Ptr PlayerBase::createPlayer(const string &strUrl) { static auto releasePlayer = [](PlayerBase *ptr){ onceToken token(nullptr,[&](){ delete ptr; }); ptr->teardown(); }; - string prefix = FindField(strUrl, NULL, "://"); + string prefix = FindField(strUrl.data(), NULL, "://"); if (strcasecmp("rtsp",prefix.data()) == 0) { return PlayerBase::Ptr(new RtspPlayerImp(),releasePlayer); } diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index ed7399a5..f222ae8e 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -86,13 +86,7 @@ public: class PlayerBase : public DemuxerBase, public mINI{ public: typedef std::shared_ptr Ptr; - typedef enum { - RTP_Invalid = -1, - RTP_TCP = 0, - RTP_UDP = 1, - RTP_MULTICAST = 2, - } eRtpType; - static Ptr createPlayer(const char* strUrl); + static Ptr createPlayer(const string &strUrl); //指定网卡ip static const char kNetAdapter[]; @@ -122,7 +116,7 @@ public: * 开始播放 * @param strUrl 视频url,支持rtsp/rtmp */ - virtual void play(const char* strUrl) {} + virtual void play(const string &strUrl) {} /** * 暂停或恢复 diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index a4c1833d..05612171 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -61,9 +61,9 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, #define MUTE_ADTS_DATA_LEN sizeof(s_mute_adts) #define MUTE_ADTS_DATA_MS 130 -PlayerProxy::PlayerProxy(const char *strVhost, - const char *strApp, - const char *strSrc, +PlayerProxy::PlayerProxy(const string &strVhost, + const string &strApp, + const string &strSrc, bool bEnableHls, bool bEnableMp4, int iRetryCount){ @@ -74,10 +74,9 @@ PlayerProxy::PlayerProxy(const char *strVhost, _bEnableMp4 = bEnableMp4; _iRetryCount = iRetryCount; } -void PlayerProxy::play(const char* strUrl) { +void PlayerProxy::play(const string &strUrlTmp) { weak_ptr weakSelf = shared_from_this(); std::shared_ptr piFailedCnt(new int(0)); //连续播放失败次数 - string strUrlTmp(strUrl); setOnPlayResult([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { auto strongSelf = weakSelf.lock(); if(!strongSelf) { @@ -109,7 +108,7 @@ void PlayerProxy::play(const char* strUrl) { strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); } }); - MediaPlayer::play(strUrl); + MediaPlayer::play(strUrlTmp); } PlayerProxy::~PlayerProxy() { @@ -126,7 +125,7 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ return false; } WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; - strongPlayer->MediaPlayer::play(strUrl.data()); + strongPlayer->MediaPlayer::play(strUrl); return false; }, nullptr); } @@ -170,7 +169,7 @@ private: }; void PlayerProxy::onPlaySuccess() { - _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost.data(),_strApp.data(),_strSrc.data(),getDuration(),_bEnableHls,_bEnableMp4)); + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost,_strApp,_strSrc,getDuration(),_bEnableHls,_bEnableMp4)); _mediaMuxer->setListener(shared_from_this()); auto videoTrack = getTrack(TrackVideo,false); diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 6bbd5746..0cf330fd 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -46,16 +46,16 @@ public: //如果iRetryCount<0,则一直重试播放;否则重试iRetryCount次数 //默认一直重试 - PlayerProxy(const char *strVhost, - const char *strApp, - const char *strSrc, + PlayerProxy(const string &strVhost, + const string &strApp, + const string &strSrc, bool bEnableHls = true, bool bEnableMp4 = false, int iRetryCount = -1); virtual ~PlayerProxy(); - void play(const char* strUrl) override; + void play(const string &strUrl) override; bool close() override; private: void rePlay(const string &strUrl,int iFailedCnt); diff --git a/src/Pusher/MediaPusher.cpp b/src/Pusher/MediaPusher.cpp new file mode 100644 index 00000000..16a08925 --- /dev/null +++ b/src/Pusher/MediaPusher.cpp @@ -0,0 +1,66 @@ +/* +* MIT License +* +* Copyright (c) 2016 xiongziliang <771730766@qq.com> +* +* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. +*/ + +#include +#include "MediaPusher.h" +#include "PusherBase.h" + +using namespace toolkit; + +namespace mediakit { + +MediaPusher::MediaPusher(const MediaSource::Ptr &src) { + _src = src; +} + +MediaPusher::MediaPusher(const string &schema, + const string &strVhost, + const string &strApp, + const string &strStream) { + _src = MediaSource::find(schema,strVhost,strApp,strStream); +} + +MediaPusher::~MediaPusher() { +} +void MediaPusher::publish(const string &strUrl) { + _parser = PusherBase::createPusher(_src,strUrl); + _parser->setOnShutdown(_shutdownCB); + _parser->setOnPublished(_publishCB); + _parser->mINI::operator=(*this); + _parser->publish(strUrl); +} + +EventPoller::Ptr MediaPusher::getPoller(){ + auto parser = dynamic_pointer_cast(_parser); + if(!parser){ + return nullptr; + } + return parser->getPoller(); +} + + + +} /* namespace mediakit */ diff --git a/src/Pusher/MediaPusher.h b/src/Pusher/MediaPusher.h new file mode 100644 index 00000000..924a175d --- /dev/null +++ b/src/Pusher/MediaPusher.h @@ -0,0 +1,59 @@ +/* +* MIT License +* +* Copyright (c) 2016 xiongziliang <771730766@qq.com> +* +* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. +*/ + +#ifndef SRC_PUSHER_MEDIAPUSHER_H_ +#define SRC_PUSHER_MEDIAPUSHER_H_ + +#include +#include +#include "PusherBase.h" +#include "Thread/TaskExecutor.h" +using namespace toolkit; + +namespace mediakit { + +class MediaPusher : public PusherImp { +public: + typedef std::shared_ptr Ptr; + + MediaPusher(const string &schema, + const string &strVhost, + const string &strApp, + const string &strStream); + + MediaPusher(const MediaSource::Ptr &src); + + virtual ~MediaPusher(); + void publish(const string &strUrl) override; + EventPoller::Ptr getPoller(); + +private: + MediaSource::Ptr _src; +}; + +} /* namespace mediakit */ + +#endif /* SRC_PUSHER_MEDIAPUSHER_H_ */ diff --git a/src/Pusher/PusherBase.cpp b/src/Pusher/PusherBase.cpp new file mode 100644 index 00000000..e42d4d2c --- /dev/null +++ b/src/Pusher/PusherBase.cpp @@ -0,0 +1,72 @@ +/* +* MIT License +* +* Copyright (c) 2016 xiongziliang <771730766@qq.com> +* +* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. +*/ + +#include +#include "PusherBase.h" +#include "Rtsp/Rtsp.h" +#include "Rtsp/RtspPusher.h" +#include "Rtmp/RtmpPusher.h" + +using namespace toolkit; + +namespace mediakit { + +const char PusherBase::kNetAdapter[] = "net_adapter"; +const char PusherBase::kRtpType[] = "rtp_type"; +const char PusherBase::kRtspUser[] = "rtsp_user" ; +const char PusherBase::kRtspPwd[] = "rtsp_pwd"; +const char PusherBase::kRtspPwdIsMD5[] = "rtsp_pwd_md5"; + +const char PusherBase::kPlayTimeoutMS[] = "play_timeout_ms"; +const char PusherBase::kMediaTimeoutMS[] = "media_timeout_ms"; +const char PusherBase::kBeatIntervalMS[] = "beat_interval_ms"; + + +PusherBase::Ptr PusherBase::createPusher(const MediaSource::Ptr &src, + const string & strUrl) { + static auto releasePusher = [](PusherBase *ptr){ + onceToken token(nullptr,[&](){ + delete ptr; + }); + ptr->teardown(); + }; + string prefix = FindField(strUrl.data(), NULL, "://"); + if (strcasecmp("rtsp",prefix.data()) == 0) { + return PusherBase::Ptr(new RtspPusher(dynamic_pointer_cast(src)),releasePusher); + } + if (strcasecmp("rtmp",prefix.data()) == 0) { + return PusherBase::Ptr(new RtmpPusher(dynamic_pointer_cast(src)),releasePusher); + } + return PusherBase::Ptr(new RtspPusher(dynamic_pointer_cast(src)),releasePusher); +} + +PusherBase::PusherBase() { + this->mINI::operator[](kPlayTimeoutMS) = 10000; + this->mINI::operator[](kMediaTimeoutMS) = 5000; + this->mINI::operator[](kBeatIntervalMS) = 5000; +} + +} /* namespace mediakit */ diff --git a/src/Pusher/PusherBase.h b/src/Pusher/PusherBase.h new file mode 100644 index 00000000..fc809d74 --- /dev/null +++ b/src/Pusher/PusherBase.h @@ -0,0 +1,152 @@ +/* +* MIT License +* +* Copyright (c) 2016 xiongziliang <771730766@qq.com> +* +* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to deal +* in the Software without restriction, including without limitation the rights +* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +* SOFTWARE. +*/ + +#ifndef SRC_PUSHER_PUSHERBASE_H_ +#define SRC_PUSHER_PUSHERBASE_H_ + +#include +#include +#include +#include +#include "Network/Socket.h" +#include "Util/mini.h" +#include "Common/MediaSource.h" +using namespace toolkit; + +namespace mediakit { + + +class PusherBase : public mINI{ +public: + typedef std::shared_ptr Ptr; + + static Ptr createPusher(const MediaSource::Ptr &src, + const string &strUrl); + + //指定网卡ip + static const char kNetAdapter[]; + //设置rtp传输类型,可选项有0(tcp,默认)、1(udp)、2(组播) + //设置方法:player[PusherBase::kRtpType] = 0/1/2; + static const char kRtpType[]; + //rtsp认证用户名 + static const char kRtspUser[]; + //rtsp认证用用户密码,可以是明文也可以是md5,md5密码生成方式 md5(username:realm:password) + static const char kRtspPwd[]; + //rtsp认证用用户密码是否为md5类型 + static const char kRtspPwdIsMD5[]; + //播放超时时间,默认10,000 毫秒 + static const char kPlayTimeoutMS[]; + //rtp/rtmp包接收超时时间,默认5000秒 + static const char kMediaTimeoutMS[]; + //rtsp/rtmp心跳时间,默认5000毫秒 + static const char kBeatIntervalMS[]; + + typedef std::function Event; + + PusherBase(); + virtual ~PusherBase(){} + + /** + * 开始推流 + * @param strUrl 视频url,支持rtsp/rtmp + */ + virtual void publish(const string &strUrl) = 0; + + /** + * 中断推流 + */ + virtual void teardown() = 0; + + /** + * 摄像推流结果回调 + * @param onPublished + */ + virtual void setOnPublished(const Event &cb) = 0; + + /** + * 设置断开回调 + * @param onShutdown + */ + virtual void setOnShutdown(const Event &cb) = 0; +}; + +template +class PusherImp : public Parent { +public: + typedef std::shared_ptr Ptr; + PusherImp(){} + virtual ~PusherImp(){} + + /** + * 开始推流 + * @param strUrl 推流url,支持rtsp/rtmp + */ + void publish(const string &strUrl) override{ + if (_parser) { + _parser->publish(strUrl); + } + } + + /** + * 中断推流 + */ + void teardown() override{ + if (_parser) { + _parser->teardown(); + } + } + + /** + * 摄像推流结果回调 + * @param onPublished + */ + void setOnPublished(const PusherBase::Event &cb) override{ + if (_parser) { + _parser->setOnPublished(cb); + } + _publishCB = cb; + } + + /** + * 设置断开回调 + * @param onShutdown + */ + void setOnShutdown(const PusherBase::Event &cb) override{ + if (_parser) { + _parser->setOnShutdown(cb); + } + _shutdownCB = cb; + } +protected: + PusherBase::Event _shutdownCB; + PusherBase::Event _publishCB; + std::shared_ptr _parser; +}; + + +} /* namespace mediakit */ + +#endif /* SRC_PUSHER_PUSHERBASE_H_ */ diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index 1c52a3d3..02b80019 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -84,6 +84,7 @@ using namespace toolkit; #define FLV_KEY_FRAME 1 #define FLV_INTER_FRAME 2 +namespace mediakit { #if defined(_WIN32) #pragma pack(push, 1) @@ -291,7 +292,7 @@ public: } }; - +}//namespace mediakit diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index 500e67e2..1d8f5d42 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -72,11 +72,11 @@ void RtmpPlayer::teardown() { shutdown(); } } -void RtmpPlayer::play(const char* strUrl) { +void RtmpPlayer::play(const string &strUrl) { teardown(); - string strHost = FindField(strUrl, "://", "/"); - _strApp = FindField(strUrl, (strHost + "/").data(), "/"); - _strStream = FindField(strUrl, (strHost + "/" + _strApp + "/").data(), NULL); + string strHost = FindField(strUrl.data(), "://", "/"); + _strApp = FindField(strUrl.data(), (strHost + "/").data(), "/"); + _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL); _strTcUrl = string("rtmp://") + strHost + "/" + _strApp; if (!_strApp.size() || !_strStream.size()) { @@ -85,13 +85,13 @@ void RtmpPlayer::play(const char* strUrl) { } DebugL << strHost << " " << _strApp << " " << _strStream; - auto iPort = atoi(FindField(strHost.c_str(), ":", NULL).c_str()); + auto iPort = atoi(FindField(strHost.data(), ":", NULL).data()); if (iPort <= 0) { //rtmp 默认端口1935 iPort = 1935; } else { //服务器域名 - strHost = FindField(strHost.c_str(), NULL, ":"); + strHost = FindField(strHost.data(), NULL, ":"); } if(!(*this)[PlayerBase::kNetAdapter].empty()){ setNetAdapter((*this)[PlayerBase::kNetAdapter]); diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 364a4482..f9996234 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -49,7 +49,7 @@ public: RtmpPlayer(); virtual ~RtmpPlayer(); - void play(const char* strUrl) override; + void play(const string &strUrl) override; void pause(bool bPause) override; void teardown() override; protected: diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h index c38efd13..1051fd5b 100644 --- a/src/Rtmp/RtmpPlayerImp.h +++ b/src/Rtmp/RtmpPlayerImp.h @@ -56,7 +56,7 @@ public: fProgress = MAX(float(0),MIN(fProgress,float(1.0))); seekToMilliSecond(fProgress * getDuration() * 1000); }; - void play(const char* strUrl) override { + void play(const string &strUrl) override { _analysisMs = (*this)[PlayerBase::kMaxAnalysisMS].as(); PlayerImp::play(strUrl); } diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index b633337a..f1902a65 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -325,7 +325,7 @@ void RtmpProtocol::handle_C0C1() { if (_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) { throw std::runtime_error("only plaintext[0x03] handshake supported"); } - if(memcmp(_strRcvBuf.c_str() + 5,"\x00\x00\x00\x00",4) ==0 ){ + if(memcmp(_strRcvBuf.data() + 5,"\x00\x00\x00\x00",4) ==0 ){ //simple handsharke handle_C1_simple(); }else{ @@ -347,7 +347,7 @@ void RtmpProtocol::handle_C1_simple(){ RtmpHandshake s1(0); onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE)); //发送S2 - onSendRawData(obtainBuffer(_strRcvBuf.c_str() + 1, C1_HANDSHARK_SIZE)); + onSendRawData(obtainBuffer(_strRcvBuf.data() + 1, C1_HANDSHARK_SIZE)); //等待C2 _nextHandle = [this]() { handle_C2(); diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 627ac9b6..07592b3b 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -35,14 +35,6 @@ namespace mediakit { static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; -RtmpPusher::RtmpPusher(const char *strVhost,const char *strApp,const char *strStream) { - auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,strVhost,strApp,strStream)); - if (!src) { - auto strErr = StrPrinter << "media source:" << strVhost << "/" << strApp << "/" << strStream << "not found!" << endl; - throw std::runtime_error(strErr); - } - _pMediaSrc=src; -} RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src){ _pMediaSrc=src; } @@ -70,11 +62,11 @@ void RtmpPusher::teardown() { } } -void RtmpPusher::publish(const char* strUrl) { +void RtmpPusher::publish(const string &strUrl) { teardown(); - string strHost = FindField(strUrl, "://", "/"); - _strApp = FindField(strUrl, (strHost + "/").data(), "/"); - _strStream = FindField(strUrl, (strHost + "/" + _strApp + "/").data(), NULL); + string strHost = FindField(strUrl.data(), "://", "/"); + _strApp = FindField(strUrl.data(), (strHost + "/").data(), "/"); + _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL); _strTcUrl = string("rtmp://") + strHost + "/" + _strApp; if (!_strApp.size() || !_strStream.size()) { @@ -83,14 +75,31 @@ void RtmpPusher::publish(const char* strUrl) { } DebugL << strHost << " " << _strApp << " " << _strStream; - auto iPort = atoi(FindField(strHost.c_str(), ":", NULL).c_str()); + auto iPort = atoi(FindField(strHost.data(), ":", NULL).data()); if (iPort <= 0) { //rtmp 默认端口1935 iPort = 1935; } else { //服务器域名 - strHost = FindField(strHost.c_str(), NULL, ":"); + strHost = FindField(strHost.data(), NULL, ":"); } + + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + float playTimeOutSec = (*this)[kPlayTimeoutMS].as() / 1000.0; + _pPublishTimer.reset( new Timer(playTimeOutSec, [weakSelf]() { + auto strongSelf=weakSelf.lock(); + if(!strongSelf) { + return false; + } + strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); + strongSelf->teardown(); + return false; + },getPoller())); + + if(!(*this)[kNetAdapter].empty()){ + setNetAdapter((*this)[kNetAdapter]); + } + startConnect(strHost, iPort); } @@ -98,26 +107,18 @@ void RtmpPusher::onErr(const SockException &ex){ onShutdown(ex); } void RtmpPusher::onConnect(const SockException &err){ - if(err.getErrCode()!=Err_success) { + if(err) { onPublishResult(err); return; } weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pPublishTimer.reset( new Timer(10, [weakSelf]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return false; - } - strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout")); - strongSelf->teardown(); - return false; - },getPoller())); startClientSession([weakSelf](){ auto strongSelf=weakSelf.lock(); if(!strongSelf) { return; } - //strongSelf->sendChunkSize(60000); + + strongSelf->sendChunkSize(60000); strongSelf->send_connect(); }); } diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index c844f85f..d36457b9 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -30,28 +30,27 @@ #include "RtmpProtocol.h" #include "RtmpMediaSource.h" #include "Network/TcpClient.h" +#include "Pusher/PusherBase.h" namespace mediakit { -class RtmpPusher: public RtmpProtocol , public TcpClient{ +class RtmpPusher: public RtmpProtocol , public TcpClient , public PusherBase{ public: typedef std::shared_ptr Ptr; - typedef std::function Event; - RtmpPusher(const char *strVhost,const char *strApp,const char *strStream); RtmpPusher(const RtmpMediaSource::Ptr &src); virtual ~RtmpPusher(); - void publish(const char* strUrl); - void teardown(); + void publish(const string &strUrl) override ; - void setOnPublished(Event onPublished) { - _onPublished = onPublished; + void teardown() override; + + void setOnPublished(const Event &cb) override { + _onPublished = cb; } - void setOnShutdown(Event onShutdown) { - _onShutdown = onShutdown; + void setOnShutdown(const Event &cb) override{ + _onShutdown = cb; } - protected: //for Tcpclient override void onRecv(const Buffer::Ptr &pBuf) override; diff --git a/src/Rtsp/Rtsp.cpp b/src/Rtsp/Rtsp.cpp index 428b0373..eb5f4803 100644 --- a/src/Rtsp/Rtsp.cpp +++ b/src/Rtsp/Rtsp.cpp @@ -27,6 +27,8 @@ #include #include "Rtsp.h" +namespace mediakit{ + string FindField(const char* buf, const char* start, const char *end ,int bufSize) { if(bufSize <=0 ){ bufSize = strlen(buf); @@ -185,5 +187,5 @@ vector SdpAttr::getAvailableTrack() const { return ret; } - +}//namespace mediakit diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index 89fca892..aa774b0c 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -38,7 +38,18 @@ using namespace std; using namespace toolkit; using namespace mediakit; -class SdpTrack{ +namespace mediakit { + +namespace Rtsp { +typedef enum { + RTP_Invalid = -1, + RTP_TCP = 0, + RTP_UDP = 1, + RTP_MULTICAST = 2, +} eRtpType; +}; + +class SdpTrack { public: typedef std::shared_ptr Ptr; @@ -54,8 +65,8 @@ public: float _start = 0; float _end = 0; - map _other; - map _attr; + map _other; + map _attr; public: int _pt; string _codec; @@ -65,50 +76,58 @@ public: string _control_surffix; TrackType _type; public: - uint8_t _interleaved = 0; - bool _inited = false; - uint32_t _ssrc = 0; - uint16_t _seq = 0; + uint8_t _interleaved = 0; + bool _inited = false; + uint32_t _ssrc = 0; + uint16_t _seq = 0; //时间戳,单位毫秒 - uint32_t _time_stamp = 0; + uint32_t _time_stamp = 0; }; + class SdpAttr { public: typedef std::shared_ptr Ptr; - SdpAttr(){} - SdpAttr(const string &sdp){load(sdp);} - ~SdpAttr(){} - void load(const string &sdp); - bool available() const ; + SdpAttr() {} + + SdpAttr(const string &sdp) { load(sdp); } + + ~SdpAttr() {} + + void load(const string &sdp); + + bool available() const; + + SdpTrack::Ptr getTrack(TrackType type) const; + + vector getAvailableTrack() const; - SdpTrack::Ptr getTrack(TrackType type) const; - vector getAvailableTrack() const; private: - map _track_map; + map _track_map; }; class RtcpCounter { public: - uint32_t pktCnt = 0; - uint32_t octCount = 0; - uint32_t timeStamp = 0; + uint32_t pktCnt = 0; + uint32_t octCount = 0; + uint32_t timeStamp = 0; }; -string FindField(const char* buf, const char* start, const char *end,int bufSize = 0 ); +string FindField(const char *buf, const char *start, const char *end, int bufSize = 0); -struct StrCaseCompare -{ - bool operator()(const string& __x, const string& __y) const - {return strcasecmp(__x.data(), __y.data()) < 0 ;} +struct StrCaseCompare { + bool operator()(const string &__x, const string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; } }; -typedef map StrCaseMap; + +typedef map StrCaseMap; class Parser { public: Parser() {} + virtual ~Parser() {} + void Parse(const char *buf) { //解析 const char *start = buf; @@ -119,19 +138,19 @@ public: break; } if (start == buf) { - _strMethod = FindField(line.c_str(), NULL, " "); - _strFullUrl = FindField(line.c_str(), " ", " "); - auto args_pos = _strFullUrl.find('?'); - if(args_pos != string::npos){ - _strUrl = _strFullUrl.substr(0,args_pos); - _mapUrlArgs = parseArgs(_strFullUrl.substr(args_pos + 1 )); - }else{ + _strMethod = FindField(line.data(), NULL, " "); + _strFullUrl = FindField(line.data(), " ", " "); + auto args_pos = _strFullUrl.find('?'); + if (args_pos != string::npos) { + _strUrl = _strFullUrl.substr(0, args_pos); + _mapUrlArgs = parseArgs(_strFullUrl.substr(args_pos + 1)); + } else { _strUrl = _strFullUrl; } - _strTail = FindField(line.c_str(), (_strFullUrl + " ").c_str(), NULL); + _strTail = FindField(line.data(), (_strFullUrl + " ").data(), NULL); } else { - auto field = FindField(line.c_str(), NULL, ": "); - auto value = FindField(line.c_str(), ": ", NULL); + auto field = FindField(line.data(), NULL, ": "); + auto value = FindField(line.data(), ": ", NULL); if (field.size() != 0) { _mapHeaders[field] = value; } @@ -143,23 +162,28 @@ public: } } } - const string& Method() const { + + const string &Method() const { //rtsp方法 return _strMethod; } - const string& Url() const { + + const string &Url() const { //rtsp url return _strUrl; } - const string& FullUrl() const { - //rtsp url with args - return _strFullUrl; - } - const string& Tail() const { + + const string &FullUrl() const { + //rtsp url with args + return _strFullUrl; + } + + const string &Tail() const { //RTSP/1.0 return _strTail; } - const string& operator[](const char *name) const { + + const string &operator[](const char *name) const { //rtsp field auto it = _mapHeaders.find(name); if (it == _mapHeaders.end()) { @@ -167,39 +191,43 @@ public: } return it->second; } - const string& Content() const { + + const string &Content() const { return _strContent; } + void Clear() { _strMethod.clear(); _strUrl.clear(); - _strFullUrl.clear(); + _strFullUrl.clear(); _strTail.clear(); _strContent.clear(); _mapHeaders.clear(); _mapUrlArgs.clear(); } - void setUrl(const string& url) { + void setUrl(const string &url) { this->_strUrl = url; } - void setContent(const string& content) { + + void setContent(const string &content) { this->_strContent = content; } - StrCaseMap& getValues() const { + StrCaseMap &getValues() const { return _mapHeaders; } - StrCaseMap& getUrlArgs() const { + + StrCaseMap &getUrlArgs() const { return _mapUrlArgs; } - static StrCaseMap parseArgs(const string &str,const char *pair_delim = "&", const char *key_delim = "="){ + static StrCaseMap parseArgs(const string &str, const char *pair_delim = "&", const char *key_delim = "=") { StrCaseMap ret; auto arg_vec = split(str, pair_delim); for (string &key_val : arg_vec) { - auto key = FindField(key_val.data(),NULL,key_delim); - auto val = FindField(key_val.data(),key_delim, NULL); + auto key = FindField(key_val.data(), NULL, key_delim); + auto val = FindField(key_val.data(), key_delim, NULL); ret[key] = val; } return ret; @@ -211,11 +239,11 @@ private: string _strTail; string _strContent; string _strNull; - string _strFullUrl; + string _strFullUrl; mutable StrCaseMap _mapHeaders; mutable StrCaseMap _mapUrlArgs; }; - +} //namespace mediakit #endif //RTSP_RTSP_H_ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 18f3935e..2799c2bf 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -42,9 +42,6 @@ using namespace toolkit; namespace mediakit { -const char kRtspMd5Nonce[] = "rtsp_md5_nonce"; -const char kRtspRealm[] = "rtsp_realm"; - RtspPlayer::RtspPlayer(void){ RtpReceiver::setPoolSize(64); } @@ -57,8 +54,8 @@ void RtspPlayer::teardown(){ shutdown(); } - erase(kRtspMd5Nonce); - erase(kRtspRealm); + _rtspMd5Nonce.clear(); + _rtspRealm.clear(); _aTrackInfo.clear(); _strSession.clear(); @@ -81,58 +78,55 @@ void RtspPlayer::teardown(){ _onHandshake = nullptr; } -void RtspPlayer::play(const char* strUrl){ - auto userAndPwd = FindField(strUrl,"://","@"); - eRtpType eType = (eRtpType)(int)(*this)[kRtpType]; +void RtspPlayer::play(const string &strUrl){ + auto userAndPwd = FindField(strUrl.data(),"://","@"); + Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[kRtpType]; if(userAndPwd.empty()){ - play(strUrl,nullptr,nullptr,eType); + play(strUrl,"","",eType); return; } - auto suffix = FindField(strUrl,"@",nullptr); + auto suffix = FindField(strUrl.data(),"@",nullptr); auto url = StrPrinter << "rtsp://" << suffix << endl; if(userAndPwd.find(":") == string::npos){ - play(url.data(),userAndPwd.data(),nullptr,eType); + play(url,userAndPwd,"",eType); return; } auto user = FindField(userAndPwd.data(),nullptr,":"); auto pwd = FindField(userAndPwd.data(),":",nullptr); - play(url.data(),user.data(),pwd.data(),eType); + play(url,user,pwd,eType); } //播放,指定是否走rtp over tcp -void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPwd, eRtpType eType ) { +void RtspPlayer::play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { DebugL << strUrl << " " - << (strUser ? strUser : "null") << " " - << (strPwd ? strPwd:"null") << " " + << (strUser.size() ? strUser : "null") << " " + << (strPwd.size() ? strPwd:"null") << " " << eType; teardown(); - if(strUser){ + if(strUser.size()){ (*this)[kRtspUser] = strUser; } - if(strPwd){ + if(strPwd.size()){ (*this)[kRtspPwd] = strPwd; (*this)[kRtspPwdIsMD5] = false; } _eType = eType; - auto ip = FindField(strUrl, "://", "/"); + auto ip = FindField(strUrl.data(), "://", "/"); if (!ip.size()) { - ip = FindField(strUrl, "://", NULL); + ip = FindField(strUrl.data(), "://", NULL); } - auto port = atoi(FindField(ip.c_str(), ":", NULL).c_str()); + auto port = atoi(FindField(ip.data(), ":", NULL).data()); if (port <= 0) { //rtsp 默认端口554 port = 554; } else { //服务器域名 - ip = FindField(ip.c_str(), NULL, ":"); + ip = FindField(ip.data(), NULL, ":"); } _strUrl = strUrl; - if(!(*this)[PlayerBase::kNetAdapter].empty()){ - setNetAdapter((*this)[PlayerBase::kNetAdapter]); - } weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); float playTimeOutSec = (*this)[kPlayTimeoutMS].as() / 1000.0; @@ -146,7 +140,10 @@ void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPw return false; },getPoller())); - startConnect(ip.data(), port , playTimeOutSec); + if(!(*this)[PlayerBase::kNetAdapter].empty()){ + setNetAdapter((*this)[PlayerBase::kNetAdapter]); + } + startConnect(ip, port , playTimeOutSec); } void RtspPlayer::onConnect(const SockException &err){ if(err.getErrCode()!=Err_success) { @@ -166,7 +163,7 @@ void RtspPlayer::onErr(const SockException &ex) { } // from live555 bool RtspPlayer::handleAuthenticationFailure(const string ¶msStr) { - if(!(*this)[kRtspRealm].empty()){ + if(!_rtspRealm.empty()){ //已经认证过了 return false; } @@ -181,17 +178,17 @@ bool RtspPlayer::handleAuthenticationFailure(const string ¶msStr) { }); if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) { - (*this)[kRtspRealm] = (const char *)realm; - (*this)[kRtspMd5Nonce] = (const char *)nonce; + _rtspRealm = (const char *)realm; + _rtspMd5Nonce = (const char *)nonce; return true; } if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) { - (*this)[kRtspRealm] = (const char *)realm; - (*this)[kRtspMd5Nonce] = (const char *)nonce; + _rtspRealm = (const char *)realm; + _rtspMd5Nonce = (const char *)nonce; return true; } if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) { - (*this)[kRtspRealm] = (const char *)realm; + _rtspRealm = (const char *)realm; return true; } return false; @@ -208,7 +205,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { if(newUrl.empty()){ throw std::runtime_error("未找到Location字段(跳转url)"); } - play(newUrl.data()); + play(newUrl); return; } if (parser.Url() != "200") { @@ -244,15 +241,13 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) { auto &track = _aTrackInfo[trackIndex]; auto baseUrl = _strContentBase + "/" + track->_control_surffix; switch (_eType) { - case RTP_TCP: { + case Rtsp::RTP_TCP: { return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); } - break; - case RTP_MULTICAST: { + case Rtsp::RTP_MULTICAST: { return sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"}); } - break; - case RTP_UDP: { + case Rtsp::RTP_UDP: { _apUdpSock[trackIndex].reset(new Socket()); if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { _apUdpSock[trackIndex].reset(); @@ -261,10 +256,8 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) { int port = _apUdpSock[trackIndex]->get_local_port(); return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); } - break; default: return false; - break; } } @@ -281,29 +274,29 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) auto strTransport = parser["Transport"]; if(strTransport.find("TCP") != string::npos){ - _eType = RTP_TCP; + _eType = Rtsp::RTP_TCP; }else if(strTransport.find("multicast") != string::npos){ - _eType = RTP_MULTICAST; + _eType = Rtsp::RTP_MULTICAST; }else{ - _eType = RTP_UDP; + _eType = Rtsp::RTP_UDP; } - RtspSplitter::enableRecvRtp(_eType == RTP_TCP); + RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP); - if(_eType == RTP_TCP) { - string interleaved = FindField( FindField((strTransport + ";").c_str(), "interleaved=", ";").c_str(), NULL, "-"); - _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.c_str()); + if(_eType == Rtsp::RTP_TCP) { + string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); + _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data()); }else{ - const char *strPos = (_eType == RTP_MULTICAST ? "port=" : "server_port=") ; - auto port_str = FindField((strTransport + ";").c_str(), strPos, ";"); - uint16_t port = atoi(FindField(port_str.c_str(), NULL, "-").c_str()); + const char *strPos = (_eType == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ; + auto port_str = FindField((strTransport + ";").data(), strPos, ";"); + uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; if(!pUdpSockRef){ pUdpSockRef.reset(new Socket()); } - if (_eType == RTP_MULTICAST) { - auto multiAddr = FindField((strTransport + ";").c_str(), "destination=", ";"); + if (_eType == Rtsp::RTP_MULTICAST) { + auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";"); if (!pUdpSockRef->bindUdpSock(port, "0.0.0.0")) { pUdpSockRef.reset(); throw std::runtime_error("open udp sock err"); @@ -316,7 +309,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) struct sockaddr_in rtpto; rtpto.sin_port = ntohs(port); rtpto.sin_family = AF_INET; - rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().c_str()); + rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); pUdpSockRef->send("\xce\xfa\xed\xfe", 4); } @@ -328,7 +321,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) return; } - for (unsigned int i = 0; i < _aTrackInfo.size() && _eType != RTP_TCP; i++) { + for (unsigned int i = 0; i < _aTrackInfo.size() && _eType != Rtsp::RTP_TCP; i++) { auto &pUdpSockRef = _apUdpSock[i]; if(!pUdpSockRef){ continue; @@ -348,21 +341,23 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) }); } /////////////////////////心跳///////////////////////////////// - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf](){ - auto strongSelf = weakSelf.lock(); - if (!strongSelf){ - return false; - } - return strongSelf->sendOptions(); - },getPoller())); + if(_eType != Rtsp::RTP_TCP){ + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf](){ + auto strongSelf = weakSelf.lock(); + if (!strongSelf){ + return false; + } + return strongSelf->sendOptions(); + },getPoller())); + } + pause(false); } bool RtspPlayer::sendOptions() { _onHandshake = [](const Parser& parser){ // DebugL << "options response"; - return true; }; return sendRtspRequest("OPTIONS",_strContentBase); } @@ -530,12 +525,14 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) { auto header = header_const; header.emplace("CSeq",StrPrinter << _uiCseq++); + header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")"); + if(!_strSession.empty()){ header.emplace("Session",_strSession); } - if(!(*this)[kRtspRealm].empty() && !(*this)[PlayerBase::kRtspUser].empty()){ - if(!(*this)[kRtspMd5Nonce].empty()){ + if(!_rtspRealm.empty() && !(*this)[PlayerBase::kRtspUser].empty()){ + if(!_rtspMd5Nonce.empty()){ //MD5认证 /* response计算方法如下: @@ -547,14 +544,14 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC */ string encrypted_pwd = (*this)[PlayerBase::kRtspPwd]; if(!(*this)[PlayerBase::kRtspPwdIsMD5].as()){ - encrypted_pwd = MD5((*this)[PlayerBase::kRtspUser]+ ":" + (*this)[kRtspRealm] + ":" + encrypted_pwd).hexdigest(); + encrypted_pwd = MD5((*this)[PlayerBase::kRtspUser]+ ":" + _rtspRealm + ":" + encrypted_pwd).hexdigest(); } - auto response = MD5( encrypted_pwd + ":" + (*this)[kRtspMd5Nonce] + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest(); + auto response = MD5( encrypted_pwd + ":" + _rtspMd5Nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest(); _StrPrinter printer; printer << "Digest "; printer << "username=\"" << (*this)[PlayerBase::kRtspUser] << "\", "; - printer << "realm=\"" << (*this)[kRtspRealm] << "\", "; - printer << "nonce=\"" << (*this)[kRtspMd5Nonce] << "\", "; + printer << "realm=\"" << _rtspRealm << "\", "; + printer << "nonce=\"" << _rtspMd5Nonce << "\", "; printer << "uri=\"" << url << "\", "; printer << "response=\"" << response << "\""; header.emplace("Authorization",printer); diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 8a2ccd42..50f9aaa7 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -54,7 +54,7 @@ public: RtspPlayer(); virtual ~RtspPlayer(void); - void play(const char* strUrl) override; + void play(const string &strUrl) override; void pause(bool bPause) override; void teardown() override; float getPacketLossRate(TrackType type) const override; @@ -93,7 +93,7 @@ private: int getTrackIndexByInterleaved(int interleaved) const; int getTrackIndexByTrackType(TrackType trackId) const; - void play(const char* strUrl, const char *strUser, const char *strPwd, eRtpType eType); + void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); void onConnect(const SockException &err) override; void onRecv(const Buffer::Ptr &pBuf) override; void onErr(const SockException &ex) override; @@ -115,14 +115,16 @@ private: string _strUrl; SdpAttr _sdpAttr; vector _aTrackInfo; - function _onHandshake; Socket::Ptr _apUdpSock[2]; + //rtsp鉴权相关 + string _rtspMd5Nonce; + string _rtspRealm; //rtsp info string _strSession; unsigned int _uiCseq = 1; string _strContentBase; - eRtpType _eType = RTP_TCP; + Rtsp::eRtpType _eType = Rtsp::RTP_TCP; /* 丢包率统计需要用到的参数 */ uint16_t _aui16FirstSeq[2] = { 0 , 0}; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp new file mode 100644 index 00000000..009470e1 --- /dev/null +++ b/src/Rtsp/RtspPusher.cpp @@ -0,0 +1,450 @@ +// +// Created by xzl on 2019/3/27. +// + +#include "Util/MD5.h" +#include "Util/base64.h" +#include "RtspPusher.h" +#include "RtspSession.h" + +namespace mediakit { + +static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; + +RtspPusher::RtspPusher(const RtspMediaSource::Ptr &src) { + _pMediaSrc = src; +} + +RtspPusher::~RtspPusher() { + teardown(); + DebugL << endl; +} + +void RtspPusher::teardown() { + if (alive()) { + sendRtspRequest("TEARDOWN" ,_strContentBase); + shutdown(); + } + + reset(); + CLEAR_ARR(_apUdpSock); + _rtspMd5Nonce.clear(); + _rtspRealm.clear(); + _aTrackInfo.clear(); + _strSession.clear(); + _strContentBase.clear(); + _strSession.clear(); + _uiCseq = 1; + _pPublishTimer.reset(); + _pBeatTimer.reset(); + _pRtspReader.reset(); + _aTrackInfo.clear(); + _onHandshake = nullptr; +} + +void RtspPusher::publish(const string &strUrl) { + auto userAndPwd = FindField(strUrl.data(),"://","@"); + Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[ PlayerBase::kRtpType]; + if(userAndPwd.empty()){ + publish(strUrl,"","",eType); + return; + } + auto suffix = FindField(strUrl.data(),"@",nullptr); + auto url = StrPrinter << "rtsp://" << suffix << endl; + if(userAndPwd.find(":") == string::npos){ + publish(url,userAndPwd,"",eType); + return; + } + auto user = FindField(userAndPwd.data(),nullptr,":"); + auto pwd = FindField(userAndPwd.data(),":",nullptr); + publish(url,user,pwd,eType); +} + +void RtspPusher::publish(const string & strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { + DebugL << strUrl << " " + << (strUser.size() ? strUser : "null") << " " + << (strPwd.size() ? strPwd:"null") << " " + << eType; + teardown(); + + if(strUser.size()){ + (*this)[PlayerBase::kRtspUser] = strUser; + } + if(strPwd.size()){ + (*this)[PlayerBase::kRtspPwd] = strPwd; + (*this)[PlayerBase::kRtspPwdIsMD5] = false; + } + + _eType = eType; + + auto ip = FindField(strUrl.data(), "://", "/"); + if (!ip.size()) { + ip = FindField(strUrl.data(), "://", NULL); + } + auto port = atoi(FindField(ip.data(), ":", NULL).data()); + if (port <= 0) { + //rtsp 默认端口554 + port = 554; + } else { + //服务器域名 + ip = FindField(ip.data(), NULL, ":"); + } + + _strUrl = strUrl; + + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + float playTimeOutSec = (*this)[kPlayTimeoutMS].as() / 1000.0; + _pPublishTimer.reset( new Timer(playTimeOutSec, [weakSelf]() { + auto strongSelf=weakSelf.lock(); + if(!strongSelf) { + return false; + } + strongSelf->onPublishResult(SockException(Err_timeout,"publish rtsp timeout")); + strongSelf->teardown(); + return false; + },getPoller())); + + if(!(*this)[kNetAdapter].empty()){ + setNetAdapter((*this)[kNetAdapter]); + } + startConnect(ip, port , playTimeOutSec); +} + +void RtspPusher::onErr(const SockException &ex) { + onShutdown(ex); +} + +void RtspPusher::onConnect(const SockException &err) { + if(err) { + onPublishResult(err); + return; + } + sendAnnounce(); +} + +void RtspPusher::onRecv(const Buffer::Ptr &pBuf){ + try { + input(pBuf->data(), pBuf->size()); + } catch (exception &e) { + SockException ex(Err_other, e.what()); + onPublishResult(ex); + onShutdown(ex); + teardown(); + } +} + +void RtspPusher::onWholeRtspPacket(Parser &parser) { + decltype(_onHandshake) fun; + _onHandshake.swap(fun); + if(fun){ + fun(parser); + } + parser.Clear(); +} + + +bool RtspPusher::sendAnnounce() { + auto src = _pMediaSrc.lock(); + if (!src) { + throw std::runtime_error("the media source was released"); + } + //解析sdp + _sdpAttr.load(src->getSdp()); + _aTrackInfo = _sdpAttr.getAvailableTrack(); + + if (_aTrackInfo.empty()) { + throw std::runtime_error("无有效的Sdp Track"); + } + + _onHandshake = std::bind(&RtspPusher::handleResAnnounce,this, placeholders::_1); + return sendRtspRequest("ANNOUNCE",_strUrl,{},src->getSdp()); +} + +void RtspPusher::handleResAnnounce(const Parser &parser) { + string authInfo = parser["WWW-Authenticate"]; + //发送DESCRIBE命令后的回复 + if ((parser.Url() == "401") && handleAuthenticationFailure(authInfo)) { + sendAnnounce(); + return; + } + if(parser.Url() == "302"){ + auto newUrl = parser["Location"]; + if(newUrl.empty()){ + throw std::runtime_error("未找到Location字段(跳转url)"); + } + publish(newUrl); + return; + } + if (parser.Url() != "200") { + throw std::runtime_error(StrPrinter << "ANNOUNCE:" << parser.Url() << " " << parser.Tail()); + } + _strContentBase = parser["Content-Base"]; + + if(_strContentBase.empty()){ + _strContentBase = _strUrl; + } + if (_strContentBase.back() == '/') { + _strContentBase.pop_back(); + } + + sendSetup(0); +} + +bool RtspPusher::handleAuthenticationFailure(const string ¶msStr) { + if(!_rtspRealm.empty()){ + //已经认证过了 + return false; + } + + char *realm = new char[paramsStr.size()]; + char *nonce = new char[paramsStr.size()]; + char *stale = new char[paramsStr.size()]; + onceToken token(nullptr,[&](){ + delete[] realm; + delete[] nonce; + delete[] stale; + }); + + if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) { + _rtspRealm = (const char *)realm; + _rtspMd5Nonce = (const char *)nonce; + return true; + } + if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) { + _rtspRealm = (const char *)realm; + _rtspMd5Nonce = (const char *)nonce; + return true; + } + if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) { + _rtspRealm = (const char *)realm; + return true; + } + return false; +} + +bool RtspPusher::sendSetup(unsigned int trackIndex) { + _onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex); + + auto &track = _aTrackInfo[trackIndex]; + auto baseUrl = _strContentBase + "/" + track->_control_surffix; + switch (_eType) { + case Rtsp::RTP_TCP: { + return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); + } + case Rtsp::RTP_UDP: { + _apUdpSock[trackIndex].reset(new Socket()); + if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { + _apUdpSock[trackIndex].reset(); + throw std::runtime_error("open udp sock err"); + } + int port = _apUdpSock[trackIndex]->get_local_port(); + return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); + } + default: + return false; + } +} + +void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) { + if (parser.Url() != "200") { + throw std::runtime_error( + StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl); + } + if (uiTrackIndex == 0) { + _strSession = parser["Session"]; + _strSession.append(";"); + _strSession = FindField(_strSession.data(), nullptr, ";"); + } + + auto strTransport = parser["Transport"]; + if(strTransport.find("TCP") != string::npos){ + _eType = Rtsp::RTP_TCP; + string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); + _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data()); + }else if(strTransport.find("multicast") != string::npos){ + throw std::runtime_error("SETUP rtsp pusher can not support multicast!"); + }else{ + _eType = Rtsp::RTP_UDP; + const char *strPos = "server_port=" ; + auto port_str = FindField((strTransport + ";").data(), strPos, ";"); + uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); + auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; + if(!pUdpSockRef){ + pUdpSockRef.reset(new Socket()); + } + + struct sockaddr_in rtpto; + rtpto.sin_port = ntohs(port); + rtpto.sin_family = AF_INET; + rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); + pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); + } + + RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP); + + if (uiTrackIndex < _aTrackInfo.size() - 1) { + //需要继续发送SETUP命令 + sendSetup(uiTrackIndex + 1); + return; + } + + sendRecord(); +} + +bool RtspPusher::sendOptions() { + _onHandshake = [this](const Parser& parser){}; + return sendRtspRequest("OPTIONS",_strContentBase); +} + +inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) { + //InfoL<<(int)pkt.Interleaved; + switch (_eType) { + case Rtsp::RTP_TCP: { + BufferRtp::Ptr buffer(new BufferRtp(pkt)); + send(buffer); + } + break; + case Rtsp::RTP_UDP: { + int iTrackIndex = getTrackIndexByTrackType(pkt->type); + auto &pSock = _apUdpSock[iTrackIndex]; + if (!pSock) { + shutdown(); + return; + } + BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); + pSock->send(buffer); + } + break; + default: + break; + } +} + +inline int RtspPusher::getTrackIndexByTrackType(TrackType type) { + for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { + if (type == _aTrackInfo[i]->_type) { + return i; + } + } + return -1; +} + + +bool RtspPusher::sendRecord() { + _onHandshake = [this](const Parser& parser){ + auto src = _pMediaSrc.lock(); + if (!src) { + throw std::runtime_error("the media source was released"); + } + + _pRtspReader = src->getRing()->attach(getPoller()); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _pRtspReader->setReadCB([weakSelf](const RtpPacket::Ptr &pkt){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->sendRtpPacket(pkt); + }); + _pRtspReader->setDetachCB([weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + strongSelf->onShutdown(SockException(Err_other,"媒体源被释放")); + strongSelf->teardown(); + } + }); + if(_eType != Rtsp::RTP_TCP){ + /////////////////////////心跳///////////////////////////////// + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf](){ + auto strongSelf = weakSelf.lock(); + if (!strongSelf){ + return false; + } + return strongSelf->sendOptions(); + },getPoller())); + } + onPublishResult(SockException(Err_success,"success")); + //提高发送性能 + (*this) << SocketFlags(kSockFlags); + SockUtil::setNoDelay(_sock->rawFD(),false); + }; + return sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"}); +} + +bool RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list &header,const string &sdp ) { + string key; + StrCaseMap header_map; + int i = 0; + for(auto &val : header){ + if(++i % 2 == 0){ + header_map.emplace(key,val); + }else{ + key = val; + } + } + return sendRtspRequest(cmd,url,header_map,sdp); +} +bool RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const,const string &sdp ) { + auto header = header_const; + header.emplace("CSeq",StrPrinter << _uiCseq++); + header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")"); + + if(!_strSession.empty()){ + header.emplace("Session",_strSession); + } + + if(!_rtspRealm.empty() && !(*this)[kRtspUser].empty()){ + if(!_rtspMd5Nonce.empty()){ + //MD5认证 + /* + response计算方法如下: + RTSP客户端应该使用username + password并计算response如下: + (1)当password为MD5编码,则 + response = md5( password:nonce:md5(public_method:url) ); + (2)当password为ANSI字符串,则 + response= md5( md5(username:realm:password):nonce:md5(public_method:url) ); + */ + string encrypted_pwd = (*this)[kRtspPwd]; + if(!(*this)[kRtspPwdIsMD5].as()){ + encrypted_pwd = MD5((*this)[kRtspUser]+ ":" + _rtspRealm + ":" + encrypted_pwd).hexdigest(); + } + auto response = MD5( encrypted_pwd + ":" + _rtspMd5Nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest(); + _StrPrinter printer; + printer << "Digest "; + printer << "username=\"" << (*this)[kRtspUser] << "\", "; + printer << "realm=\"" << _rtspRealm << "\", "; + printer << "nonce=\"" << _rtspMd5Nonce << "\", "; + printer << "uri=\"" << url << "\", "; + printer << "response=\"" << response << "\""; + header.emplace("Authorization",printer); + }else if(!(*this)[kRtspPwdIsMD5].as()){ + //base64认证 + string authStr = StrPrinter << (*this)[kRtspUser] << ":" << (*this)[kRtspPwd]; + char authStrBase64[1024] = {0}; + av_base64_encode(authStrBase64,sizeof(authStrBase64),(uint8_t *)authStr.data(),authStr.size()); + header.emplace("Authorization",StrPrinter << "Basic " << authStrBase64 ); + } + } + + if(!sdp.empty()){ + header.emplace("Content-Length",StrPrinter << sdp.size()); + header.emplace("Content-Type","application/sdp"); + } + + _StrPrinter printer; + printer << cmd << " " << url << " RTSP/1.0\r\n"; + for (auto &pr : header){ + printer << pr.first << ": " << pr.second << "\r\n"; + } + + printer << "\r\n"; + + if(!sdp.empty()){ + printer << sdp; + } + return send(printer) > 0; +} + + +} /* namespace mediakit */ \ No newline at end of file diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h new file mode 100644 index 00000000..f8571550 --- /dev/null +++ b/src/Rtsp/RtspPusher.h @@ -0,0 +1,112 @@ +// +// Created by xzl on 2019/3/27. +// + +#ifndef ZLMEDIAKIT_RTSPPUSHER_H +#define ZLMEDIAKIT_RTSPPUSHER_H + +#include +#include +#include "Rtsp.h" +#include "RtspMediaSource.h" +#include "Util/util.h" +#include "Util/logger.h" +#include "Poller/Timer.h" +#include "Network/Socket.h" +#include "Network/TcpClient.h" +#include "RtspSplitter.h" +#include "Pusher/PusherBase.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit { + +class RtspPusher : public TcpClient, public RtspSplitter, public PusherBase { +public: + typedef std::shared_ptr Ptr; + RtspPusher(const RtspMediaSource::Ptr &src); + virtual ~RtspPusher(); + + void publish(const string &strUrl) override; + + void teardown() override; + + void setOnPublished(const Event &cb) override { + _onPublished = cb; + } + + void setOnShutdown(const Event & cb) override{ + _onShutdown = cb; + } +protected: + //for Tcpclient override + void onRecv(const Buffer::Ptr &pBuf) override; + void onConnect(const SockException &err) override; + void onErr(const SockException &ex) override; + + //RtspSplitter override + void onWholeRtspPacket(Parser &parser) override ; + void onRtpPacket(const char *data,uint64_t len) override {}; +private: + void publish(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ); + + void onShutdown(const SockException &ex) { + _pPublishTimer.reset(); + if(_onShutdown){ + _onShutdown(ex); + } + _pRtspReader.reset(); + } + void onPublishResult(const SockException &ex) { + _pPublishTimer.reset(); + if(_onPublished){ + _onPublished(ex); + } + } + + bool sendAnnounce(); + bool sendSetup(unsigned int uiTrackIndex); + bool sendRecord(); + bool sendOptions(); + + void handleResAnnounce(const Parser &parser); + void handleResSetup(const Parser &parser, unsigned int uiTrackIndex); + bool handleAuthenticationFailure(const string ¶msStr); + + inline int getTrackIndexByTrackType(TrackType type); + + void sendRtpPacket(const RtpPacket::Ptr & pkt) ; + bool sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); + bool sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); +private: + //rtsp鉴权相关 + string _rtspMd5Nonce; + string _rtspRealm; + + //超时功能实现 + std::shared_ptr _pPublishTimer; + //源 + std::weak_ptr _pMediaSrc; + RtspMediaSource::RingType::RingReader::Ptr _pRtspReader; + //事件监听 + Event _onShutdown; + Event _onPublished; + + string _strUrl; + SdpAttr _sdpAttr; + vector _aTrackInfo; + string _strSession; + unsigned int _uiCseq = 1; + string _strContentBase; + Rtsp::eRtpType _eType = Rtsp::RTP_TCP; + Socket::Ptr _apUdpSock[2]; + function _onHandshake; + //心跳定时器 + std::shared_ptr _pBeatTimer; + + +}; + +} /* namespace mediakit */ +#endif //ZLMEDIAKIT_RTSPPUSHER_H diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 55e0244c..b3bb742b 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1,4 +1,4 @@ -/* +/* * MIT License * * Copyright (c) 2016 xiongziliang <771730766@qq.com> @@ -85,7 +85,7 @@ RtspSession::~RtspSession() { void RtspSession::onError(const SockException& err) { TraceL << err.getErrCode() << " " << err.what(); - if (_rtpType == PlayerBase::RTP_MULTICAST) { + if (_rtpType == Rtsp::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); } @@ -118,7 +118,7 @@ void RtspSession::onManager() { } - if ((_rtpType == PlayerBase::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { + if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 WarnL << "RTSP会话超时:" << get_peer_ip(); shutdown(); @@ -550,22 +550,22 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { } trackRef->_inited = true; //现在初始化 - if(_rtpType == PlayerBase::RTP_Invalid){ + if(_rtpType == Rtsp::RTP_Invalid){ auto strTransport = parser["Transport"]; if(strTransport.find("TCP") != string::npos){ - _rtpType = PlayerBase::RTP_TCP; + _rtpType = Rtsp::RTP_TCP; }else if(strTransport.find("multicast") != string::npos){ - _rtpType = PlayerBase::RTP_MULTICAST; + _rtpType = Rtsp::RTP_MULTICAST; }else{ - _rtpType = PlayerBase::RTP_UDP; + _rtpType = Rtsp::RTP_UDP; } } //允许接收rtp、rtcp包 - RtspSplitter::enableRecvRtp(_rtpType == PlayerBase::RTP_TCP); + RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP); switch (_rtpType) { - case PlayerBase::RTP_TCP: { + case Rtsp::RTP_TCP: { trackRef->_interleaved = trackRef->_type * 2; sendRtspResponse("200 OK", {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" @@ -576,7 +576,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { }); } break; - case PlayerBase::RTP_UDP: { + case Rtsp::RTP_UDP: { //我们用trackIdx区分rtp和rtcp包 auto pSockRtp = std::make_shared(_sock->getPoller()); if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) { @@ -615,7 +615,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { }); } break; - case PlayerBase::RTP_MULTICAST: { + case Rtsp::RTP_MULTICAST: { if(!_pBrdcaster){ _pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid); if (!_pBrdcaster) { @@ -729,7 +729,7 @@ bool RtspSession::handleReq_Play(const Parser &parser) { SockUtil::setNoDelay(_sock->rawFD(),false); (*this) << SocketFlags(kSockFlags); - if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) { + if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); _pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf); _pRtpReader->setDetachCB([weakSelf]() { @@ -959,7 +959,7 @@ inline bool RtspSession::findStream() { inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { //InfoL<<(int)pkt.Interleaved; switch (_rtpType) { - case PlayerBase::RTP_TCP: { + case Rtsp::RTP_TCP: { BufferRtp::Ptr buffer(new BufferRtp(pkt)); send(buffer); #ifdef RTSP_SEND_RTCP @@ -977,7 +977,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { #endif } break; - case PlayerBase::RTP_UDP: { + case Rtsp::RTP_UDP: { int iTrackIndex = getTrackIndexByTrackType(pkt->type); auto &pSock = _apRtpSock[iTrackIndex]; if (!pSock) { @@ -1051,7 +1051,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { }; switch (_rtpType){ - case PlayerBase::RTP_MULTICAST:{ + case Rtsp::RTP_MULTICAST:{ //组播使用的共享rtcp端口 UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { @@ -1059,7 +1059,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { }); } break; - case PlayerBase::RTP_UDP:{ + case Rtsp::RTP_UDP:{ auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){ if(!sock){ WarnL << "udp端口为空:" << iTrackIdx; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 3aa2009f..5b2a7fa6 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -159,7 +159,7 @@ private: MediaInfo _mediaInfo; std::weak_ptr _pMediaSrc; RingBuffer::RingReader::Ptr _pRtpReader; - PlayerBase::eRtpType _rtpType = PlayerBase::RTP_Invalid; + Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid; vector _aTrackInfo; //RTP over udp diff --git a/tests/test_rtmpPusher.cpp b/tests/test_pusher.cpp similarity index 74% rename from tests/test_rtmpPusher.cpp rename to tests/test_pusher.cpp index dafd13cd..836cf382 100644 --- a/tests/test_rtmpPusher.cpp +++ b/tests/test_pusher.cpp @@ -32,47 +32,48 @@ #include "Player/PlayerProxy.h" #include "Rtmp/RtmpPusher.h" #include "Common/config.h" +#include "Pusher/MediaPusher.h" using namespace std; using namespace toolkit; using namespace mediakit; //推流器,保持强引用 -RtmpPusher::Ptr pusher; +MediaPusher::Ptr pusher; //声明函数 -void rePushDelay(const string &app, const string &stream, const string &url); +void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url); //创建推流器并开始推流 -void createPusher(const string &app, const string &stream, const string &url) { - //创建推流器并绑定一个RtmpMediaSource - pusher.reset(new RtmpPusher(DEFAULT_VHOST, app.data(), stream.data())); +void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { + //创建推流器并绑定一个MediaSource + pusher.reset(new MediaPusher(schema,vhost, app, stream)); //设置推流中断处理逻辑 - pusher->setOnShutdown([app, stream, url](const SockException &ex) { + pusher->setOnShutdown([schema,vhost, app, stream, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重试 - rePushDelay(app, stream, url); + rePushDelay(schema,vhost,app, stream, url); }); //设置发布结果处理逻辑 - pusher->setOnPublished([app, stream, url](const SockException &ex) { + pusher->setOnPublished([schema,vhost, app, stream, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 - rePushDelay(app, stream, url); + rePushDelay(schema,vhost,app, stream, url); } else { InfoL << "Publish success,Please play with player:" << url; } }); - pusher->publish(url.data()); + pusher->publish(url); } Timer::Ptr g_timer; //推流失败或断开延迟2秒后重试推流 -void rePushDelay(const string &app, const string &stream, const string &url) { - g_timer = std::make_shared(2,[app, stream, url]() { +void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { + g_timer = std::make_shared(2,[schema,vhost,app, stream, url]() { InfoL << "Re-Publishing..."; //重新推流 - createPusher(app, stream, url); + createPusher(schema,vhost,app, stream, url); //此任务不重复 return false; }, nullptr); @@ -80,9 +81,6 @@ void rePushDelay(const string &app, const string &stream, const string &url) { //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 int domain(const string &playUrl, const string &pushUrl) { - //设置退出信号处理函数 - static semaphore sem; - signal(SIGINT, [](int) { sem.post(); });// 设置退出信号 //设置日志 Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); @@ -96,17 +94,21 @@ int domain(const string &playUrl, const string &pushUrl) { NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, [pushUrl](BroadcastMediaChangedArgs) { //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 - if(bRegist && schema == RTMP_SCHEMA){ - createPusher(app, stream, pushUrl); + if(bRegist && pushUrl.find(schema) == 0){ + createPusher(schema,vhost,app, stream, pushUrl); } }); + + //设置退出信号处理函数 + static semaphore sem; + signal(SIGINT, [](int) { sem.post(); });// 设置退出信号 sem.wait(); return 0; } int main(int argc, char *argv[]) { - return domain("rtmp://live.hkstv.hk.lxdns.com/live/hks", "rtmp://127.0.0.1/live/stream"); + return domain("rtmp://live.hkstv.hk.lxdns.com/live/hks1", "rtsp://127.0.0.1/live/rtsp_push"); } diff --git a/tests/test_rtmpPusherMp4.cpp b/tests/test_pusherMp4.cpp similarity index 73% rename from tests/test_rtmpPusherMp4.cpp rename to tests/test_pusherMp4.cpp index 78d36fe9..89ae36de 100644 --- a/tests/test_rtmpPusherMp4.cpp +++ b/tests/test_pusherMp4.cpp @@ -32,6 +32,7 @@ #include "Player/PlayerProxy.h" #include "Rtmp/RtmpPusher.h" #include "Common/config.h" +#include "Pusher/MediaPusher.h" #include "MediaFile/MediaReader.h" using namespace std; @@ -39,49 +40,49 @@ using namespace toolkit; using namespace mediakit; //推流器,保持强引用 -RtmpPusher::Ptr pusher; +MediaPusher::Ptr pusher; //声明函数 -void rePushDelay(const string &app,const string &stream,const string &url); -void createPusher(const string &app,const string &stream,const string &url); +void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url); + //创建推流器并开始推流 -void createPusher(const string &app,const string &stream,const string &url){ - auto rtmpSrc = dynamic_pointer_cast(MediaReader::onMakeMediaSource(RTMP_SCHEMA,DEFAULT_VHOST,app,stream)); - if(!rtmpSrc){ +void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { + auto src = MediaReader::onMakeMediaSource(schema,vhost,app,stream); + if(!src){ //文件不存在 WarnL << "MP4 file not exited!"; return; } - //创建推流器并绑定一个RtmpMediaSource - pusher.reset(new RtmpPusher(rtmpSrc)); + //创建推流器并绑定一个MediaSource + pusher.reset(new MediaPusher(src)); //设置推流中断处理逻辑 - pusher->setOnShutdown([app,stream, url](const SockException &ex) { + pusher->setOnShutdown([schema,vhost,app,stream, url](const SockException &ex) { WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); //重新推流 - rePushDelay(app, stream, url); + rePushDelay(schema,vhost,app, stream, url); }); //设置发布结果处理逻辑 - pusher->setOnPublished([app,stream, url](const SockException &ex) { + pusher->setOnPublished([schema,vhost,app,stream, url](const SockException &ex) { if (ex) { WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); //如果发布失败,就重试 - rePushDelay(app,stream, url); + rePushDelay(schema,vhost,app, stream, url); }else { InfoL << "Publish success,Please play with player:" << url; } }); - pusher->publish(url.data()); + pusher->publish(url); } Timer::Ptr g_timer; //推流失败或断开延迟2秒后重试推流 -void rePushDelay(const string &app, const string &stream, const string &url) { - g_timer = std::make_shared(2,[app, stream, url]() { +void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) { + g_timer = std::make_shared(2,[schema,vhost,app, stream, url]() { InfoL << "Re-Publishing..."; //重新推流 - createPusher(app, stream, url); + createPusher(schema,vhost,app, stream, url); //此任务不重复 return false; }, nullptr); @@ -101,7 +102,7 @@ int domain(const string & filePath,const string & pushUrl){ string appName = mINI::Instance()[Record::kAppName]; //app必须record,filePath(流id)为相对于httpRoot/record的路径,否则MediaReader会找到不该文件 //限制app为record是为了防止服务器上的文件被肆意访问 - createPusher(appName,filePath,pushUrl); + createPusher(FindField(pushUrl.data(), nullptr,"://"),DEFAULT_VHOST,appName,filePath,pushUrl); sem.wait(); return 0; @@ -112,7 +113,7 @@ int domain(const string & filePath,const string & pushUrl){ int main(int argc,char *argv[]){ //MP4文件需要放置在 httpRoot/record目录下,文件负载必须为h264+aac //可以使用test_server生成的mp4文件 - return domain("app/stream/2017-09-30/12-55-38.mp4","rtmp://jizan.iok.la/live/test"); + return domain("app/stream/2017-09-30/12-55-38.mp4","rtsp://127.0.0.1/live/rtsp_push"); }