From ef9ebc89e0c2f9c260fcc35ae6b52142b8da523f Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 30 Aug 2018 19:29:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=BD=E8=B1=A1FLV=E5=A4=8D=E7=94=A8?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpSession.cpp | 173 +++++++----------------------- src/Http/HttpSession.h | 18 ++-- src/Rtmp/FlvMuxer.cpp | 226 +++++++++++++++++++++++++++++++++++++++ src/Rtmp/FlvMuxer.h | 59 ++++++++++ 4 files changed, 332 insertions(+), 144 deletions(-) create mode 100644 src/Rtmp/FlvMuxer.cpp create mode 100644 src/Rtmp/FlvMuxer.h diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index e031166f..cc0021db 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -175,9 +175,8 @@ void HttpSession::onError(const SockException& err) { //WarnL << err.what(); GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold); - if(m_previousTagSize > iFlowThreshold * 1024){ - uint64_t totalBytes = m_previousTagSize; - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,totalBytes,*this); + if(m_ui64TotalBytes > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes,*this); } } @@ -224,73 +223,18 @@ inline bool HttpSession::checkLiveFlvStream(){ } //找到rtmp源,发送http头,负载后续发送 sendResponse("200 OK", makeHttpHeader(false,0,get_mime_type(".flv")), ""); - //发送flv文件头 - char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video - bool is_have_audio = false,is_have_video = false; - - mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - if(pkt->typeId == MSG_VIDEO){ - is_have_video = true; - } - if(pkt->typeId == MSG_AUDIO){ - is_have_audio = true; - } - }); - - if (is_have_audio && is_have_video) { - flv_file_header[4] = 0x05; - } else if (is_have_audio && !is_have_video) { - flv_file_header[4] = 0x04; - } else if (!is_have_audio && is_have_video) { - flv_file_header[4] = 0x01; - } else { - flv_file_header[4] = 0x00; - } - //send flv header - send(flv_file_header, sizeof(flv_file_header) - 1); - //send metadata - AMFEncoder invoke; - invoke << "onMetaData" << mediaSrc->getMetaData(); - sendRtmp(MSG_DATA, invoke.data(), 0); - //send config frame - mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - onSendMedia(pkt); - }); //开始发送rtmp负载 - //关闭tcp_nodelay ,优化性能 SockUtil::setNoDelay(_sock->rawFD(),false); (*this) << SocketFlags(sock_flags); - m_pRingReader = mediaSrc->getRing()->attach(); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->async([pkt,weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->onSendMedia(pkt); - }); - }); - m_pRingReader->setDetachCB([weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->async_first([weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->shutdown(); - }); - }); + try{ + start(mediaSrc); + }catch (std::exception &ex){ + //该rtmp源不存在 + shutdown(); + } }; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -682,86 +626,43 @@ void HttpSession::responseDelay(const string &Origin,bool bClose, } inline void HttpSession::sendNotFound(bool bClose) { GET_CONFIG_AND_REGISTER(string,notFound,Config::Http::kNotFound); - sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); } -void HttpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { - auto modifiedStamp = pkt->timeStamp; - auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2]; - if(!firstStamp){ - firstStamp = modifiedStamp; - } - if(modifiedStamp >= firstStamp){ - //计算时间戳增量 - modifiedStamp -= firstStamp; - }else{ - //发生回环,重新计算时间戳增量 - CLEAR_ARR(m_aui32FirstStamp); - modifiedStamp = 0; - } - sendRtmp(pkt, modifiedStamp); + +void HttpSession::onWrite(const Buffer::Ptr &buffer) { + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + async([weakSelf,buffer](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->m_ui64TotalBytes += buffer->size(); + strongSelf->send(buffer); + }); } -#if defined(_WIN32) -#pragma pack(push, 1) -#endif // defined(_WIN32) +void HttpSession::onWrite(const char *data, int len) { + BufferRaw::Ptr buffer(new BufferRaw); + buffer->assign(data,len); -class RtmpTagHeader { -public: - uint8_t type = 0; - uint8_t data_size[3] = {0}; - uint8_t timestamp[3] = {0}; - uint8_t timestamp_ex = 0; - uint8_t streamid[3] = {0}; /* Always 0. */ -}PACKED; - -#if defined(_WIN32) -#pragma pack(pop) -#endif // defined(_WIN32) - -class BufferRtmp : public Buffer{ -public: - typedef std::shared_ptr Ptr; - BufferRtmp(const RtmpPacket::Ptr & pkt):_rtmp(pkt){} - virtual ~BufferRtmp(){} - - char *data() override { - return (char *)_rtmp->strBuf.data(); - } - uint32_t size() const override { - return _rtmp->strBuf.size(); - } -private: - RtmpPacket::Ptr _rtmp; -}; - -void HttpSession::sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) { - auto size = htonl(m_previousTagSize); - send((char *)&size,4);//send PreviousTagSize - RtmpTagHeader header; - header.type = pkt->typeId; - set_be24(header.data_size, pkt->strBuf.size()); - header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); - set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); - send((char *)&header, sizeof(header));//send tag header - send(std::make_shared(pkt));//send tag data - m_previousTagSize += (pkt->strBuf.size() + sizeof(header)); - m_ticker.resetTime(); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + async([weakSelf,buffer](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->m_ui64TotalBytes += buffer->size(); + strongSelf->send(buffer); + }); } -void HttpSession::sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp) { - auto size = htonl(m_previousTagSize); - send((char *)&size,4);//send PreviousTagSize - RtmpTagHeader header; - header.type = ui8Type; - set_be24(header.data_size, strBuf.size()); - header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); - set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); - send((char *)&header, sizeof(header));//send tag header - send(strBuf);//send tag data - m_previousTagSize += (strBuf.size() + sizeof(header)); - m_ticker.resetTime(); +void HttpSession::onDetach() { + safeShutdown(); +} + +std::shared_ptr HttpSession::getSharedPtr(){ + return dynamic_pointer_cast(shared_from_this()); } } /* namespace Http */ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 21a34b3a..763e6054 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -31,6 +31,7 @@ #include "Rtsp/Rtsp.h" #include "Network/TcpSession.h" #include "Rtmp/RtmpMediaSource.h" +#include "Rtmp/FlvMuxer.h" using namespace std; using namespace ZL::Rtmp; @@ -40,7 +41,7 @@ namespace ZL { namespace Http { -class HttpSession: public TcpSession { +class HttpSession: public TcpSession,FlvMuxer { public: typedef StrCaseMap KeyValue; typedef std::function getSharedPtr() override; private: typedef enum { @@ -70,16 +77,11 @@ private: string m_strRcvBuf; Ticker m_ticker; uint32_t m_iReqCnt = 0; + //消耗的总流量 + uint64_t m_ui64TotalBytes = 0; //flv over http - uint32_t m_aui32FirstStamp[2] = {0}; - uint32_t m_previousTagSize = 0; MediaInfo m_mediaInfo; - RingBuffer::RingReader::Ptr m_pRingReader; - - void onSendMedia(const RtmpPacket::Ptr &pkt); - void sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp); - void sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp); inline HttpCode parserHttpReq(const string &); inline HttpCode Handle_Req_GET(); diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp new file mode 100644 index 00000000..1e6c6815 --- /dev/null +++ b/src/Rtmp/FlvMuxer.cpp @@ -0,0 +1,226 @@ +// +// Created by xzl on 2018/8/30. +// + +#include "Util/File.h" +#include "FlvMuxer.h" +#include "utils.h" + +#define FILE_BUF_SIZE (64 * 1024) + +namespace ZL { +namespace Rtmp { + + +FlvMuxer::FlvMuxer() { +} +FlvMuxer::~FlvMuxer() { +} + +void FlvMuxer::start(const RtmpMediaSource::Ptr &media) { + if(!media){ + throw std::runtime_error("RtmpMediaSource 无效"); + } + if(!media->ready()){ + throw std::runtime_error("RtmpMediaSource 未准备好"); + } + + onWriteFlvHeader(media); + + std::weak_ptr weakSelf = getSharedPtr(); + _ring_reader = media->getRing()->attach(); + _ring_reader->setDetachCB([weakSelf](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + strongSelf->onDetach(); + }); + _ring_reader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + strongSelf->onWriteRtmp(pkt); + }); +} + +void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) { + m_previousTagSize = 0; + CLEAR_ARR(m_aui32FirstStamp); + + //发送flv文件头 + char flv_file_header[] = "FLV\x1\x5\x0\x0\x0\x9"; // have audio and have video + bool is_have_audio = false,is_have_video = false; + + mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ + if(pkt->typeId == MSG_VIDEO){ + is_have_video = true; + } + if(pkt->typeId == MSG_AUDIO){ + is_have_audio = true; + } + }); + + if (is_have_audio && is_have_video) { + flv_file_header[4] = 0x05; + } else if (is_have_audio && !is_have_video) { + flv_file_header[4] = 0x04; + } else if (!is_have_audio && is_have_video) { + flv_file_header[4] = 0x01; + } else { + flv_file_header[4] = 0x00; + } + + //flv header + onWrite(flv_file_header, sizeof(flv_file_header) - 1); + //metadata + AMFEncoder invoke; + invoke << "onMetaData" << mediaSrc->getMetaData(); + onWriteFlvTag(MSG_DATA, invoke.data(), 0); + //config frame + mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ + onWriteRtmp(pkt); + }); +} + + + +#if defined(_WIN32) +#pragma pack(push, 1) +#endif // defined(_WIN32) + +class RtmpTagHeader { +public: + uint8_t type = 0; + uint8_t data_size[3] = {0}; + uint8_t timestamp[3] = {0}; + uint8_t timestamp_ex = 0; + uint8_t streamid[3] = {0}; /* Always 0. */ +}PACKED; + +#if defined(_WIN32) +#pragma pack(pop) +#endif // defined(_WIN32) + +class BufferRtmp : public Buffer{ +public: + typedef std::shared_ptr Ptr; + BufferRtmp(const RtmpPacket::Ptr & pkt):_rtmp(pkt){} + virtual ~BufferRtmp(){} + + char *data() override { + return (char *)_rtmp->strBuf.data(); + } + uint32_t size() const override { + return _rtmp->strBuf.size(); + } +private: + RtmpPacket::Ptr _rtmp; +}; + + +void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) { + auto size = htonl(m_previousTagSize); + onWrite((char *)&size,4);//onWrite PreviousTagSize + RtmpTagHeader header; + header.type = pkt->typeId; + set_be24(header.data_size, pkt->strBuf.size()); + header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); + set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); + onWrite((char *)&header, sizeof(header));//onWrite tag header + onWrite(std::make_shared(pkt));//onWrite tag data + m_previousTagSize += (pkt->strBuf.size() + sizeof(header)); +} + +void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const std::string &strBuf, uint32_t ui32TimeStamp) { + auto size = htonl(m_previousTagSize); + onWrite((char *)&size,4);//onWrite PreviousTagSize + RtmpTagHeader header; + header.type = ui8Type; + set_be24(header.data_size, strBuf.size()); + header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); + set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); + onWrite((char *)&header, sizeof(header));//onWrite tag header + onWrite(std::make_shared(strBuf));//onWrite tag data + m_previousTagSize += (strBuf.size() + sizeof(header)); +} + +void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt) { + auto modifiedStamp = pkt->timeStamp; + auto &firstStamp = m_aui32FirstStamp[pkt->typeId % 2]; + if(!firstStamp){ + firstStamp = modifiedStamp; + } + if(modifiedStamp >= firstStamp){ + //计算时间戳增量 + modifiedStamp -= firstStamp; + }else{ + //发生回环,重新计算时间戳增量 + CLEAR_ARR(m_aui32FirstStamp); + modifiedStamp = 0; + } + onWriteFlvTag(pkt, modifiedStamp); +} + +void FlvMuxer::stop() { + if(_ring_reader){ + _ring_reader.reset(); + onDetach(); + } +} + +///////////////////////////////////////////////////////FlvRecorder///////////////////////////////////////////////////// +void FlvRecorder::startRecord(const string &vhost, const string &app, const string &stream,const string &file_path) { + startRecord(dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,vhost,app,stream,false)),file_path); +} + +void FlvRecorder::startRecord(const RtmpMediaSource::Ptr &media, const string &file_path) { + //开辟文件写缓存 + std::shared_ptr fileBuf(new char[FILE_BUF_SIZE],[](char *ptr){ + if(ptr){ + delete [] ptr; + } + }); + //新建文件 + std::shared_ptr _file(File::createfile_file(file_path.data(),"wb"),[fileBuf](FILE *fp){ + if(fp){ + fflush(fp); + fclose(fp); + } + }); + if (!_file){ + throw std::runtime_error( StrPrinter << "打开文件失败:" << file_path); + } + + //设置文件写缓存 + setvbuf( _file.get(), fileBuf.get(),_IOFBF, FILE_BUF_SIZE); + start(media); +} + +void FlvRecorder::onWrite(const Buffer::Ptr &data) { + lock_guard lck(_file_mtx); + if(_file){ + fwrite(data->data(),data->size(),1,_file.get()); + } +} + +void FlvRecorder::onWrite(const char *data, int len) { + lock_guard lck(_file_mtx); + if(_file){ + fwrite(data,len,1,_file.get()); + } +} + +void FlvRecorder::onDetach() { + lock_guard lck(_file_mtx); + _file.reset(); +} + +std::shared_ptr FlvRecorder::getSharedPtr() { + return shared_from_this(); +} + + + }//namespace Rtmp +}//namespace ZL diff --git a/src/Rtmp/FlvMuxer.h b/src/Rtmp/FlvMuxer.h new file mode 100644 index 00000000..a118a242 --- /dev/null +++ b/src/Rtmp/FlvMuxer.h @@ -0,0 +1,59 @@ +// +// Created by xzl on 2018/8/30. +// + +#ifndef ZLMEDIAKIT_FLVRECORDER_H +#define ZLMEDIAKIT_FLVRECORDER_H + +#include "Rtmp.h" +#include "RtmpMediaSource.h" +#include "Network/Socket.h" + +using namespace ZL::Network; + +namespace ZL { +namespace Rtmp { + +class FlvMuxer{ +public: + FlvMuxer(); + virtual ~FlvMuxer(); + void stop(); +protected: + void start(const RtmpMediaSource::Ptr &media); + virtual void onWrite(const Buffer::Ptr &data) = 0; + virtual void onWrite(const char *data,int len) = 0; + virtual void onDetach() = 0; + virtual std::shared_ptr getSharedPtr() = 0; +private: + void onWriteFlvHeader(const RtmpMediaSource::Ptr &media); + void onWriteRtmp(const RtmpPacket::Ptr &pkt); + void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp); + void onWriteFlvTag(uint8_t ui8Type, const std::string &strBuf, uint32_t ui32TimeStamp); +private: + RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; + uint32_t m_aui32FirstStamp[2] = {0}; + uint32_t m_previousTagSize = 0; +}; + +class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this{ +public: + FlvRecorder(); + virtual ~FlvRecorder(); + void startRecord(const string &vhost,const string &app,const string &stream,const string &file_path); + void startRecord(const RtmpMediaSource::Ptr &media,const string &file_path); +private: + virtual void onWrite(const Buffer::Ptr &data) override ; + virtual void onWrite(const char *data,int len) override; + virtual void onDetach() override; + virtual std::shared_ptr getSharedPtr() override; +private: + std::shared_ptr _file; + recursive_mutex _file_mtx; +}; + + +}//namespace Rtmp +}//namespace ZL + +#endif //ZLMEDIAKIT_FLVRECORDER_H