添加rtsp推流器

整理代码
This commit is contained in:
xiongziliang 2019-03-27 18:41:52 +08:00
parent e3ab51b337
commit b1a2de3853
35 changed files with 1210 additions and 273 deletions

View File

@ -35,9 +35,9 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
DevChannel::DevChannel(const char *strVhost, DevChannel::DevChannel(const string &strVhost,
const char *strApp, const string &strApp,
const char *strId, const string &strId,
float fDuration, float fDuration,
bool bEanbleHls, bool bEanbleHls,
bool bEnableMp4) : bool bEnableMp4) :

View File

@ -70,9 +70,9 @@ class DevChannel : public MultiMediaSourceMuxer{
public: public:
typedef std::shared_ptr<DevChannel> Ptr; typedef std::shared_ptr<DevChannel> Ptr;
//fDuration<=0为直播否则为点播 //fDuration<=0为直播否则为点播
DevChannel(const char *strVhost, DevChannel(const string &strVhost,
const char *strApp, const string &strApp,
const char *strId, const string &strId,
float fDuration = 0, float fDuration = 0,
bool bEanbleHls = true, bool bEanbleHls = true,
bool bEnableMp4 = false); bool bEnableMp4 = false);

View File

@ -82,9 +82,9 @@ Sdp::Ptr Factory::getSdpByTrack(const Track::Ptr &track) {
Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
if (strcasecmp(track->_codec.data(), "mpeg4-generic") == 0) { if (strcasecmp(track->_codec.data(), "mpeg4-generic") == 0) {
string aac_cfg_str = FindField(track->_fmtp.c_str(), "config=", nullptr); string aac_cfg_str = FindField(track->_fmtp.data(), "config=", nullptr);
if (aac_cfg_str.size() != 4) { if (aac_cfg_str.size() != 4) {
aac_cfg_str = FindField(track->_fmtp.c_str(), "config=", ";"); aac_cfg_str = FindField(track->_fmtp.data(), "config=", ";");
} }
if (aac_cfg_str.size() != 4) { if (aac_cfg_str.size() != 4) {
//延后获取adts头 //延后获取adts头
@ -93,12 +93,12 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
string aac_cfg; string aac_cfg;
unsigned int cfg1; unsigned int cfg1;
sscanf(aac_cfg_str.substr(0, 2).c_str(), "%02X", &cfg1); sscanf(aac_cfg_str.substr(0, 2).data(), "%02X", &cfg1);
cfg1 &= 0x00FF; cfg1 &= 0x00FF;
aac_cfg.push_back(cfg1); aac_cfg.push_back(cfg1);
unsigned int cfg2; unsigned int cfg2;
sscanf(aac_cfg_str.substr(2, 2).c_str(), "%02X", &cfg2); sscanf(aac_cfg_str.substr(2, 2).data(), "%02X", &cfg2);
cfg2 &= 0x00FF; cfg2 &= 0x00FF;
aac_cfg.push_back(cfg2); aac_cfg.push_back(cfg2);
@ -106,12 +106,12 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
} }
if (strcasecmp(track->_codec.data(), "h264") == 0) { if (strcasecmp(track->_codec.data(), "h264") == 0) {
string sps_pps = FindField(track->_fmtp.c_str(), "sprop-parameter-sets=", nullptr); string sps_pps = FindField(track->_fmtp.data(), "sprop-parameter-sets=", nullptr);
if(sps_pps.empty()){ if(sps_pps.empty()){
return std::make_shared<H264Track>(); return std::make_shared<H264Track>();
} }
string base64_SPS = FindField(sps_pps.c_str(), NULL, ","); string base64_SPS = FindField(sps_pps.data(), NULL, ",");
string base64_PPS = FindField(sps_pps.c_str(), ",", NULL); string base64_PPS = FindField(sps_pps.data(), ",", NULL);
if(base64_PPS.back() == ';'){ if(base64_PPS.back() == ';'){
base64_PPS.pop_back(); base64_PPS.pop_back();
} }
@ -125,13 +125,13 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) {
//a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA= //a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwBdoAKAgC0WNrkky/AIAAADAAgAAAMBlQg=; sprop-pps=RAHA8vA8kAA=
int pt; int pt;
char sprop_vps[128] = {0},sprop_sps[128] = {0},sprop_pps[128] = {0}; char sprop_vps[128] = {0},sprop_sps[128] = {0},sprop_pps[128] = {0};
if (4 == sscanf(track->_fmtp.c_str(), "%d sprop-vps=%127[^;]; sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt, sprop_vps,sprop_sps, sprop_pps)) { if (4 == sscanf(track->_fmtp.data(), "%d sprop-vps=%127[^;]; sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt, sprop_vps,sprop_sps, sprop_pps)) {
auto vps = decodeBase64(sprop_vps); auto vps = decodeBase64(sprop_vps);
auto sps = decodeBase64(sprop_sps); auto sps = decodeBase64(sprop_sps);
auto pps = decodeBase64(sprop_pps); auto pps = decodeBase64(sprop_pps);
return std::make_shared<H265Track>(vps,sps,pps,0,0,0); return std::make_shared<H265Track>(vps,sps,pps,0,0,0);
} }
if (3 == sscanf(track->_fmtp.c_str(), "%d sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt,sprop_sps, sprop_pps)) { if (3 == sscanf(track->_fmtp.data(), "%d sprop-sps=%127[^;]; sprop-pps=%127[^;]", &pt,sprop_sps, sprop_pps)) {
auto sps = decodeBase64(sprop_sps); auto sps = decodeBase64(sprop_sps);
auto pps = decodeBase64(sprop_pps); auto pps = decodeBase64(sprop_pps);
return std::make_shared<H265Track>("",sps,pps,0,0,0); return std::make_shared<H265Track>("",sps,pps,0,0,0);

View File

@ -71,7 +71,7 @@ int64_t HttpDownloader::onResponseHeader(const string& status,const HttpHeader&
File::delete_file(_filePath.data()); File::delete_file(_filePath.data());
if(_onResult){ if(_onResult){
auto errMsg = StrPrinter << "Http Status:" << status << endl; auto errMsg = StrPrinter << "Http Status:" << status << endl;
_onResult(Err_other,errMsg.data(),_filePath.data()); _onResult(Err_other,errMsg,_filePath);
_onResult = nullptr; _onResult = nullptr;
} }
} }
@ -90,7 +90,7 @@ void HttpDownloader::onResponseCompleted() {
//InfoL << "md5Sum:" << getMd5Sum(_filePath); //InfoL << "md5Sum:" << getMd5Sum(_filePath);
_bDownloadSuccess = true; _bDownloadSuccess = true;
if(_onResult){ if(_onResult){
_onResult(Err_success,"success",_filePath.data()); _onResult(Err_success,"success",_filePath);
_onResult = nullptr; _onResult = nullptr;
} }
} }
@ -101,7 +101,7 @@ void HttpDownloader::onDisconnect(const SockException &ex) {
File::delete_file(_filePath.data()); File::delete_file(_filePath.data());
} }
if(_onResult){ if(_onResult){
_onResult(ex.getErrCode(),ex.what(),_filePath.data()); _onResult(ex.getErrCode(),ex.what(),_filePath);
_onResult = nullptr; _onResult = nullptr;
} }
} }

View File

@ -34,7 +34,7 @@ namespace mediakit {
class HttpDownloader: public HttpClientImp { class HttpDownloader: public HttpClientImp {
public: public:
typedef std::shared_ptr<HttpDownloader> Ptr; typedef std::shared_ptr<HttpDownloader> Ptr;
typedef std::function<void(ErrCode code,const char *errMsg,const char *filePath)> onDownloadResult; typedef std::function<void(ErrCode code,const string &errMsg,const string &filePath)> onDownloadResult;
HttpDownloader(); HttpDownloader();
virtual ~HttpDownloader(); virtual ~HttpDownloader();
//开始下载文件,默认断点续传方式下载 //开始下载文件,默认断点续传方式下载

View File

@ -114,7 +114,7 @@ bool HLSMaker::write_index_file(int iFirstSegment, unsigned int uiLastSegment, i
sizeof(acWriteBuf), sizeof(acWriteBuf),
"#EXTINF:%.3f,\r\n%s-%u.ts\r\n", "#EXTINF:%.3f,\r\n%s-%u.ts\r\n",
_iDurations[i-iFirstSegment]/1000.0, _iDurations[i-iFirstSegment]/1000.0,
_strFileName.c_str(), _strFileName.data(),
i); i);
if (fwrite(acWriteBuf, strlen(acWriteBuf), 1, pM3u8File.get()) != 1) { if (fwrite(acWriteBuf, strlen(acWriteBuf), 1, pM3u8File.get()) != 1) {
WarnL << "Could not write to m3u8 index file, will not continue writing to index file"; WarnL << "Could not write to m3u8 index file, will not continue writing to index file";

View File

@ -127,7 +127,7 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri
} }
_iDuration = MAX(_video_ms,_audio_ms); _iDuration = MAX(_video_ms,_audio_ms);
_mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost.data(),strApp.data(),strId.data(),_iDuration/1000.0,false, false)); _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost,strApp,strId,_iDuration/1000.0,false, false));
if (_audio_trId != MP4_INVALID_TRACK_ID) { if (_audio_trId != MP4_INVALID_TRACK_ID) {
AACTrack::Ptr track = std::make_shared<AACTrack>(_strAacCfg); AACTrack::Ptr track = std::make_shared<AACTrack>(_strAacCfg);
_mediaMuxer->addTrack(track); _mediaMuxer->addTrack(track);

View File

@ -93,7 +93,7 @@ void TSMaker::flush() {
bool TSMaker::init(const string& filename, uint32_t bufsize) { bool TSMaker::init(const string& filename, uint32_t bufsize) {
m_strFilename = filename; m_strFilename = filename;
if (m_pOutVideoTs == NULL) { if (m_pOutVideoTs == NULL) {
m_pOutVideoTs = File::createfile_file(filename.c_str(), "wb"); m_pOutVideoTs = File::createfile_file(filename.data(), "wb");
if (m_pOutVideoTs == NULL) { if (m_pOutVideoTs == NULL) {
return false; return false;
} }

View File

@ -37,7 +37,7 @@ MediaPlayer::MediaPlayer() {
MediaPlayer::~MediaPlayer() { MediaPlayer::~MediaPlayer() {
} }
void MediaPlayer::play(const char* strUrl) { void MediaPlayer::play(const string &strUrl) {
_parser = PlayerBase::createPlayer(strUrl); _parser = PlayerBase::createPlayer(strUrl);
_parser->setOnShutdown(_shutdownCB); _parser->setOnShutdown(_shutdownCB);
_parser->setOnPlayResult(_playResultCB); _parser->setOnPlayResult(_playResultCB);

View File

@ -43,7 +43,7 @@ public:
MediaPlayer(); MediaPlayer();
virtual ~MediaPlayer(); virtual ~MediaPlayer();
void play(const char* strUrl) override; void play(const string &strUrl) override;
void pause(bool bPause) override; void pause(bool bPause) override;
void teardown() override; void teardown() override;
EventPoller::Ptr getPoller(); EventPoller::Ptr getPoller();

View File

@ -45,14 +45,14 @@ const char PlayerBase::kBeatIntervalMS[] = "beat_interval_ms";
const char PlayerBase::kMaxAnalysisMS[] = "max_analysis_ms"; const char PlayerBase::kMaxAnalysisMS[] = "max_analysis_ms";
PlayerBase::Ptr PlayerBase::createPlayer(const char* strUrl) { PlayerBase::Ptr PlayerBase::createPlayer(const string &strUrl) {
static auto releasePlayer = [](PlayerBase *ptr){ static auto releasePlayer = [](PlayerBase *ptr){
onceToken token(nullptr,[&](){ onceToken token(nullptr,[&](){
delete ptr; delete ptr;
}); });
ptr->teardown(); ptr->teardown();
}; };
string prefix = FindField(strUrl, NULL, "://"); string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsp",prefix.data()) == 0) { if (strcasecmp("rtsp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtspPlayerImp(),releasePlayer); return PlayerBase::Ptr(new RtspPlayerImp(),releasePlayer);
} }

View File

@ -86,13 +86,7 @@ public:
class PlayerBase : public DemuxerBase, public mINI{ class PlayerBase : public DemuxerBase, public mINI{
public: public:
typedef std::shared_ptr<PlayerBase> Ptr; typedef std::shared_ptr<PlayerBase> Ptr;
typedef enum { static Ptr createPlayer(const string &strUrl);
RTP_Invalid = -1,
RTP_TCP = 0,
RTP_UDP = 1,
RTP_MULTICAST = 2,
} eRtpType;
static Ptr createPlayer(const char* strUrl);
//指定网卡ip //指定网卡ip
static const char kNetAdapter[]; static const char kNetAdapter[];
@ -122,7 +116,7 @@ public:
* *
* @param strUrl urlrtsp/rtmp * @param strUrl urlrtsp/rtmp
*/ */
virtual void play(const char* strUrl) {} virtual void play(const string &strUrl) {}
/** /**
* *

View File

@ -61,9 +61,9 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00,
#define MUTE_ADTS_DATA_LEN sizeof(s_mute_adts) #define MUTE_ADTS_DATA_LEN sizeof(s_mute_adts)
#define MUTE_ADTS_DATA_MS 130 #define MUTE_ADTS_DATA_MS 130
PlayerProxy::PlayerProxy(const char *strVhost, PlayerProxy::PlayerProxy(const string &strVhost,
const char *strApp, const string &strApp,
const char *strSrc, const string &strSrc,
bool bEnableHls, bool bEnableHls,
bool bEnableMp4, bool bEnableMp4,
int iRetryCount){ int iRetryCount){
@ -74,10 +74,9 @@ PlayerProxy::PlayerProxy(const char *strVhost,
_bEnableMp4 = bEnableMp4; _bEnableMp4 = bEnableMp4;
_iRetryCount = iRetryCount; _iRetryCount = iRetryCount;
} }
void PlayerProxy::play(const char* strUrl) { void PlayerProxy::play(const string &strUrlTmp) {
weak_ptr<PlayerProxy> weakSelf = shared_from_this(); weak_ptr<PlayerProxy> weakSelf = shared_from_this();
std::shared_ptr<int> piFailedCnt(new int(0)); //连续播放失败次数 std::shared_ptr<int> piFailedCnt(new int(0)); //连续播放失败次数
string strUrlTmp(strUrl);
setOnPlayResult([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) { setOnPlayResult([weakSelf,strUrlTmp,piFailedCnt](const SockException &err) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
@ -109,7 +108,7 @@ void PlayerProxy::play(const char* strUrl) {
strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++); strongSelf->rePlay(strUrlTmp,(*piFailedCnt)++);
} }
}); });
MediaPlayer::play(strUrl); MediaPlayer::play(strUrlTmp);
} }
PlayerProxy::~PlayerProxy() { PlayerProxy::~PlayerProxy() {
@ -126,7 +125,7 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){
return false; return false;
} }
WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl; WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl;
strongPlayer->MediaPlayer::play(strUrl.data()); strongPlayer->MediaPlayer::play(strUrl);
return false; return false;
}, nullptr); }, nullptr);
} }
@ -170,7 +169,7 @@ private:
}; };
void PlayerProxy::onPlaySuccess() { void PlayerProxy::onPlaySuccess() {
_mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost.data(),_strApp.data(),_strSrc.data(),getDuration(),_bEnableHls,_bEnableMp4)); _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost,_strApp,_strSrc,getDuration(),_bEnableHls,_bEnableMp4));
_mediaMuxer->setListener(shared_from_this()); _mediaMuxer->setListener(shared_from_this());
auto videoTrack = getTrack(TrackVideo,false); auto videoTrack = getTrack(TrackVideo,false);

View File

@ -46,16 +46,16 @@ public:
//如果iRetryCount<0,则一直重试播放否则重试iRetryCount次数 //如果iRetryCount<0,则一直重试播放否则重试iRetryCount次数
//默认一直重试 //默认一直重试
PlayerProxy(const char *strVhost, PlayerProxy(const string &strVhost,
const char *strApp, const string &strApp,
const char *strSrc, const string &strSrc,
bool bEnableHls = true, bool bEnableHls = true,
bool bEnableMp4 = false, bool bEnableMp4 = false,
int iRetryCount = -1); int iRetryCount = -1);
virtual ~PlayerProxy(); virtual ~PlayerProxy();
void play(const char* strUrl) override; void play(const string &strUrl) override;
bool close() override; bool close() override;
private: private:
void rePlay(const string &strUrl,int iFailedCnt); void rePlay(const string &strUrl,int iFailedCnt);

View File

@ -0,0 +1,66 @@
/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <algorithm>
#include "MediaPusher.h"
#include "PusherBase.h"
using namespace toolkit;
namespace mediakit {
MediaPusher::MediaPusher(const MediaSource::Ptr &src) {
_src = src;
}
MediaPusher::MediaPusher(const string &schema,
const string &strVhost,
const string &strApp,
const string &strStream) {
_src = MediaSource::find(schema,strVhost,strApp,strStream);
}
MediaPusher::~MediaPusher() {
}
void MediaPusher::publish(const string &strUrl) {
_parser = PusherBase::createPusher(_src,strUrl);
_parser->setOnShutdown(_shutdownCB);
_parser->setOnPublished(_publishCB);
_parser->mINI::operator=(*this);
_parser->publish(strUrl);
}
EventPoller::Ptr MediaPusher::getPoller(){
auto parser = dynamic_pointer_cast<SocketHelper>(_parser);
if(!parser){
return nullptr;
}
return parser->getPoller();
}
} /* namespace mediakit */

59
src/Pusher/MediaPusher.h Normal file
View File

@ -0,0 +1,59 @@
/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef SRC_PUSHER_MEDIAPUSHER_H_
#define SRC_PUSHER_MEDIAPUSHER_H_
#include <memory>
#include <string>
#include "PusherBase.h"
#include "Thread/TaskExecutor.h"
using namespace toolkit;
namespace mediakit {
class MediaPusher : public PusherImp<PusherBase,PusherBase> {
public:
typedef std::shared_ptr<MediaPusher> Ptr;
MediaPusher(const string &schema,
const string &strVhost,
const string &strApp,
const string &strStream);
MediaPusher(const MediaSource::Ptr &src);
virtual ~MediaPusher();
void publish(const string &strUrl) override;
EventPoller::Ptr getPoller();
private:
MediaSource::Ptr _src;
};
} /* namespace mediakit */
#endif /* SRC_PUSHER_MEDIAPUSHER_H_ */

72
src/Pusher/PusherBase.cpp Normal file
View File

@ -0,0 +1,72 @@
/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <algorithm>
#include "PusherBase.h"
#include "Rtsp/Rtsp.h"
#include "Rtsp/RtspPusher.h"
#include "Rtmp/RtmpPusher.h"
using namespace toolkit;
namespace mediakit {
const char PusherBase::kNetAdapter[] = "net_adapter";
const char PusherBase::kRtpType[] = "rtp_type";
const char PusherBase::kRtspUser[] = "rtsp_user" ;
const char PusherBase::kRtspPwd[] = "rtsp_pwd";
const char PusherBase::kRtspPwdIsMD5[] = "rtsp_pwd_md5";
const char PusherBase::kPlayTimeoutMS[] = "play_timeout_ms";
const char PusherBase::kMediaTimeoutMS[] = "media_timeout_ms";
const char PusherBase::kBeatIntervalMS[] = "beat_interval_ms";
PusherBase::Ptr PusherBase::createPusher(const MediaSource::Ptr &src,
const string & strUrl) {
static auto releasePusher = [](PusherBase *ptr){
onceToken token(nullptr,[&](){
delete ptr;
});
ptr->teardown();
};
string prefix = FindField(strUrl.data(), NULL, "://");
if (strcasecmp("rtsp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtspPusher(dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}
if (strcasecmp("rtmp",prefix.data()) == 0) {
return PusherBase::Ptr(new RtmpPusher(dynamic_pointer_cast<RtmpMediaSource>(src)),releasePusher);
}
return PusherBase::Ptr(new RtspPusher(dynamic_pointer_cast<RtspMediaSource>(src)),releasePusher);
}
PusherBase::PusherBase() {
this->mINI::operator[](kPlayTimeoutMS) = 10000;
this->mINI::operator[](kMediaTimeoutMS) = 5000;
this->mINI::operator[](kBeatIntervalMS) = 5000;
}
} /* namespace mediakit */

152
src/Pusher/PusherBase.h Normal file
View File

@ -0,0 +1,152 @@
/*
* MIT License
*
* Copyright (c) 2016 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef SRC_PUSHER_PUSHERBASE_H_
#define SRC_PUSHER_PUSHERBASE_H_
#include <map>
#include <memory>
#include <string>
#include <functional>
#include "Network/Socket.h"
#include "Util/mini.h"
#include "Common/MediaSource.h"
using namespace toolkit;
namespace mediakit {
class PusherBase : public mINI{
public:
typedef std::shared_ptr<PusherBase> Ptr;
static Ptr createPusher(const MediaSource::Ptr &src,
const string &strUrl);
//指定网卡ip
static const char kNetAdapter[];
//设置rtp传输类型可选项有0(tcp默认)、1(udp)、2(组播)
//设置方法:player[PusherBase::kRtpType] = 0/1/2;
static const char kRtpType[];
//rtsp认证用户名
static const char kRtspUser[];
//rtsp认证用用户密码可以是明文也可以是md5,md5密码生成方式 md5(username:realm:password)
static const char kRtspPwd[];
//rtsp认证用用户密码是否为md5类型
static const char kRtspPwdIsMD5[];
//播放超时时间默认10,000 毫秒
static const char kPlayTimeoutMS[];
//rtp/rtmp包接收超时时间默认5000秒
static const char kMediaTimeoutMS[];
//rtsp/rtmp心跳时间,默认5000毫秒
static const char kBeatIntervalMS[];
typedef std::function<void(const SockException &ex)> Event;
PusherBase();
virtual ~PusherBase(){}
/**
*
* @param strUrl urlrtsp/rtmp
*/
virtual void publish(const string &strUrl) = 0;
/**
*
*/
virtual void teardown() = 0;
/**
*
* @param onPublished
*/
virtual void setOnPublished(const Event &cb) = 0;
/**
*
* @param onShutdown
*/
virtual void setOnShutdown(const Event &cb) = 0;
};
template<typename Parent,typename Parser>
class PusherImp : public Parent {
public:
typedef std::shared_ptr<PusherImp> Ptr;
PusherImp(){}
virtual ~PusherImp(){}
/**
*
* @param strUrl urlrtsp/rtmp
*/
void publish(const string &strUrl) override{
if (_parser) {
_parser->publish(strUrl);
}
}
/**
*
*/
void teardown() override{
if (_parser) {
_parser->teardown();
}
}
/**
*
* @param onPublished
*/
void setOnPublished(const PusherBase::Event &cb) override{
if (_parser) {
_parser->setOnPublished(cb);
}
_publishCB = cb;
}
/**
*
* @param onShutdown
*/
void setOnShutdown(const PusherBase::Event &cb) override{
if (_parser) {
_parser->setOnShutdown(cb);
}
_shutdownCB = cb;
}
protected:
PusherBase::Event _shutdownCB;
PusherBase::Event _publishCB;
std::shared_ptr<Parser> _parser;
};
} /* namespace mediakit */
#endif /* SRC_PUSHER_PUSHERBASE_H_ */

View File

@ -84,6 +84,7 @@ using namespace toolkit;
#define FLV_KEY_FRAME 1 #define FLV_KEY_FRAME 1
#define FLV_INTER_FRAME 2 #define FLV_INTER_FRAME 2
namespace mediakit {
#if defined(_WIN32) #if defined(_WIN32)
#pragma pack(push, 1) #pragma pack(push, 1)
@ -291,7 +292,7 @@ public:
} }
}; };
}//namespace mediakit

View File

@ -72,11 +72,11 @@ void RtmpPlayer::teardown() {
shutdown(); shutdown();
} }
} }
void RtmpPlayer::play(const char* strUrl) { void RtmpPlayer::play(const string &strUrl) {
teardown(); teardown();
string strHost = FindField(strUrl, "://", "/"); string strHost = FindField(strUrl.data(), "://", "/");
_strApp = FindField(strUrl, (strHost + "/").data(), "/"); _strApp = FindField(strUrl.data(), (strHost + "/").data(), "/");
_strStream = FindField(strUrl, (strHost + "/" + _strApp + "/").data(), NULL); _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
_strTcUrl = string("rtmp://") + strHost + "/" + _strApp; _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
if (!_strApp.size() || !_strStream.size()) { if (!_strApp.size() || !_strStream.size()) {
@ -85,13 +85,13 @@ void RtmpPlayer::play(const char* strUrl) {
} }
DebugL << strHost << " " << _strApp << " " << _strStream; DebugL << strHost << " " << _strApp << " " << _strStream;
auto iPort = atoi(FindField(strHost.c_str(), ":", NULL).c_str()); auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
if (iPort <= 0) { if (iPort <= 0) {
//rtmp 默认端口1935 //rtmp 默认端口1935
iPort = 1935; iPort = 1935;
} else { } else {
//服务器域名 //服务器域名
strHost = FindField(strHost.c_str(), NULL, ":"); strHost = FindField(strHost.data(), NULL, ":");
} }
if(!(*this)[PlayerBase::kNetAdapter].empty()){ if(!(*this)[PlayerBase::kNetAdapter].empty()){
setNetAdapter((*this)[PlayerBase::kNetAdapter]); setNetAdapter((*this)[PlayerBase::kNetAdapter]);

View File

@ -49,7 +49,7 @@ public:
RtmpPlayer(); RtmpPlayer();
virtual ~RtmpPlayer(); virtual ~RtmpPlayer();
void play(const char* strUrl) override; void play(const string &strUrl) override;
void pause(bool bPause) override; void pause(bool bPause) override;
void teardown() override; void teardown() override;
protected: protected:

View File

@ -56,7 +56,7 @@ public:
fProgress = MAX(float(0),MIN(fProgress,float(1.0))); fProgress = MAX(float(0),MIN(fProgress,float(1.0)));
seekToMilliSecond(fProgress * getDuration() * 1000); seekToMilliSecond(fProgress * getDuration() * 1000);
}; };
void play(const char* strUrl) override { void play(const string &strUrl) override {
_analysisMs = (*this)[PlayerBase::kMaxAnalysisMS].as<int>(); _analysisMs = (*this)[PlayerBase::kMaxAnalysisMS].as<int>();
PlayerImp<RtmpPlayer,RtmpDemuxer>::play(strUrl); PlayerImp<RtmpPlayer,RtmpDemuxer>::play(strUrl);
} }

View File

@ -325,7 +325,7 @@ void RtmpProtocol::handle_C0C1() {
if (_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) { if (_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) {
throw std::runtime_error("only plaintext[0x03] handshake supported"); throw std::runtime_error("only plaintext[0x03] handshake supported");
} }
if(memcmp(_strRcvBuf.c_str() + 5,"\x00\x00\x00\x00",4) ==0 ){ if(memcmp(_strRcvBuf.data() + 5,"\x00\x00\x00\x00",4) ==0 ){
//simple handsharke //simple handsharke
handle_C1_simple(); handle_C1_simple();
}else{ }else{
@ -347,7 +347,7 @@ void RtmpProtocol::handle_C1_simple(){
RtmpHandshake s1(0); RtmpHandshake s1(0);
onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE)); onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE));
//发送S2 //发送S2
onSendRawData(obtainBuffer(_strRcvBuf.c_str() + 1, C1_HANDSHARK_SIZE)); onSendRawData(obtainBuffer(_strRcvBuf.data() + 1, C1_HANDSHARK_SIZE));
//等待C2 //等待C2
_nextHandle = [this]() { _nextHandle = [this]() {
handle_C2(); handle_C2();

View File

@ -35,14 +35,6 @@ namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtmpPusher::RtmpPusher(const char *strVhost,const char *strApp,const char *strStream) {
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,strVhost,strApp,strStream));
if (!src) {
auto strErr = StrPrinter << "media source:" << strVhost << "/" << strApp << "/" << strStream << "not found!" << endl;
throw std::runtime_error(strErr);
}
_pMediaSrc=src;
}
RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src){ RtmpPusher::RtmpPusher(const RtmpMediaSource::Ptr &src){
_pMediaSrc=src; _pMediaSrc=src;
} }
@ -70,11 +62,11 @@ void RtmpPusher::teardown() {
} }
} }
void RtmpPusher::publish(const char* strUrl) { void RtmpPusher::publish(const string &strUrl) {
teardown(); teardown();
string strHost = FindField(strUrl, "://", "/"); string strHost = FindField(strUrl.data(), "://", "/");
_strApp = FindField(strUrl, (strHost + "/").data(), "/"); _strApp = FindField(strUrl.data(), (strHost + "/").data(), "/");
_strStream = FindField(strUrl, (strHost + "/" + _strApp + "/").data(), NULL); _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
_strTcUrl = string("rtmp://") + strHost + "/" + _strApp; _strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
if (!_strApp.size() || !_strStream.size()) { if (!_strApp.size() || !_strStream.size()) {
@ -83,27 +75,18 @@ void RtmpPusher::publish(const char* strUrl) {
} }
DebugL << strHost << " " << _strApp << " " << _strStream; DebugL << strHost << " " << _strApp << " " << _strStream;
auto iPort = atoi(FindField(strHost.c_str(), ":", NULL).c_str()); auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
if (iPort <= 0) { if (iPort <= 0) {
//rtmp 默认端口1935 //rtmp 默认端口1935
iPort = 1935; iPort = 1935;
} else { } else {
//服务器域名 //服务器域名
strHost = FindField(strHost.c_str(), NULL, ":"); strHost = FindField(strHost.data(), NULL, ":");
} }
startConnect(strHost, iPort);
}
void RtmpPusher::onErr(const SockException &ex){
onShutdown(ex);
}
void RtmpPusher::onConnect(const SockException &err){
if(err.getErrCode()!=Err_success) {
onPublishResult(err);
return;
}
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this()); weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_pPublishTimer.reset( new Timer(10, [weakSelf]() { float playTimeOutSec = (*this)[kPlayTimeoutMS].as<int>() / 1000.0;
_pPublishTimer.reset( new Timer(playTimeOutSec, [weakSelf]() {
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return false; return false;
@ -112,12 +95,30 @@ void RtmpPusher::onConnect(const SockException &err){
strongSelf->teardown(); strongSelf->teardown();
return false; return false;
},getPoller())); },getPoller()));
if(!(*this)[kNetAdapter].empty()){
setNetAdapter((*this)[kNetAdapter]);
}
startConnect(strHost, iPort);
}
void RtmpPusher::onErr(const SockException &ex){
onShutdown(ex);
}
void RtmpPusher::onConnect(const SockException &err){
if(err) {
onPublishResult(err);
return;
}
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
startClientSession([weakSelf](){ startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock(); auto strongSelf=weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
//strongSelf->sendChunkSize(60000);
strongSelf->sendChunkSize(60000);
strongSelf->send_connect(); strongSelf->send_connect();
}); });
} }

View File

@ -30,28 +30,27 @@
#include "RtmpProtocol.h" #include "RtmpProtocol.h"
#include "RtmpMediaSource.h" #include "RtmpMediaSource.h"
#include "Network/TcpClient.h" #include "Network/TcpClient.h"
#include "Pusher/PusherBase.h"
namespace mediakit { namespace mediakit {
class RtmpPusher: public RtmpProtocol , public TcpClient{ class RtmpPusher: public RtmpProtocol , public TcpClient , public PusherBase{
public: public:
typedef std::shared_ptr<RtmpPusher> Ptr; typedef std::shared_ptr<RtmpPusher> Ptr;
typedef std::function<void(const SockException &ex)> Event;
RtmpPusher(const char *strVhost,const char *strApp,const char *strStream);
RtmpPusher(const RtmpMediaSource::Ptr &src); RtmpPusher(const RtmpMediaSource::Ptr &src);
virtual ~RtmpPusher(); virtual ~RtmpPusher();
void publish(const char* strUrl); void publish(const string &strUrl) override ;
void teardown();
void setOnPublished(Event onPublished) { void teardown() override;
_onPublished = onPublished;
void setOnPublished(const Event &cb) override {
_onPublished = cb;
} }
void setOnShutdown(Event onShutdown) { void setOnShutdown(const Event &cb) override{
_onShutdown = onShutdown; _onShutdown = cb;
} }
protected: protected:
//for Tcpclient override //for Tcpclient override
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &pBuf) override;

View File

@ -27,6 +27,8 @@
#include <stdlib.h> #include <stdlib.h>
#include "Rtsp.h" #include "Rtsp.h"
namespace mediakit{
string FindField(const char* buf, const char* start, const char *end ,int bufSize) { string FindField(const char* buf, const char* start, const char *end ,int bufSize) {
if(bufSize <=0 ){ if(bufSize <=0 ){
bufSize = strlen(buf); bufSize = strlen(buf);
@ -185,5 +187,5 @@ vector<SdpTrack::Ptr> SdpAttr::getAvailableTrack() const {
return ret; return ret;
} }
}//namespace mediakit

View File

@ -38,7 +38,18 @@ using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
class SdpTrack{ namespace mediakit {
namespace Rtsp {
typedef enum {
RTP_Invalid = -1,
RTP_TCP = 0,
RTP_UDP = 1,
RTP_MULTICAST = 2,
} eRtpType;
};
class SdpTrack {
public: public:
typedef std::shared_ptr<SdpTrack> Ptr; typedef std::shared_ptr<SdpTrack> Ptr;
@ -54,8 +65,8 @@ public:
float _start = 0; float _start = 0;
float _end = 0; float _end = 0;
map<char,string> _other; map<char, string> _other;
map<string,string> _attr; map<string, string> _attr;
public: public:
int _pt; int _pt;
string _codec; string _codec;
@ -72,20 +83,27 @@ public:
//时间戳,单位毫秒 //时间戳,单位毫秒
uint32_t _time_stamp = 0; uint32_t _time_stamp = 0;
}; };
class SdpAttr { class SdpAttr {
public: public:
typedef std::shared_ptr<SdpAttr> Ptr; typedef std::shared_ptr<SdpAttr> Ptr;
SdpAttr(){}
SdpAttr(const string &sdp){load(sdp);} SdpAttr() {}
~SdpAttr(){}
SdpAttr(const string &sdp) { load(sdp); }
~SdpAttr() {}
void load(const string &sdp); void load(const string &sdp);
bool available() const ;
bool available() const;
SdpTrack::Ptr getTrack(TrackType type) const; SdpTrack::Ptr getTrack(TrackType type) const;
vector<SdpTrack::Ptr> getAvailableTrack() const; vector<SdpTrack::Ptr> getAvailableTrack() const;
private: private:
map<string,SdpTrack::Ptr> _track_map; map<string, SdpTrack::Ptr> _track_map;
}; };
@ -96,19 +114,20 @@ public:
uint32_t timeStamp = 0; uint32_t timeStamp = 0;
}; };
string FindField(const char* buf, const char* start, const char *end,int bufSize = 0 ); string FindField(const char *buf, const char *start, const char *end, int bufSize = 0);
struct StrCaseCompare struct StrCaseCompare {
{ bool operator()(const string &__x, const string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; }
bool operator()(const string& __x, const string& __y) const
{return strcasecmp(__x.data(), __y.data()) < 0 ;}
}; };
typedef map<string,string,StrCaseCompare> StrCaseMap;
typedef map<string, string, StrCaseCompare> StrCaseMap;
class Parser { class Parser {
public: public:
Parser() {} Parser() {}
virtual ~Parser() {} virtual ~Parser() {}
void Parse(const char *buf) { void Parse(const char *buf) {
//解析 //解析
const char *start = buf; const char *start = buf;
@ -119,19 +138,19 @@ public:
break; break;
} }
if (start == buf) { if (start == buf) {
_strMethod = FindField(line.c_str(), NULL, " "); _strMethod = FindField(line.data(), NULL, " ");
_strFullUrl = FindField(line.c_str(), " ", " "); _strFullUrl = FindField(line.data(), " ", " ");
auto args_pos = _strFullUrl.find('?'); auto args_pos = _strFullUrl.find('?');
if(args_pos != string::npos){ if (args_pos != string::npos) {
_strUrl = _strFullUrl.substr(0,args_pos); _strUrl = _strFullUrl.substr(0, args_pos);
_mapUrlArgs = parseArgs(_strFullUrl.substr(args_pos + 1 )); _mapUrlArgs = parseArgs(_strFullUrl.substr(args_pos + 1));
}else{ } else {
_strUrl = _strFullUrl; _strUrl = _strFullUrl;
} }
_strTail = FindField(line.c_str(), (_strFullUrl + " ").c_str(), NULL); _strTail = FindField(line.data(), (_strFullUrl + " ").data(), NULL);
} else { } else {
auto field = FindField(line.c_str(), NULL, ": "); auto field = FindField(line.data(), NULL, ": ");
auto value = FindField(line.c_str(), ": ", NULL); auto value = FindField(line.data(), ": ", NULL);
if (field.size() != 0) { if (field.size() != 0) {
_mapHeaders[field] = value; _mapHeaders[field] = value;
} }
@ -143,23 +162,28 @@ public:
} }
} }
} }
const string& Method() const {
const string &Method() const {
//rtsp方法 //rtsp方法
return _strMethod; return _strMethod;
} }
const string& Url() const {
const string &Url() const {
//rtsp url //rtsp url
return _strUrl; return _strUrl;
} }
const string& FullUrl() const {
const string &FullUrl() const {
//rtsp url with args //rtsp url with args
return _strFullUrl; return _strFullUrl;
} }
const string& Tail() const {
const string &Tail() const {
//RTSP/1.0 //RTSP/1.0
return _strTail; return _strTail;
} }
const string& operator[](const char *name) const {
const string &operator[](const char *name) const {
//rtsp field //rtsp field
auto it = _mapHeaders.find(name); auto it = _mapHeaders.find(name);
if (it == _mapHeaders.end()) { if (it == _mapHeaders.end()) {
@ -167,9 +191,11 @@ public:
} }
return it->second; return it->second;
} }
const string& Content() const {
const string &Content() const {
return _strContent; return _strContent;
} }
void Clear() { void Clear() {
_strMethod.clear(); _strMethod.clear();
_strUrl.clear(); _strUrl.clear();
@ -180,26 +206,28 @@ public:
_mapUrlArgs.clear(); _mapUrlArgs.clear();
} }
void setUrl(const string& url) { void setUrl(const string &url) {
this->_strUrl = url; this->_strUrl = url;
} }
void setContent(const string& content) {
void setContent(const string &content) {
this->_strContent = content; this->_strContent = content;
} }
StrCaseMap& getValues() const { StrCaseMap &getValues() const {
return _mapHeaders; return _mapHeaders;
} }
StrCaseMap& getUrlArgs() const {
StrCaseMap &getUrlArgs() const {
return _mapUrlArgs; return _mapUrlArgs;
} }
static StrCaseMap parseArgs(const string &str,const char *pair_delim = "&", const char *key_delim = "="){ static StrCaseMap parseArgs(const string &str, const char *pair_delim = "&", const char *key_delim = "=") {
StrCaseMap ret; StrCaseMap ret;
auto arg_vec = split(str, pair_delim); auto arg_vec = split(str, pair_delim);
for (string &key_val : arg_vec) { for (string &key_val : arg_vec) {
auto key = FindField(key_val.data(),NULL,key_delim); auto key = FindField(key_val.data(), NULL, key_delim);
auto val = FindField(key_val.data(),key_delim, NULL); auto val = FindField(key_val.data(), key_delim, NULL);
ret[key] = val; ret[key] = val;
} }
return ret; return ret;
@ -216,6 +244,6 @@ private:
mutable StrCaseMap _mapUrlArgs; mutable StrCaseMap _mapUrlArgs;
}; };
} //namespace mediakit
#endif //RTSP_RTSP_H_ #endif //RTSP_RTSP_H_

View File

@ -42,9 +42,6 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
const char kRtspMd5Nonce[] = "rtsp_md5_nonce";
const char kRtspRealm[] = "rtsp_realm";
RtspPlayer::RtspPlayer(void){ RtspPlayer::RtspPlayer(void){
RtpReceiver::setPoolSize(64); RtpReceiver::setPoolSize(64);
} }
@ -57,8 +54,8 @@ void RtspPlayer::teardown(){
shutdown(); shutdown();
} }
erase(kRtspMd5Nonce); _rtspMd5Nonce.clear();
erase(kRtspRealm); _rtspRealm.clear();
_aTrackInfo.clear(); _aTrackInfo.clear();
_strSession.clear(); _strSession.clear();
@ -81,58 +78,55 @@ void RtspPlayer::teardown(){
_onHandshake = nullptr; _onHandshake = nullptr;
} }
void RtspPlayer::play(const char* strUrl){ void RtspPlayer::play(const string &strUrl){
auto userAndPwd = FindField(strUrl,"://","@"); auto userAndPwd = FindField(strUrl.data(),"://","@");
eRtpType eType = (eRtpType)(int)(*this)[kRtpType]; Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[kRtpType];
if(userAndPwd.empty()){ if(userAndPwd.empty()){
play(strUrl,nullptr,nullptr,eType); play(strUrl,"","",eType);
return; return;
} }
auto suffix = FindField(strUrl,"@",nullptr); auto suffix = FindField(strUrl.data(),"@",nullptr);
auto url = StrPrinter << "rtsp://" << suffix << endl; auto url = StrPrinter << "rtsp://" << suffix << endl;
if(userAndPwd.find(":") == string::npos){ if(userAndPwd.find(":") == string::npos){
play(url.data(),userAndPwd.data(),nullptr,eType); play(url,userAndPwd,"",eType);
return; return;
} }
auto user = FindField(userAndPwd.data(),nullptr,":"); auto user = FindField(userAndPwd.data(),nullptr,":");
auto pwd = FindField(userAndPwd.data(),":",nullptr); auto pwd = FindField(userAndPwd.data(),":",nullptr);
play(url.data(),user.data(),pwd.data(),eType); play(url,user,pwd,eType);
} }
//播放指定是否走rtp over tcp //播放指定是否走rtp over tcp
void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPwd, eRtpType eType ) { void RtspPlayer::play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) {
DebugL << strUrl << " " DebugL << strUrl << " "
<< (strUser ? strUser : "null") << " " << (strUser.size() ? strUser : "null") << " "
<< (strPwd ? strPwd:"null") << " " << (strPwd.size() ? strPwd:"null") << " "
<< eType; << eType;
teardown(); teardown();
if(strUser){ if(strUser.size()){
(*this)[kRtspUser] = strUser; (*this)[kRtspUser] = strUser;
} }
if(strPwd){ if(strPwd.size()){
(*this)[kRtspPwd] = strPwd; (*this)[kRtspPwd] = strPwd;
(*this)[kRtspPwdIsMD5] = false; (*this)[kRtspPwdIsMD5] = false;
} }
_eType = eType; _eType = eType;
auto ip = FindField(strUrl, "://", "/"); auto ip = FindField(strUrl.data(), "://", "/");
if (!ip.size()) { if (!ip.size()) {
ip = FindField(strUrl, "://", NULL); ip = FindField(strUrl.data(), "://", NULL);
} }
auto port = atoi(FindField(ip.c_str(), ":", NULL).c_str()); auto port = atoi(FindField(ip.data(), ":", NULL).data());
if (port <= 0) { if (port <= 0) {
//rtsp 默认端口554 //rtsp 默认端口554
port = 554; port = 554;
} else { } else {
//服务器域名 //服务器域名
ip = FindField(ip.c_str(), NULL, ":"); ip = FindField(ip.data(), NULL, ":");
} }
_strUrl = strUrl; _strUrl = strUrl;
if(!(*this)[PlayerBase::kNetAdapter].empty()){
setNetAdapter((*this)[PlayerBase::kNetAdapter]);
}
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this()); weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
float playTimeOutSec = (*this)[kPlayTimeoutMS].as<int>() / 1000.0; float playTimeOutSec = (*this)[kPlayTimeoutMS].as<int>() / 1000.0;
@ -146,7 +140,10 @@ void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPw
return false; return false;
},getPoller())); },getPoller()));
startConnect(ip.data(), port , playTimeOutSec); if(!(*this)[PlayerBase::kNetAdapter].empty()){
setNetAdapter((*this)[PlayerBase::kNetAdapter]);
}
startConnect(ip, port , playTimeOutSec);
} }
void RtspPlayer::onConnect(const SockException &err){ void RtspPlayer::onConnect(const SockException &err){
if(err.getErrCode()!=Err_success) { if(err.getErrCode()!=Err_success) {
@ -166,7 +163,7 @@ void RtspPlayer::onErr(const SockException &ex) {
} }
// from live555 // from live555
bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) { bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
if(!(*this)[kRtspRealm].empty()){ if(!_rtspRealm.empty()){
//已经认证过了 //已经认证过了
return false; return false;
} }
@ -181,17 +178,17 @@ bool RtspPlayer::handleAuthenticationFailure(const string &paramsStr) {
}); });
if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) { if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) {
(*this)[kRtspRealm] = (const char *)realm; _rtspRealm = (const char *)realm;
(*this)[kRtspMd5Nonce] = (const char *)nonce; _rtspMd5Nonce = (const char *)nonce;
return true; return true;
} }
if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) { if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) {
(*this)[kRtspRealm] = (const char *)realm; _rtspRealm = (const char *)realm;
(*this)[kRtspMd5Nonce] = (const char *)nonce; _rtspMd5Nonce = (const char *)nonce;
return true; return true;
} }
if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) { if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) {
(*this)[kRtspRealm] = (const char *)realm; _rtspRealm = (const char *)realm;
return true; return true;
} }
return false; return false;
@ -208,7 +205,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
if(newUrl.empty()){ if(newUrl.empty()){
throw std::runtime_error("未找到Location字段(跳转url)"); throw std::runtime_error("未找到Location字段(跳转url)");
} }
play(newUrl.data()); play(newUrl);
return; return;
} }
if (parser.Url() != "200") { if (parser.Url() != "200") {
@ -244,15 +241,13 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) {
auto &track = _aTrackInfo[trackIndex]; auto &track = _aTrackInfo[trackIndex];
auto baseUrl = _strContentBase + "/" + track->_control_surffix; auto baseUrl = _strContentBase + "/" + track->_control_surffix;
switch (_eType) { switch (_eType) {
case RTP_TCP: { case Rtsp::RTP_TCP: {
return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1}); return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1});
} }
break; case Rtsp::RTP_MULTICAST: {
case RTP_MULTICAST: {
return sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"}); return sendRtspRequest("SETUP",baseUrl,{"Transport","Transport: RTP/AVP;multicast"});
} }
break; case Rtsp::RTP_UDP: {
case RTP_UDP: {
_apUdpSock[trackIndex].reset(new Socket()); _apUdpSock[trackIndex].reset(new Socket());
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apUdpSock[trackIndex].reset(); _apUdpSock[trackIndex].reset();
@ -261,10 +256,8 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) {
int port = _apUdpSock[trackIndex]->get_local_port(); int port = _apUdpSock[trackIndex]->get_local_port();
return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1});
} }
break;
default: default:
return false; return false;
break;
} }
} }
@ -281,29 +274,29 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
auto strTransport = parser["Transport"]; auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){ if(strTransport.find("TCP") != string::npos){
_eType = RTP_TCP; _eType = Rtsp::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){ }else if(strTransport.find("multicast") != string::npos){
_eType = RTP_MULTICAST; _eType = Rtsp::RTP_MULTICAST;
}else{ }else{
_eType = RTP_UDP; _eType = Rtsp::RTP_UDP;
} }
RtspSplitter::enableRecvRtp(_eType == RTP_TCP); RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP);
if(_eType == RTP_TCP) { if(_eType == Rtsp::RTP_TCP) {
string interleaved = FindField( FindField((strTransport + ";").c_str(), "interleaved=", ";").c_str(), NULL, "-"); string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-");
_aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.c_str()); _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data());
}else{ }else{
const char *strPos = (_eType == RTP_MULTICAST ? "port=" : "server_port=") ; const char *strPos = (_eType == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ;
auto port_str = FindField((strTransport + ";").c_str(), strPos, ";"); auto port_str = FindField((strTransport + ";").data(), strPos, ";");
uint16_t port = atoi(FindField(port_str.c_str(), NULL, "-").c_str()); uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data());
auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; auto &pUdpSockRef = _apUdpSock[uiTrackIndex];
if(!pUdpSockRef){ if(!pUdpSockRef){
pUdpSockRef.reset(new Socket()); pUdpSockRef.reset(new Socket());
} }
if (_eType == RTP_MULTICAST) { if (_eType == Rtsp::RTP_MULTICAST) {
auto multiAddr = FindField((strTransport + ";").c_str(), "destination=", ";"); auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";");
if (!pUdpSockRef->bindUdpSock(port, "0.0.0.0")) { if (!pUdpSockRef->bindUdpSock(port, "0.0.0.0")) {
pUdpSockRef.reset(); pUdpSockRef.reset();
throw std::runtime_error("open udp sock err"); throw std::runtime_error("open udp sock err");
@ -316,7 +309,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
struct sockaddr_in rtpto; struct sockaddr_in rtpto;
rtpto.sin_port = ntohs(port); rtpto.sin_port = ntohs(port);
rtpto.sin_family = AF_INET; rtpto.sin_family = AF_INET;
rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().c_str()); rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
pUdpSockRef->send("\xce\xfa\xed\xfe", 4); pUdpSockRef->send("\xce\xfa\xed\xfe", 4);
} }
@ -328,7 +321,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
return; return;
} }
for (unsigned int i = 0; i < _aTrackInfo.size() && _eType != RTP_TCP; i++) { for (unsigned int i = 0; i < _aTrackInfo.size() && _eType != Rtsp::RTP_TCP; i++) {
auto &pUdpSockRef = _apUdpSock[i]; auto &pUdpSockRef = _apUdpSock[i];
if(!pUdpSockRef){ if(!pUdpSockRef){
continue; continue;
@ -348,6 +341,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
}); });
} }
/////////////////////////心跳///////////////////////////////// /////////////////////////心跳/////////////////////////////////
if(_eType != Rtsp::RTP_TCP){
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this()); weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
_pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf](){ _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf](){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
@ -356,13 +350,14 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
} }
return strongSelf->sendOptions(); return strongSelf->sendOptions();
},getPoller())); },getPoller()));
}
pause(false); pause(false);
} }
bool RtspPlayer::sendOptions() { bool RtspPlayer::sendOptions() {
_onHandshake = [](const Parser& parser){ _onHandshake = [](const Parser& parser){
// DebugL << "options response"; // DebugL << "options response";
return true;
}; };
return sendRtspRequest("OPTIONS",_strContentBase); return sendRtspRequest("OPTIONS",_strContentBase);
} }
@ -530,12 +525,14 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url, const std
bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) { bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) {
auto header = header_const; auto header = header_const;
header.emplace("CSeq",StrPrinter << _uiCseq++); header.emplace("CSeq",StrPrinter << _uiCseq++);
header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
if(!_strSession.empty()){ if(!_strSession.empty()){
header.emplace("Session",_strSession); header.emplace("Session",_strSession);
} }
if(!(*this)[kRtspRealm].empty() && !(*this)[PlayerBase::kRtspUser].empty()){ if(!_rtspRealm.empty() && !(*this)[PlayerBase::kRtspUser].empty()){
if(!(*this)[kRtspMd5Nonce].empty()){ if(!_rtspMd5Nonce.empty()){
//MD5认证 //MD5认证
/* /*
response计算方法如下 response计算方法如下
@ -547,14 +544,14 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
*/ */
string encrypted_pwd = (*this)[PlayerBase::kRtspPwd]; string encrypted_pwd = (*this)[PlayerBase::kRtspPwd];
if(!(*this)[PlayerBase::kRtspPwdIsMD5].as<bool>()){ if(!(*this)[PlayerBase::kRtspPwdIsMD5].as<bool>()){
encrypted_pwd = MD5((*this)[PlayerBase::kRtspUser]+ ":" + (*this)[kRtspRealm] + ":" + encrypted_pwd).hexdigest(); encrypted_pwd = MD5((*this)[PlayerBase::kRtspUser]+ ":" + _rtspRealm + ":" + encrypted_pwd).hexdigest();
} }
auto response = MD5( encrypted_pwd + ":" + (*this)[kRtspMd5Nonce] + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest(); auto response = MD5( encrypted_pwd + ":" + _rtspMd5Nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest();
_StrPrinter printer; _StrPrinter printer;
printer << "Digest "; printer << "Digest ";
printer << "username=\"" << (*this)[PlayerBase::kRtspUser] << "\", "; printer << "username=\"" << (*this)[PlayerBase::kRtspUser] << "\", ";
printer << "realm=\"" << (*this)[kRtspRealm] << "\", "; printer << "realm=\"" << _rtspRealm << "\", ";
printer << "nonce=\"" << (*this)[kRtspMd5Nonce] << "\", "; printer << "nonce=\"" << _rtspMd5Nonce << "\", ";
printer << "uri=\"" << url << "\", "; printer << "uri=\"" << url << "\", ";
printer << "response=\"" << response << "\""; printer << "response=\"" << response << "\"";
header.emplace("Authorization",printer); header.emplace("Authorization",printer);

View File

@ -54,7 +54,7 @@ public:
RtspPlayer(); RtspPlayer();
virtual ~RtspPlayer(void); virtual ~RtspPlayer(void);
void play(const char* strUrl) override; void play(const string &strUrl) override;
void pause(bool bPause) override; void pause(bool bPause) override;
void teardown() override; void teardown() override;
float getPacketLossRate(TrackType type) const override; float getPacketLossRate(TrackType type) const override;
@ -93,7 +93,7 @@ private:
int getTrackIndexByInterleaved(int interleaved) const; int getTrackIndexByInterleaved(int interleaved) const;
int getTrackIndexByTrackType(TrackType trackId) const; int getTrackIndexByTrackType(TrackType trackId) const;
void play(const char* strUrl, const char *strUser, const char *strPwd, eRtpType eType); void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType);
void onConnect(const SockException &err) override; void onConnect(const SockException &err) override;
void onRecv(const Buffer::Ptr &pBuf) override; void onRecv(const Buffer::Ptr &pBuf) override;
void onErr(const SockException &ex) override; void onErr(const SockException &ex) override;
@ -115,14 +115,16 @@ private:
string _strUrl; string _strUrl;
SdpAttr _sdpAttr; SdpAttr _sdpAttr;
vector<SdpTrack::Ptr> _aTrackInfo; vector<SdpTrack::Ptr> _aTrackInfo;
function<void(const Parser&)> _onHandshake; function<void(const Parser&)> _onHandshake;
Socket::Ptr _apUdpSock[2]; Socket::Ptr _apUdpSock[2];
//rtsp鉴权相关
string _rtspMd5Nonce;
string _rtspRealm;
//rtsp info //rtsp info
string _strSession; string _strSession;
unsigned int _uiCseq = 1; unsigned int _uiCseq = 1;
string _strContentBase; string _strContentBase;
eRtpType _eType = RTP_TCP; Rtsp::eRtpType _eType = Rtsp::RTP_TCP;
/* 丢包率统计需要用到的参数 */ /* 丢包率统计需要用到的参数 */
uint16_t _aui16FirstSeq[2] = { 0 , 0}; uint16_t _aui16FirstSeq[2] = { 0 , 0};

450
src/Rtsp/RtspPusher.cpp Normal file
View File

@ -0,0 +1,450 @@
//
// Created by xzl on 2019/3/27.
//
#include "Util/MD5.h"
#include "Util/base64.h"
#include "RtspPusher.h"
#include "RtspSession.h"
namespace mediakit {
static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE;
RtspPusher::RtspPusher(const RtspMediaSource::Ptr &src) {
_pMediaSrc = src;
}
RtspPusher::~RtspPusher() {
teardown();
DebugL << endl;
}
void RtspPusher::teardown() {
if (alive()) {
sendRtspRequest("TEARDOWN" ,_strContentBase);
shutdown();
}
reset();
CLEAR_ARR(_apUdpSock);
_rtspMd5Nonce.clear();
_rtspRealm.clear();
_aTrackInfo.clear();
_strSession.clear();
_strContentBase.clear();
_strSession.clear();
_uiCseq = 1;
_pPublishTimer.reset();
_pBeatTimer.reset();
_pRtspReader.reset();
_aTrackInfo.clear();
_onHandshake = nullptr;
}
void RtspPusher::publish(const string &strUrl) {
auto userAndPwd = FindField(strUrl.data(),"://","@");
Rtsp::eRtpType eType = (Rtsp::eRtpType)(int)(*this)[ PlayerBase::kRtpType];
if(userAndPwd.empty()){
publish(strUrl,"","",eType);
return;
}
auto suffix = FindField(strUrl.data(),"@",nullptr);
auto url = StrPrinter << "rtsp://" << suffix << endl;
if(userAndPwd.find(":") == string::npos){
publish(url,userAndPwd,"",eType);
return;
}
auto user = FindField(userAndPwd.data(),nullptr,":");
auto pwd = FindField(userAndPwd.data(),":",nullptr);
publish(url,user,pwd,eType);
}
void RtspPusher::publish(const string & strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) {
DebugL << strUrl << " "
<< (strUser.size() ? strUser : "null") << " "
<< (strPwd.size() ? strPwd:"null") << " "
<< eType;
teardown();
if(strUser.size()){
(*this)[PlayerBase::kRtspUser] = strUser;
}
if(strPwd.size()){
(*this)[PlayerBase::kRtspPwd] = strPwd;
(*this)[PlayerBase::kRtspPwdIsMD5] = false;
}
_eType = eType;
auto ip = FindField(strUrl.data(), "://", "/");
if (!ip.size()) {
ip = FindField(strUrl.data(), "://", NULL);
}
auto port = atoi(FindField(ip.data(), ":", NULL).data());
if (port <= 0) {
//rtsp 默认端口554
port = 554;
} else {
//服务器域名
ip = FindField(ip.data(), NULL, ":");
}
_strUrl = strUrl;
weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this());
float playTimeOutSec = (*this)[kPlayTimeoutMS].as<int>() / 1000.0;
_pPublishTimer.reset( new Timer(playTimeOutSec, [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtsp timeout"));
strongSelf->teardown();
return false;
},getPoller()));
if(!(*this)[kNetAdapter].empty()){
setNetAdapter((*this)[kNetAdapter]);
}
startConnect(ip, port , playTimeOutSec);
}
void RtspPusher::onErr(const SockException &ex) {
onShutdown(ex);
}
void RtspPusher::onConnect(const SockException &err) {
if(err) {
onPublishResult(err);
return;
}
sendAnnounce();
}
void RtspPusher::onRecv(const Buffer::Ptr &pBuf){
try {
input(pBuf->data(), pBuf->size());
} catch (exception &e) {
SockException ex(Err_other, e.what());
onPublishResult(ex);
onShutdown(ex);
teardown();
}
}
void RtspPusher::onWholeRtspPacket(Parser &parser) {
decltype(_onHandshake) fun;
_onHandshake.swap(fun);
if(fun){
fun(parser);
}
parser.Clear();
}
bool RtspPusher::sendAnnounce() {
auto src = _pMediaSrc.lock();
if (!src) {
throw std::runtime_error("the media source was released");
}
//解析sdp
_sdpAttr.load(src->getSdp());
_aTrackInfo = _sdpAttr.getAvailableTrack();
if (_aTrackInfo.empty()) {
throw std::runtime_error("无有效的Sdp Track");
}
_onHandshake = std::bind(&RtspPusher::handleResAnnounce,this, placeholders::_1);
return sendRtspRequest("ANNOUNCE",_strUrl,{},src->getSdp());
}
void RtspPusher::handleResAnnounce(const Parser &parser) {
string authInfo = parser["WWW-Authenticate"];
//发送DESCRIBE命令后的回复
if ((parser.Url() == "401") && handleAuthenticationFailure(authInfo)) {
sendAnnounce();
return;
}
if(parser.Url() == "302"){
auto newUrl = parser["Location"];
if(newUrl.empty()){
throw std::runtime_error("未找到Location字段(跳转url)");
}
publish(newUrl);
return;
}
if (parser.Url() != "200") {
throw std::runtime_error(StrPrinter << "ANNOUNCE:" << parser.Url() << " " << parser.Tail());
}
_strContentBase = parser["Content-Base"];
if(_strContentBase.empty()){
_strContentBase = _strUrl;
}
if (_strContentBase.back() == '/') {
_strContentBase.pop_back();
}
sendSetup(0);
}
bool RtspPusher::handleAuthenticationFailure(const string &paramsStr) {
if(!_rtspRealm.empty()){
//已经认证过了
return false;
}
char *realm = new char[paramsStr.size()];
char *nonce = new char[paramsStr.size()];
char *stale = new char[paramsStr.size()];
onceToken token(nullptr,[&](){
delete[] realm;
delete[] nonce;
delete[] stale;
});
if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\", stale=%[a-zA-Z]", realm, nonce, stale) == 3) {
_rtspRealm = (const char *)realm;
_rtspMd5Nonce = (const char *)nonce;
return true;
}
if (sscanf(paramsStr.data(), "Digest realm=\"%[^\"]\", nonce=\"%[^\"]\"", realm, nonce) == 2) {
_rtspRealm = (const char *)realm;
_rtspMd5Nonce = (const char *)nonce;
return true;
}
if (sscanf(paramsStr.data(), "Basic realm=\"%[^\"]\"", realm) == 1) {
_rtspRealm = (const char *)realm;
return true;
}
return false;
}
bool RtspPusher::sendSetup(unsigned int trackIndex) {
_onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex);
auto &track = _aTrackInfo[trackIndex];
auto baseUrl = _strContentBase + "/" + track->_control_surffix;
switch (_eType) {
case Rtsp::RTP_TCP: {
return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1});
}
case Rtsp::RTP_UDP: {
_apUdpSock[trackIndex].reset(new Socket());
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apUdpSock[trackIndex].reset();
throw std::runtime_error("open udp sock err");
}
int port = _apUdpSock[trackIndex]->get_local_port();
return sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1});
}
default:
return false;
}
}
void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) {
if (parser.Url() != "200") {
throw std::runtime_error(
StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl);
}
if (uiTrackIndex == 0) {
_strSession = parser["Session"];
_strSession.append(";");
_strSession = FindField(_strSession.data(), nullptr, ";");
}
auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
_eType = Rtsp::RTP_TCP;
string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-");
_aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data());
}else if(strTransport.find("multicast") != string::npos){
throw std::runtime_error("SETUP rtsp pusher can not support multicast!");
}else{
_eType = Rtsp::RTP_UDP;
const char *strPos = "server_port=" ;
auto port_str = FindField((strTransport + ";").data(), strPos, ";");
uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data());
auto &pUdpSockRef = _apUdpSock[uiTrackIndex];
if(!pUdpSockRef){
pUdpSockRef.reset(new Socket());
}
struct sockaddr_in rtpto;
rtpto.sin_port = ntohs(port);
rtpto.sin_family = AF_INET;
rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data());
pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto));
}
RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP);
if (uiTrackIndex < _aTrackInfo.size() - 1) {
//需要继续发送SETUP命令
sendSetup(uiTrackIndex + 1);
return;
}
sendRecord();
}
bool RtspPusher::sendOptions() {
_onHandshake = [this](const Parser& parser){};
return sendRtspRequest("OPTIONS",_strContentBase);
}
inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoL<<(int)pkt.Interleaved;
switch (_eType) {
case Rtsp::RTP_TCP: {
BufferRtp::Ptr buffer(new BufferRtp(pkt));
send(buffer);
}
break;
case Rtsp::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto &pSock = _apUdpSock[iTrackIndex];
if (!pSock) {
shutdown();
return;
}
BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
pSock->send(buffer);
}
break;
default:
break;
}
}
inline int RtspPusher::getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < _aTrackInfo.size(); i++) {
if (type == _aTrackInfo[i]->_type) {
return i;
}
}
return -1;
}
bool RtspPusher::sendRecord() {
_onHandshake = [this](const Parser& parser){
auto src = _pMediaSrc.lock();
if (!src) {
throw std::runtime_error("the media source was released");
}
_pRtspReader = src->getRing()->attach(getPoller());
weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this());
_pRtspReader->setReadCB([weakSelf](const RtpPacket::Ptr &pkt){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
}
strongSelf->sendRtpPacket(pkt);
});
_pRtspReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->onShutdown(SockException(Err_other,"媒体源被释放"));
strongSelf->teardown();
}
});
if(_eType != Rtsp::RTP_TCP){
/////////////////////////心跳/////////////////////////////////
weak_ptr<RtspPusher> weakSelf = dynamic_pointer_cast<RtspPusher>(shared_from_this());
_pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf](){
auto strongSelf = weakSelf.lock();
if (!strongSelf){
return false;
}
return strongSelf->sendOptions();
},getPoller()));
}
onPublishResult(SockException(Err_success,"success"));
//提高发送性能
(*this) << SocketFlags(kSockFlags);
SockUtil::setNoDelay(_sock->rawFD(),false);
};
return sendRtspRequest("RECORD",_strContentBase,{"Range","npt=0.000-"});
}
bool RtspPusher::sendRtspRequest(const string &cmd, const string &url, const std::initializer_list<string> &header,const string &sdp ) {
string key;
StrCaseMap header_map;
int i = 0;
for(auto &val : header){
if(++i % 2 == 0){
header_map.emplace(key,val);
}else{
key = val;
}
}
return sendRtspRequest(cmd,url,header_map,sdp);
}
bool RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const,const string &sdp ) {
auto header = header_const;
header.emplace("CSeq",StrPrinter << _uiCseq++);
header.emplace("User-Agent",SERVER_NAME "(build in " __DATE__ " " __TIME__ ")");
if(!_strSession.empty()){
header.emplace("Session",_strSession);
}
if(!_rtspRealm.empty() && !(*this)[kRtspUser].empty()){
if(!_rtspMd5Nonce.empty()){
//MD5认证
/*
response计算方法如下
RTSP客户端应该使用username + password并计算response如下:
(1)password为MD5编码,
response = md5( password:nonce:md5(public_method:url) );
(2)password为ANSI字符串,
response= md5( md5(username:realm:password):nonce:md5(public_method:url) );
*/
string encrypted_pwd = (*this)[kRtspPwd];
if(!(*this)[kRtspPwdIsMD5].as<bool>()){
encrypted_pwd = MD5((*this)[kRtspUser]+ ":" + _rtspRealm + ":" + encrypted_pwd).hexdigest();
}
auto response = MD5( encrypted_pwd + ":" + _rtspMd5Nonce + ":" + MD5(cmd + ":" + url).hexdigest()).hexdigest();
_StrPrinter printer;
printer << "Digest ";
printer << "username=\"" << (*this)[kRtspUser] << "\", ";
printer << "realm=\"" << _rtspRealm << "\", ";
printer << "nonce=\"" << _rtspMd5Nonce << "\", ";
printer << "uri=\"" << url << "\", ";
printer << "response=\"" << response << "\"";
header.emplace("Authorization",printer);
}else if(!(*this)[kRtspPwdIsMD5].as<bool>()){
//base64认证
string authStr = StrPrinter << (*this)[kRtspUser] << ":" << (*this)[kRtspPwd];
char authStrBase64[1024] = {0};
av_base64_encode(authStrBase64,sizeof(authStrBase64),(uint8_t *)authStr.data(),authStr.size());
header.emplace("Authorization",StrPrinter << "Basic " << authStrBase64 );
}
}
if(!sdp.empty()){
header.emplace("Content-Length",StrPrinter << sdp.size());
header.emplace("Content-Type","application/sdp");
}
_StrPrinter printer;
printer << cmd << " " << url << " RTSP/1.0\r\n";
for (auto &pr : header){
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n";
if(!sdp.empty()){
printer << sdp;
}
return send(printer) > 0;
}
} /* namespace mediakit */

112
src/Rtsp/RtspPusher.h Normal file
View File

@ -0,0 +1,112 @@
//
// Created by xzl on 2019/3/27.
//
#ifndef ZLMEDIAKIT_RTSPPUSHER_H
#define ZLMEDIAKIT_RTSPPUSHER_H
#include <string>
#include <memory>
#include "Rtsp.h"
#include "RtspMediaSource.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Poller/Timer.h"
#include "Network/Socket.h"
#include "Network/TcpClient.h"
#include "RtspSplitter.h"
#include "Pusher/PusherBase.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
class RtspPusher : public TcpClient, public RtspSplitter, public PusherBase {
public:
typedef std::shared_ptr<RtspPusher> Ptr;
RtspPusher(const RtspMediaSource::Ptr &src);
virtual ~RtspPusher();
void publish(const string &strUrl) override;
void teardown() override;
void setOnPublished(const Event &cb) override {
_onPublished = cb;
}
void setOnShutdown(const Event & cb) override{
_onShutdown = cb;
}
protected:
//for Tcpclient override
void onRecv(const Buffer::Ptr &pBuf) override;
void onConnect(const SockException &err) override;
void onErr(const SockException &ex) override;
//RtspSplitter override
void onWholeRtspPacket(Parser &parser) override ;
void onRtpPacket(const char *data,uint64_t len) override {};
private:
void publish(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType );
void onShutdown(const SockException &ex) {
_pPublishTimer.reset();
if(_onShutdown){
_onShutdown(ex);
}
_pRtspReader.reset();
}
void onPublishResult(const SockException &ex) {
_pPublishTimer.reset();
if(_onPublished){
_onPublished(ex);
}
}
bool sendAnnounce();
bool sendSetup(unsigned int uiTrackIndex);
bool sendRecord();
bool sendOptions();
void handleResAnnounce(const Parser &parser);
void handleResSetup(const Parser &parser, unsigned int uiTrackIndex);
bool handleAuthenticationFailure(const string &paramsStr);
inline int getTrackIndexByTrackType(TrackType type);
void sendRtpPacket(const RtpPacket::Ptr & pkt) ;
bool sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" );
bool sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list<string> &header,const string &sdp = "");
private:
//rtsp鉴权相关
string _rtspMd5Nonce;
string _rtspRealm;
//超时功能实现
std::shared_ptr<Timer> _pPublishTimer;
//源
std::weak_ptr<RtspMediaSource> _pMediaSrc;
RtspMediaSource::RingType::RingReader::Ptr _pRtspReader;
//事件监听
Event _onShutdown;
Event _onPublished;
string _strUrl;
SdpAttr _sdpAttr;
vector<SdpTrack::Ptr> _aTrackInfo;
string _strSession;
unsigned int _uiCseq = 1;
string _strContentBase;
Rtsp::eRtpType _eType = Rtsp::RTP_TCP;
Socket::Ptr _apUdpSock[2];
function<void(const Parser&)> _onHandshake;
//心跳定时器
std::shared_ptr<Timer> _pBeatTimer;
};
} /* namespace mediakit */
#endif //ZLMEDIAKIT_RTSPPUSHER_H

View File

@ -1,4 +1,4 @@
/* /*
* MIT License * MIT License
* *
* Copyright (c) 2016 xiongziliang <771730766@qq.com> * Copyright (c) 2016 xiongziliang <771730766@qq.com>
@ -85,7 +85,7 @@ RtspSession::~RtspSession() {
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what(); TraceL << err.getErrCode() << " " << err.what();
if (_rtpType == PlayerBase::RTP_MULTICAST) { if (_rtpType == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
} }
@ -118,7 +118,7 @@ void RtspSession::onManager() {
} }
if ((_rtpType == PlayerBase::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) {
//如果是推流端或者rtp over udp类型的播放端那么就做超时检测 //如果是推流端或者rtp over udp类型的播放端那么就做超时检测
WarnL << "RTSP会话超时:" << get_peer_ip(); WarnL << "RTSP会话超时:" << get_peer_ip();
shutdown(); shutdown();
@ -550,22 +550,22 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
} }
trackRef->_inited = true; //现在初始化 trackRef->_inited = true; //现在初始化
if(_rtpType == PlayerBase::RTP_Invalid){ if(_rtpType == Rtsp::RTP_Invalid){
auto strTransport = parser["Transport"]; auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){ if(strTransport.find("TCP") != string::npos){
_rtpType = PlayerBase::RTP_TCP; _rtpType = Rtsp::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){ }else if(strTransport.find("multicast") != string::npos){
_rtpType = PlayerBase::RTP_MULTICAST; _rtpType = Rtsp::RTP_MULTICAST;
}else{ }else{
_rtpType = PlayerBase::RTP_UDP; _rtpType = Rtsp::RTP_UDP;
} }
} }
//允许接收rtp、rtcp包 //允许接收rtp、rtcp包
RtspSplitter::enableRecvRtp(_rtpType == PlayerBase::RTP_TCP); RtspSplitter::enableRecvRtp(_rtpType == Rtsp::RTP_TCP);
switch (_rtpType) { switch (_rtpType) {
case PlayerBase::RTP_TCP: { case Rtsp::RTP_TCP: {
trackRef->_interleaved = trackRef->_type * 2; trackRef->_interleaved = trackRef->_type * 2;
sendRtspResponse("200 OK", sendRtspResponse("200 OK",
{"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;"
@ -576,7 +576,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
}); });
} }
break; break;
case PlayerBase::RTP_UDP: { case Rtsp::RTP_UDP: {
//我们用trackIdx区分rtp和rtcp包 //我们用trackIdx区分rtp和rtcp包
auto pSockRtp = std::make_shared<Socket>(_sock->getPoller()); auto pSockRtp = std::make_shared<Socket>(_sock->getPoller());
if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) { if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) {
@ -615,7 +615,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) {
}); });
} }
break; break;
case PlayerBase::RTP_MULTICAST: { case Rtsp::RTP_MULTICAST: {
if(!_pBrdcaster){ if(!_pBrdcaster){
_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid); _pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) { if (!_pBrdcaster) {
@ -729,7 +729,7 @@ bool RtspSession::handleReq_Play(const Parser &parser) {
SockUtil::setNoDelay(_sock->rawFD(),false); SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(kSockFlags); (*this) << SocketFlags(kSockFlags);
if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) { if (!_pRtpReader && _rtpType != Rtsp::RTP_MULTICAST) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this()); weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
_pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf); _pRtpReader = pMediaSrc->getRing()->attach(getPoller(),useBuf);
_pRtpReader->setDetachCB([weakSelf]() { _pRtpReader->setDetachCB([weakSelf]() {
@ -959,7 +959,7 @@ inline bool RtspSession::findStream() {
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoL<<(int)pkt.Interleaved; //InfoL<<(int)pkt.Interleaved;
switch (_rtpType) { switch (_rtpType) {
case PlayerBase::RTP_TCP: { case Rtsp::RTP_TCP: {
BufferRtp::Ptr buffer(new BufferRtp(pkt)); BufferRtp::Ptr buffer(new BufferRtp(pkt));
send(buffer); send(buffer);
#ifdef RTSP_SEND_RTCP #ifdef RTSP_SEND_RTCP
@ -977,7 +977,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
#endif #endif
} }
break; break;
case PlayerBase::RTP_UDP: { case Rtsp::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type); int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto &pSock = _apRtpSock[iTrackIndex]; auto &pSock = _apRtpSock[iTrackIndex];
if (!pSock) { if (!pSock) {
@ -1051,7 +1051,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
}; };
switch (_rtpType){ switch (_rtpType){
case PlayerBase::RTP_MULTICAST:{ case Rtsp::RTP_MULTICAST:{
//组播使用的共享rtcp端口 //组播使用的共享rtcp端口
UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData](
int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) {
@ -1059,7 +1059,7 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) {
}); });
} }
break; break;
case PlayerBase::RTP_UDP:{ case Rtsp::RTP_UDP:{
auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){ auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){
if(!sock){ if(!sock){
WarnL << "udp端口为空:" << iTrackIdx; WarnL << "udp端口为空:" << iTrackIdx;

View File

@ -159,7 +159,7 @@ private:
MediaInfo _mediaInfo; MediaInfo _mediaInfo;
std::weak_ptr<RtspMediaSource> _pMediaSrc; std::weak_ptr<RtspMediaSource> _pMediaSrc;
RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader; RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader;
PlayerBase::eRtpType _rtpType = PlayerBase::RTP_Invalid; Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid;
vector<SdpTrack::Ptr> _aTrackInfo; vector<SdpTrack::Ptr> _aTrackInfo;
//RTP over udp //RTP over udp

View File

@ -32,47 +32,48 @@
#include "Player/PlayerProxy.h" #include "Player/PlayerProxy.h"
#include "Rtmp/RtmpPusher.h" #include "Rtmp/RtmpPusher.h"
#include "Common/config.h" #include "Common/config.h"
#include "Pusher/MediaPusher.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
//推流器,保持强引用 //推流器,保持强引用
RtmpPusher::Ptr pusher; MediaPusher::Ptr pusher;
//声明函数 //声明函数
void rePushDelay(const string &app, const string &stream, const string &url); void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url);
//创建推流器并开始推流 //创建推流器并开始推流
void createPusher(const string &app, const string &stream, const string &url) { void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
//创建推流器并绑定一个RtmpMediaSource //创建推流器并绑定一个MediaSource
pusher.reset(new RtmpPusher(DEFAULT_VHOST, app.data(), stream.data())); pusher.reset(new MediaPusher(schema,vhost, app, stream));
//设置推流中断处理逻辑 //设置推流中断处理逻辑
pusher->setOnShutdown([app, stream, url](const SockException &ex) { pusher->setOnShutdown([schema,vhost, app, stream, url](const SockException &ex) {
WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();
//重试 //重试
rePushDelay(app, stream, url); rePushDelay(schema,vhost,app, stream, url);
}); });
//设置发布结果处理逻辑 //设置发布结果处理逻辑
pusher->setOnPublished([app, stream, url](const SockException &ex) { pusher->setOnPublished([schema,vhost, app, stream, url](const SockException &ex) {
if (ex) { if (ex) {
WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
//如果发布失败,就重试 //如果发布失败,就重试
rePushDelay(app, stream, url); rePushDelay(schema,vhost,app, stream, url);
} else { } else {
InfoL << "Publish success,Please play with player:" << url; InfoL << "Publish success,Please play with player:" << url;
} }
}); });
pusher->publish(url.data()); pusher->publish(url);
} }
Timer::Ptr g_timer; Timer::Ptr g_timer;
//推流失败或断开延迟2秒后重试推流 //推流失败或断开延迟2秒后重试推流
void rePushDelay(const string &app, const string &stream, const string &url) { void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
g_timer = std::make_shared<Timer>(2,[app, stream, url]() { g_timer = std::make_shared<Timer>(2,[schema,vhost,app, stream, url]() {
InfoL << "Re-Publishing..."; InfoL << "Re-Publishing...";
//重新推流 //重新推流
createPusher(app, stream, url); createPusher(schema,vhost,app, stream, url);
//此任务不重复 //此任务不重复
return false; return false;
}, nullptr); }, nullptr);
@ -80,9 +81,6 @@ void rePushDelay(const string &app, const string &stream, const string &url) {
//这里才是真正执行main函数你可以把函数名(domain)改成main然后就可以输入自定义url了 //这里才是真正执行main函数你可以把函数名(domain)改成main然后就可以输入自定义url了
int domain(const string &playUrl, const string &pushUrl) { int domain(const string &playUrl, const string &pushUrl) {
//设置退出信号处理函数
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
//设置日志 //设置日志
Logger::Instance().add(std::make_shared<ConsoleChannel>()); Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
@ -96,17 +94,21 @@ int domain(const string &playUrl, const string &pushUrl) {
NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged,
[pushUrl](BroadcastMediaChangedArgs) { [pushUrl](BroadcastMediaChangedArgs) {
//媒体源"app/stream"已经注册这时方可新建一个RtmpPusher对象并绑定该媒体源 //媒体源"app/stream"已经注册这时方可新建一个RtmpPusher对象并绑定该媒体源
if(bRegist && schema == RTMP_SCHEMA){ if(bRegist && pushUrl.find(schema) == 0){
createPusher(app, stream, pushUrl); createPusher(schema,vhost,app, stream, pushUrl);
} }
}); });
//设置退出信号处理函数
static semaphore sem;
signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
sem.wait(); sem.wait();
return 0; return 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
return domain("rtmp://live.hkstv.hk.lxdns.com/live/hks", "rtmp://127.0.0.1/live/stream"); return domain("rtmp://live.hkstv.hk.lxdns.com/live/hks1", "rtsp://127.0.0.1/live/rtsp_push");
} }

View File

@ -32,6 +32,7 @@
#include "Player/PlayerProxy.h" #include "Player/PlayerProxy.h"
#include "Rtmp/RtmpPusher.h" #include "Rtmp/RtmpPusher.h"
#include "Common/config.h" #include "Common/config.h"
#include "Pusher/MediaPusher.h"
#include "MediaFile/MediaReader.h" #include "MediaFile/MediaReader.h"
using namespace std; using namespace std;
@ -39,49 +40,49 @@ using namespace toolkit;
using namespace mediakit; using namespace mediakit;
//推流器,保持强引用 //推流器,保持强引用
RtmpPusher::Ptr pusher; MediaPusher::Ptr pusher;
//声明函数 //声明函数
void rePushDelay(const string &app,const string &stream,const string &url); void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url);
void createPusher(const string &app,const string &stream,const string &url);
//创建推流器并开始推流 //创建推流器并开始推流
void createPusher(const string &app,const string &stream,const string &url){ void createPusher(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
auto rtmpSrc = dynamic_pointer_cast<RtmpMediaSource>(MediaReader::onMakeMediaSource(RTMP_SCHEMA,DEFAULT_VHOST,app,stream)); auto src = MediaReader::onMakeMediaSource(schema,vhost,app,stream);
if(!rtmpSrc){ if(!src){
//文件不存在 //文件不存在
WarnL << "MP4 file not exited!"; WarnL << "MP4 file not exited!";
return; return;
} }
//创建推流器并绑定一个RtmpMediaSource //创建推流器并绑定一个MediaSource
pusher.reset(new RtmpPusher(rtmpSrc)); pusher.reset(new MediaPusher(src));
//设置推流中断处理逻辑 //设置推流中断处理逻辑
pusher->setOnShutdown([app,stream, url](const SockException &ex) { pusher->setOnShutdown([schema,vhost,app,stream, url](const SockException &ex) {
WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what(); WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();
//重新推流 //重新推流
rePushDelay(app, stream, url); rePushDelay(schema,vhost,app, stream, url);
}); });
//设置发布结果处理逻辑 //设置发布结果处理逻辑
pusher->setOnPublished([app,stream, url](const SockException &ex) { pusher->setOnPublished([schema,vhost,app,stream, url](const SockException &ex) {
if (ex) { if (ex) {
WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what(); WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
//如果发布失败,就重试 //如果发布失败,就重试
rePushDelay(app,stream, url); rePushDelay(schema,vhost,app, stream, url);
}else { }else {
InfoL << "Publish success,Please play with player:" << url; InfoL << "Publish success,Please play with player:" << url;
} }
}); });
pusher->publish(url.data()); pusher->publish(url);
} }
Timer::Ptr g_timer; Timer::Ptr g_timer;
//推流失败或断开延迟2秒后重试推流 //推流失败或断开延迟2秒后重试推流
void rePushDelay(const string &app, const string &stream, const string &url) { void rePushDelay(const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
g_timer = std::make_shared<Timer>(2,[app, stream, url]() { g_timer = std::make_shared<Timer>(2,[schema,vhost,app, stream, url]() {
InfoL << "Re-Publishing..."; InfoL << "Re-Publishing...";
//重新推流 //重新推流
createPusher(app, stream, url); createPusher(schema,vhost,app, stream, url);
//此任务不重复 //此任务不重复
return false; return false;
}, nullptr); }, nullptr);
@ -101,7 +102,7 @@ int domain(const string & filePath,const string & pushUrl){
string appName = mINI::Instance()[Record::kAppName]; string appName = mINI::Instance()[Record::kAppName];
//app必须recordfilePath(流id)为相对于httpRoot/record的路径否则MediaReader会找到不该文件 //app必须recordfilePath(流id)为相对于httpRoot/record的路径否则MediaReader会找到不该文件
//限制app为record是为了防止服务器上的文件被肆意访问 //限制app为record是为了防止服务器上的文件被肆意访问
createPusher(appName,filePath,pushUrl); createPusher(FindField(pushUrl.data(), nullptr,"://"),DEFAULT_VHOST,appName,filePath,pushUrl);
sem.wait(); sem.wait();
return 0; return 0;
@ -112,7 +113,7 @@ int domain(const string & filePath,const string & pushUrl){
int main(int argc,char *argv[]){ int main(int argc,char *argv[]){
//MP4文件需要放置在 httpRoot/record目录下,文件负载必须为h264+aac //MP4文件需要放置在 httpRoot/record目录下,文件负载必须为h264+aac
//可以使用test_server生成的mp4文件 //可以使用test_server生成的mp4文件
return domain("app/stream/2017-09-30/12-55-38.mp4","rtmp://jizan.iok.la/live/test"); return domain("app/stream/2017-09-30/12-55-38.mp4","rtsp://127.0.0.1/live/rtsp_push");
} }