diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index ea5319d9..6b3c4519 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -479,57 +479,52 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string & #endif //ENABLE_MP4 } -static bool isFlushAble_default(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size) { +static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) { if (new_stamp < last_stamp) { //时间戳回退(可能seek中) return true; } - if (!is_audio) { - //这是视频,时间戳发送变化或者缓存超过1024个 - return last_stamp != new_stamp || cache_size >= 1024; - } - - //这是音频,缓存超过100ms或者缓存个数超过10个 - return new_stamp > last_stamp + 100 || cache_size > 10; + //时间戳发送变化或者缓存超过1024个,sendmsg接口一般最多只能发送1024个数据包 + return last_stamp != new_stamp || cache_size >= 1024; } -static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) { +static bool isFlushAble_merge(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) { if (new_stamp < last_stamp) { //时间戳回退(可能seek中) return true; } - if(new_stamp > last_stamp + merge_ms){ + if (new_stamp > last_stamp + merge_ms) { //时间戳增量超过合并写阈值 return true; } - if (!is_audio) { - //这是视频,缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题 - //而且sendmsg接口一般最多只能发送1024个数据包 - return cache_size >= 1024; - } - - //这是音频,音频缓存超过20个 - return cache_size > 20; + //缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题 + //而且sendmsg接口一般最多只能发送1024个数据包 + return cache_size >= 1024; } -bool FlushPolicy::isFlushAble(uint32_t new_stamp, int cache_size) { - bool ret = false; - GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); - if (mergeWriteMS <= 0) { - //关闭了合并写或者合并写阈值小于等于0 - ret = isFlushAble_default(_is_audio, _last_stamp, new_stamp, cache_size); +bool FlushPolicy::isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size) { + bool flush_flag = false; + if (is_key && is_video) { + //遇到关键帧flush掉前面的数据,确保关键帧为该组数据的第一帧,确保GOP缓存有效 + flush_flag = true; } else { - ret = isFlushAble_merge(_is_audio, _last_stamp, new_stamp, cache_size, mergeWriteMS); + GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); + if (mergeWriteMS <= 0) { + //关闭了合并写或者合并写阈值小于等于0 + flush_flag = isFlushAble_default(is_video, _last_stamp[is_video], new_stamp, cache_size); + } else { + flush_flag = isFlushAble_merge(is_video, _last_stamp[is_video], new_stamp, cache_size, mergeWriteMS); + } } - if (ret) { -// DebugL << _is_audio << " " << _last_stamp << " " << new_stamp; - _last_stamp = new_stamp; + if (flush_flag) { +// DebugL << is_video << " " << _last_stamp[is_video] << " " << new_stamp; + _last_stamp[is_video] = new_stamp; } - return ret; + return flush_flag; } } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 299cf709..6c775934 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -164,10 +164,7 @@ private: ///缓存刷新策略类 class FlushPolicy { public: - FlushPolicy(bool is_audio) { - _is_audio = is_audio; - }; - + FlushPolicy() = default; ~FlushPolicy() = default; uint32_t getStamp(const RtpPacket::Ptr &packet) { @@ -178,45 +175,45 @@ public: return packet->timeStamp; } - bool isFlushAble(uint32_t new_stamp, int cache_size); + bool isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size); + private: - bool _is_audio; - uint32_t _last_stamp= 0; + uint32_t _last_stamp[2] = {0, 0}; }; -/// 视频合并写缓存模板 +/// 合并写缓存模板 /// \tparam packet 包类型 /// \tparam policy 刷新缓存策略 /// \tparam packet_list 包缓存类型 template > > -class VideoPacketCache { +class PacketCache { public: - VideoPacketCache() : _policy(false) { + PacketCache(){ _cache = std::make_shared(); } - virtual ~VideoPacketCache() = default; + virtual ~PacketCache() = default; - void inputVideo(const std::shared_ptr &rtp, bool key_pos) { - if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) { + void inputPacket(bool is_video, const std::shared_ptr &pkt, bool key_pos) { + if (_policy.isFlushAble(is_video, key_pos, _policy.getStamp(pkt), _cache->size())) { flushAll(); } //追加数据到最后 - _cache->emplace_back(rtp); + _cache->emplace_back(pkt); if (key_pos) { _key_pos = key_pos; } } - virtual void onFlushVideo(std::shared_ptr &, bool key_pos) = 0; + virtual void onFlush(std::shared_ptr &, bool key_pos) = 0; private: void flushAll() { if (_cache->empty()) { return; } - onFlushVideo(_cache, _key_pos); + onFlush(_cache, _key_pos); _cache = std::make_shared(); _key_pos = false; } @@ -227,44 +224,5 @@ private: bool _key_pos = false; }; -/// 音频频合并写缓存模板 -/// \tparam packet 包类型 -/// \tparam policy 刷新缓存策略 -/// \tparam packet_list 包缓存类型 -template > > -class AudioPacketCache { -public: - AudioPacketCache() : _policy(true) { - _cache = std::make_shared(); - } - - virtual ~AudioPacketCache() = default; - - void inputAudio(const std::shared_ptr &rtp) { - if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) { - flushAll(); - } - //追加数据到最后 - _cache->emplace_back(rtp); - } - - virtual void onFlushAudio(std::shared_ptr &) = 0; - -private: - void flushAll() { - if (_cache->empty()) { - return; - } - onFlushAudio(_cache); - _cache = std::make_shared(); - } - -private: - policy _policy; - std::shared_ptr _cache; -}; - } /* namespace mediakit */ - - -#endif //ZLMEDIAKIT_MEDIASOURCE_H +#endif //ZLMEDIAKIT_MEDIASOURCE_H \ No newline at end of file diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 23a0c6ad..0e0c7091 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -33,9 +33,6 @@ using namespace toolkit; #define RTMP_GOP_SIZE 512 namespace mediakit { -typedef VideoPacketCache RtmpVideoCache; -typedef AudioPacketCache RtmpAudioCache; - /** * rtmp媒体源的数据抽象 * rtmp有关键的三要素,分别是metadata、config帧,普通帧 @@ -43,7 +40,7 @@ typedef AudioPacketCache RtmpAudioCache; * 只要生成了这三要素,那么要实现rtmp推流、rtmp服务器就很简单了 * rtmp推拉流协议中,先传递metadata,然后传递config帧,然后一直传递普通帧 */ -class RtmpMediaSource : public MediaSource, public RingDelegate, public RtmpVideoCache, public RtmpAudioCache{ +class RtmpMediaSource : public MediaSource, public RingDelegate, public PacketCache{ public: typedef std::shared_ptr Ptr; typedef std::shared_ptr > RingDataType; @@ -149,12 +146,7 @@ public: regist(); } } - - if(pkt->typeId == MSG_VIDEO){ - RtmpVideoCache::inputVideo(pkt, key); - }else{ - RtmpAudioCache::inputAudio(pkt); - } + PacketCache::inputPacket(pkt->typeId == MSG_VIDEO, pkt, key); } /** @@ -175,21 +167,13 @@ public: private: /** - * 批量flush时间戳相同的视频rtmp包时触发该函数 - * @param rtmp_list 时间戳相同的rtmp包列表 + * 批量flush rtmp包时触发该函数 + * @param rtmp_list rtmp包列表 * @param key_pos 是否包含关键帧 */ - void onFlushVideo(std::shared_ptr > &rtmp_list, bool key_pos) override { - _ring->write(rtmp_list, key_pos); - } - - /** - * 批量flush一定数量的音频rtmp包时触发该函数 - * @param rtmp_list rtmp包列表 - */ - void onFlushAudio(std::shared_ptr > &rtmp_list) override{ - //只有音频的话,就不存在gop缓存的意义 - _ring->write(rtmp_list, !_have_video); + void onFlush(std::shared_ptr > &rtmp_list, bool key_pos) override { + //如果不存在视频,那么就没有存在GOP缓存的意义,所以is_key一直为true确保一直清空GOP缓存 + _ring->write(rtmp_list, _have_video ? key_pos : true); } /** diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index caededbf..4e2657e0 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -30,16 +30,13 @@ using namespace toolkit; #define RTP_GOP_SIZE 512 namespace mediakit { -typedef VideoPacketCache RtpVideoCache; -typedef AudioPacketCache RtpAudioCache; - - /** +/** * rtsp媒体源的数据抽象 * rtsp有关键的两要素,分别是sdp、rtp包 * 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了 * rtsp推拉流协议中,先传递sdp,然后再协商传输方式(tcp/udp/组播),最后一直传递rtp */ -class RtspMediaSource : public MediaSource, public RingDelegate, public RtpVideoCache, public RtpAudioCache { +class RtspMediaSource : public MediaSource, public RingDelegate, public PacketCache { public: typedef ResourcePool PoolType; typedef std::shared_ptr Ptr; @@ -175,32 +172,19 @@ public: regist(); } } - - if(rtp->type == TrackVideo){ - RtpVideoCache::inputVideo(rtp, keyPos); - }else{ - RtpAudioCache::inputAudio(rtp); - } + PacketCache::inputPacket(rtp->type == TrackVideo, rtp, keyPos); } private: /** - * 批量flush时间戳相同的视频rtp包时触发该函数 - * @param rtp_list 时间戳相同的rtp包列表 + * 批量flush rtp包时触发该函数 + * @param rtp_list rtp包列表 * @param key_pos 是否包含关键帧 */ - void onFlushVideo(std::shared_ptr > &rtp_list, bool key_pos) override { - _ring->write(rtp_list, key_pos); - } - - /** - * 批量flush一定数量的音频rtp包时触发该函数 - * @param rtp_list rtp包列表 - */ - void onFlushAudio(std::shared_ptr > &rtp_list) override{ - //只有音频的话,就不存在gop缓存的意义 - _ring->write(rtp_list, !_have_video); + void onFlush(std::shared_ptr > &rtp_list, bool key_pos) override { + //如果不存在视频,那么就没有存在GOP缓存的意义,所以is_key一直为true确保一直清空GOP缓存 + _ring->write(rtp_list, _have_video ? key_pos : true); } /**