mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2025-02-11 13:19:42 +08:00
优化合并写逻辑,确保GOP缓存第一帧为关键帧并确保音视频数据的交织性
This commit is contained in:
parent
a7bcfd566b
commit
dea36cfc84
@ -479,57 +479,52 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &
|
||||
#endif //ENABLE_MP4
|
||||
}
|
||||
|
||||
static bool isFlushAble_default(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
|
||||
static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) {
|
||||
if (new_stamp < last_stamp) {
|
||||
//时间戳回退(可能seek中)
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!is_audio) {
|
||||
//这是视频,时间戳发送变化或者缓存超过1024个
|
||||
return last_stamp != new_stamp || cache_size >= 1024;
|
||||
}
|
||||
|
||||
//这是音频,缓存超过100ms或者缓存个数超过10个
|
||||
return new_stamp > last_stamp + 100 || cache_size > 10;
|
||||
//时间戳发送变化或者缓存超过1024个,sendmsg接口一般最多只能发送1024个数据包
|
||||
return last_stamp != new_stamp || cache_size >= 1024;
|
||||
}
|
||||
|
||||
static bool isFlushAble_merge(bool is_audio, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) {
|
||||
static bool isFlushAble_merge(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) {
|
||||
if (new_stamp < last_stamp) {
|
||||
//时间戳回退(可能seek中)
|
||||
return true;
|
||||
}
|
||||
|
||||
if(new_stamp > last_stamp + merge_ms){
|
||||
if (new_stamp > last_stamp + merge_ms) {
|
||||
//时间戳增量超过合并写阈值
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!is_audio) {
|
||||
//这是视频,缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题
|
||||
//而且sendmsg接口一般最多只能发送1024个数据包
|
||||
return cache_size >= 1024;
|
||||
}
|
||||
|
||||
//这是音频,音频缓存超过20个
|
||||
return cache_size > 20;
|
||||
//缓存数超过1024个,这个逻辑用于避免时间戳异常的流导致的内存暴增问题
|
||||
//而且sendmsg接口一般最多只能发送1024个数据包
|
||||
return cache_size >= 1024;
|
||||
}
|
||||
|
||||
bool FlushPolicy::isFlushAble(uint32_t new_stamp, int cache_size) {
|
||||
bool ret = false;
|
||||
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
|
||||
if (mergeWriteMS <= 0) {
|
||||
//关闭了合并写或者合并写阈值小于等于0
|
||||
ret = isFlushAble_default(_is_audio, _last_stamp, new_stamp, cache_size);
|
||||
bool FlushPolicy::isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size) {
|
||||
bool flush_flag = false;
|
||||
if (is_key && is_video) {
|
||||
//遇到关键帧flush掉前面的数据,确保关键帧为该组数据的第一帧,确保GOP缓存有效
|
||||
flush_flag = true;
|
||||
} else {
|
||||
ret = isFlushAble_merge(_is_audio, _last_stamp, new_stamp, cache_size, mergeWriteMS);
|
||||
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
|
||||
if (mergeWriteMS <= 0) {
|
||||
//关闭了合并写或者合并写阈值小于等于0
|
||||
flush_flag = isFlushAble_default(is_video, _last_stamp[is_video], new_stamp, cache_size);
|
||||
} else {
|
||||
flush_flag = isFlushAble_merge(is_video, _last_stamp[is_video], new_stamp, cache_size, mergeWriteMS);
|
||||
}
|
||||
}
|
||||
|
||||
if (ret) {
|
||||
// DebugL << _is_audio << " " << _last_stamp << " " << new_stamp;
|
||||
_last_stamp = new_stamp;
|
||||
if (flush_flag) {
|
||||
// DebugL << is_video << " " << _last_stamp[is_video] << " " << new_stamp;
|
||||
_last_stamp[is_video] = new_stamp;
|
||||
}
|
||||
return ret;
|
||||
return flush_flag;
|
||||
}
|
||||
|
||||
} /* namespace mediakit */
|
@ -164,10 +164,7 @@ private:
|
||||
///缓存刷新策略类
|
||||
class FlushPolicy {
|
||||
public:
|
||||
FlushPolicy(bool is_audio) {
|
||||
_is_audio = is_audio;
|
||||
};
|
||||
|
||||
FlushPolicy() = default;
|
||||
~FlushPolicy() = default;
|
||||
|
||||
uint32_t getStamp(const RtpPacket::Ptr &packet) {
|
||||
@ -178,45 +175,45 @@ public:
|
||||
return packet->timeStamp;
|
||||
}
|
||||
|
||||
bool isFlushAble(uint32_t new_stamp, int cache_size);
|
||||
bool isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size);
|
||||
|
||||
private:
|
||||
bool _is_audio;
|
||||
uint32_t _last_stamp= 0;
|
||||
uint32_t _last_stamp[2] = {0, 0};
|
||||
};
|
||||
|
||||
/// 视频合并写缓存模板
|
||||
/// 合并写缓存模板
|
||||
/// \tparam packet 包类型
|
||||
/// \tparam policy 刷新缓存策略
|
||||
/// \tparam packet_list 包缓存类型
|
||||
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
|
||||
class VideoPacketCache {
|
||||
class PacketCache {
|
||||
public:
|
||||
VideoPacketCache() : _policy(false) {
|
||||
PacketCache(){
|
||||
_cache = std::make_shared<packet_list>();
|
||||
}
|
||||
|
||||
virtual ~VideoPacketCache() = default;
|
||||
virtual ~PacketCache() = default;
|
||||
|
||||
void inputVideo(const std::shared_ptr<packet> &rtp, bool key_pos) {
|
||||
if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) {
|
||||
void inputPacket(bool is_video, const std::shared_ptr<packet> &pkt, bool key_pos) {
|
||||
if (_policy.isFlushAble(is_video, key_pos, _policy.getStamp(pkt), _cache->size())) {
|
||||
flushAll();
|
||||
}
|
||||
|
||||
//追加数据到最后
|
||||
_cache->emplace_back(rtp);
|
||||
_cache->emplace_back(pkt);
|
||||
if (key_pos) {
|
||||
_key_pos = key_pos;
|
||||
}
|
||||
}
|
||||
|
||||
virtual void onFlushVideo(std::shared_ptr<packet_list> &, bool key_pos) = 0;
|
||||
virtual void onFlush(std::shared_ptr<packet_list> &, bool key_pos) = 0;
|
||||
|
||||
private:
|
||||
void flushAll() {
|
||||
if (_cache->empty()) {
|
||||
return;
|
||||
}
|
||||
onFlushVideo(_cache, _key_pos);
|
||||
onFlush(_cache, _key_pos);
|
||||
_cache = std::make_shared<packet_list>();
|
||||
_key_pos = false;
|
||||
}
|
||||
@ -227,44 +224,5 @@ private:
|
||||
bool _key_pos = false;
|
||||
};
|
||||
|
||||
/// 音频频合并写缓存模板
|
||||
/// \tparam packet 包类型
|
||||
/// \tparam policy 刷新缓存策略
|
||||
/// \tparam packet_list 包缓存类型
|
||||
template<typename packet, typename policy = FlushPolicy, typename packet_list = List<std::shared_ptr<packet> > >
|
||||
class AudioPacketCache {
|
||||
public:
|
||||
AudioPacketCache() : _policy(true) {
|
||||
_cache = std::make_shared<packet_list>();
|
||||
}
|
||||
|
||||
virtual ~AudioPacketCache() = default;
|
||||
|
||||
void inputAudio(const std::shared_ptr<packet> &rtp) {
|
||||
if (_policy.isFlushAble(_policy.getStamp(rtp), _cache->size())) {
|
||||
flushAll();
|
||||
}
|
||||
//追加数据到最后
|
||||
_cache->emplace_back(rtp);
|
||||
}
|
||||
|
||||
virtual void onFlushAudio(std::shared_ptr<packet_list> &) = 0;
|
||||
|
||||
private:
|
||||
void flushAll() {
|
||||
if (_cache->empty()) {
|
||||
return;
|
||||
}
|
||||
onFlushAudio(_cache);
|
||||
_cache = std::make_shared<packet_list>();
|
||||
}
|
||||
|
||||
private:
|
||||
policy _policy;
|
||||
std::shared_ptr<packet_list> _cache;
|
||||
};
|
||||
|
||||
} /* namespace mediakit */
|
||||
|
||||
|
||||
#endif //ZLMEDIAKIT_MEDIASOURCE_H
|
||||
#endif //ZLMEDIAKIT_MEDIASOURCE_H
|
@ -33,9 +33,6 @@ using namespace toolkit;
|
||||
#define RTMP_GOP_SIZE 512
|
||||
namespace mediakit {
|
||||
|
||||
typedef VideoPacketCache<RtmpPacket> RtmpVideoCache;
|
||||
typedef AudioPacketCache<RtmpPacket> RtmpAudioCache;
|
||||
|
||||
/**
|
||||
* rtmp媒体源的数据抽象
|
||||
* rtmp有关键的三要素,分别是metadata、config帧,普通帧
|
||||
@ -43,7 +40,7 @@ typedef AudioPacketCache<RtmpPacket> RtmpAudioCache;
|
||||
* 只要生成了这三要素,那么要实现rtmp推流、rtmp服务器就很简单了
|
||||
* rtmp推拉流协议中,先传递metadata,然后传递config帧,然后一直传递普通帧
|
||||
*/
|
||||
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr>, public RtmpVideoCache, public RtmpAudioCache{
|
||||
class RtmpMediaSource : public MediaSource, public RingDelegate<RtmpPacket::Ptr>, public PacketCache<RtmpPacket>{
|
||||
public:
|
||||
typedef std::shared_ptr<RtmpMediaSource> Ptr;
|
||||
typedef std::shared_ptr<List<RtmpPacket::Ptr> > RingDataType;
|
||||
@ -149,12 +146,7 @@ public:
|
||||
regist();
|
||||
}
|
||||
}
|
||||
|
||||
if(pkt->typeId == MSG_VIDEO){
|
||||
RtmpVideoCache::inputVideo(pkt, key);
|
||||
}else{
|
||||
RtmpAudioCache::inputAudio(pkt);
|
||||
}
|
||||
PacketCache<RtmpPacket>::inputPacket(pkt->typeId == MSG_VIDEO, pkt, key);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -175,21 +167,13 @@ public:
|
||||
private:
|
||||
|
||||
/**
|
||||
* 批量flush时间戳相同的视频rtmp包时触发该函数
|
||||
* @param rtmp_list 时间戳相同的rtmp包列表
|
||||
* 批量flush rtmp包时触发该函数
|
||||
* @param rtmp_list rtmp包列表
|
||||
* @param key_pos 是否包含关键帧
|
||||
*/
|
||||
void onFlushVideo(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list, bool key_pos) override {
|
||||
_ring->write(rtmp_list, key_pos);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量flush一定数量的音频rtmp包时触发该函数
|
||||
* @param rtmp_list rtmp包列表
|
||||
*/
|
||||
void onFlushAudio(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list) override{
|
||||
//只有音频的话,就不存在gop缓存的意义
|
||||
_ring->write(rtmp_list, !_have_video);
|
||||
void onFlush(std::shared_ptr<List<RtmpPacket::Ptr> > &rtmp_list, bool key_pos) override {
|
||||
//如果不存在视频,那么就没有存在GOP缓存的意义,所以is_key一直为true确保一直清空GOP缓存
|
||||
_ring->write(rtmp_list, _have_video ? key_pos : true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -30,16 +30,13 @@ using namespace toolkit;
|
||||
#define RTP_GOP_SIZE 512
|
||||
namespace mediakit {
|
||||
|
||||
typedef VideoPacketCache<RtpPacket> RtpVideoCache;
|
||||
typedef AudioPacketCache<RtpPacket> RtpAudioCache;
|
||||
|
||||
/**
|
||||
/**
|
||||
* rtsp媒体源的数据抽象
|
||||
* rtsp有关键的两要素,分别是sdp、rtp包
|
||||
* 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了
|
||||
* rtsp推拉流协议中,先传递sdp,然后再协商传输方式(tcp/udp/组播),最后一直传递rtp
|
||||
*/
|
||||
class RtspMediaSource : public MediaSource, public RingDelegate<RtpPacket::Ptr>, public RtpVideoCache, public RtpAudioCache {
|
||||
class RtspMediaSource : public MediaSource, public RingDelegate<RtpPacket::Ptr>, public PacketCache<RtpPacket> {
|
||||
public:
|
||||
typedef ResourcePool<RtpPacket> PoolType;
|
||||
typedef std::shared_ptr<RtspMediaSource> Ptr;
|
||||
@ -175,32 +172,19 @@ public:
|
||||
regist();
|
||||
}
|
||||
}
|
||||
|
||||
if(rtp->type == TrackVideo){
|
||||
RtpVideoCache::inputVideo(rtp, keyPos);
|
||||
}else{
|
||||
RtpAudioCache::inputAudio(rtp);
|
||||
}
|
||||
PacketCache<RtpPacket>::inputPacket(rtp->type == TrackVideo, rtp, keyPos);
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
* 批量flush时间戳相同的视频rtp包时触发该函数
|
||||
* @param rtp_list 时间戳相同的rtp包列表
|
||||
* 批量flush rtp包时触发该函数
|
||||
* @param rtp_list rtp包列表
|
||||
* @param key_pos 是否包含关键帧
|
||||
*/
|
||||
void onFlushVideo(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override {
|
||||
_ring->write(rtp_list, key_pos);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量flush一定数量的音频rtp包时触发该函数
|
||||
* @param rtp_list rtp包列表
|
||||
*/
|
||||
void onFlushAudio(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list) override{
|
||||
//只有音频的话,就不存在gop缓存的意义
|
||||
_ring->write(rtp_list, !_have_video);
|
||||
void onFlush(std::shared_ptr<List<RtpPacket::Ptr> > &rtp_list, bool key_pos) override {
|
||||
//如果不存在视频,那么就没有存在GOP缓存的意义,所以is_key一直为true确保一直清空GOP缓存
|
||||
_ring->write(rtp_list, _have_video ? key_pos : true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user