webrtc源码分析 pacer代码流程
作者:互联网
看流程之前先看理论
数据流
1、入队列流程
1.1 入队列流程
RTPSenderVideo::LogAndSendToNetwork
RTPSender::EnqueuePackets
PacedSender::EnqueuePackets
PacingController::EnqueuePacket //入队前须已调用 PacingController::SetPacingRates 设置发送码率
PacingController::EnqueuePacketInternal
RoundRobinPacketQueue::Push //放入队列
1.2 发送流程(调用栈见文末“发送流程”一节)
2、分析
2.1数据完成rtp封装后传入PacedSender
// Hands a batch of RTP packets to the pacing controller, one packet at a time,
// and then calls MaybeWakupProcessThread().
//
// `packets`: the packets to enqueue; every element is moved out of the vector
// into pacing_controller_.EnqueuePacket().
void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
// Inner scope: it is closed before MaybeWakupProcessThread() is called.
// NOTE(review): presumably it bounds a lock/trace object declared in the
// elided lines below; confirm against the upstream source.
{
// (Part of the original function body is elided in this excerpt.)
......
// A video frame is split into multiple RTP packets.
for (auto& packet : packets) {
// Each packet is expected to carry a non-negative capture time.
RTC_DCHECK_GE(packet->capture_time_ms(), 0);
pacing_controller_.EnqueuePacket(std::move(packet));
}
}
MaybeWakupProcessThread();
}
2.2获取数据优先级
// Queues a single RTP packet for paced sending.
//
// The pacing bitrate must already be non-zero and the packet must have its
// packet type set. The priority is derived from the packet type via
// GetPriorityForType() and forwarded, together with ownership of the packet,
// to EnqueuePacketInternal().
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
      << "SetPacingRate must be called before InsertPacket.";
  RTC_CHECK(packet->packet_type());

  // Resolve the priority while `packet` is still owned here: reading the type
  // inside the call expression below could race with the move of `packet`.
  const auto media_type = *packet->packet_type();
  const int packet_priority = GetPriorityForType(media_type);

  EnqueuePacketInternal(std::move(packet), packet_priority);
}
priority 数字越小优先级越高
// Maps an RTP packet media type to its pacer priority.
// A numerically lower return value takes priority over a higher one.
int GetPriorityForType(RtpPacketMediaType type) {
  // Audio is always prioritized over other packet types.
  const int audio_priority = kFirstPriority + 1;
  // Retransmissions are sent before new media.
  const int retransmission_priority = kFirstPriority + 2;
  // Video has "normal" priority, in the old speak. Redundancy (FEC) is sent
  // concurrently with video: if it is delayed it might have a lower chance of
  // being useful.
  const int media_priority = kFirstPriority + 3;
  // Padding packets are in themselves likely useless and are only sent to keep
  // the BWE high.
  const int padding_priority = kFirstPriority + 4;

  switch (type) {
    case RtpPacketMediaType::kAudio: {
      return audio_priority;
    }
    case RtpPacketMediaType::kRetransmission: {
      return retransmission_priority;
    }
    case RtpPacketMediaType::kVideo:
    case RtpPacketMediaType::kForwardErrorCorrection: {
      return media_priority;
    }
    case RtpPacketMediaType::kPadding: {
      return padding_priority;
    }
  }

  // Only reached when `type` matches none of the cases above.
  RTC_CHECK_NOTREACHED();
}
发送顺序
- 优先级高的报文排在fifo的前面,低的排在后面。
- 首先判断报文的priority等级,等级越小的优先级越高(priority等级根据报文类型进行分类)。
- 然后判断重发标识,重发的报文比普通报文的优先级更高
- 最后是判断视频帧timestamp,越早的视频帧优先级更高。
2.3 放入队列
void PacingController::EnqueuePacketInternal(
std::unique_ptr<RtpPacketToSend> packet,
int priority) {
//统计发送带宽
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
Timestamp now = CurrentTime();
if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() &&
NextSendTime() <= now) {
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
}
//放入队列
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
2.4 rtp包是按照流分队列存储
// Adds `packet` to the queue of the stream identified by packet.Ssrc().
//
// Steps, in order:
//   1. Find the Stream entry for the packet's SSRC, creating it if absent.
//   2. Make sure the stream has an entry in `stream_priorities_`, replacing
//      that entry when the new packet has a numerically lower priority value.
//   3. Update the enqueue-time bookkeeping (and, in the else branch, the
//      packet/byte counters).
//   4. Push the packet onto the stream's own packet queue.
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
// Look up the per-SSRC stream record; create one on first sight of this SSRC.
auto stream_info_it = streams_.find(packet.Ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
// end() is used as the "not scheduled" marker (see the check below).
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = packet.Ssrc();
}
Stream* stream = &stream_info_it->second;
if (stream->priority_it == stream_priorities_.end()) {
// If the SSRC is not currently scheduled, add it to `stream_priorities_`.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
} else if (packet.Priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that `priority_` uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
}
// After either branch the stream must have an entry in `stream_priorities_`.
RTC_CHECK(stream->priority_it != stream_priorities_.end());
if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
// Promotion from single-packet queue. Just add to enqueue times.
packet.UpdateEnqueueTimeIterator(
enqueue_times_.insert(packet.EnqueueTime()));
} else {
// In order to figure out how much time a packet has spent in the queue
// while not in a paused state, we subtract the total amount of time the
// queue has been paused so far, and when the packet is popped we subtract
// the total amount of time the queue has been paused at that moment. This
// way we subtract the total amount of time the packet has spent in the
// queue while in a paused state.
UpdateQueueTime(packet.EnqueueTime());
packet.SubtractPauseTime(pause_time_sum_);
// Packet count and total size are only updated in this branch.
size_packets_ += 1;
size_ += PacketSize(packet);
}
// Finally store the packet in the queue that belongs to its stream.
stream->packet_queue.push(packet);
}
3、发送流程
PacedSender::Process()
PacingController::ProcessPackets()
PacingController::GetPendingPacket()
PacketRouter::SendPacket()
ModuleRtpRtcpImpl2::TrySendPacket()
RtpSenderEgress::SendPacket()
RtpSenderEgress::SendPacketToNetwork()
LayerFilteringTransport::SendRtp()
标签:queue,stream,pacer,packet,priority,_.,源码,time,webrtc 来源: https://blog.csdn.net/liuhongxiangm/article/details/123231085