1 /*
2  *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/round_robin_packet_queue.h"
12 
13 #include <algorithm>
14 #include <cstdint>
15 #include <utility>
16 
17 #include "absl/strings/match.h"
18 #include "rtc_base/checks.h"
19 
20 namespace webrtc {
21 namespace {
22 static constexpr DataSize kMaxLeadingSize = DataSize::Bytes(1400);
23 }
24 
25 RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
26     default;
27 RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
28 
QueuedPacket(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::multiset<Timestamp>::iterator enqueue_time_it,std::unique_ptr<RtpPacketToSend> packet)29 RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
30     int priority,
31     Timestamp enqueue_time,
32     uint64_t enqueue_order,
33     std::multiset<Timestamp>::iterator enqueue_time_it,
34     std::unique_ptr<RtpPacketToSend> packet)
35     : priority_(priority),
36       enqueue_time_(enqueue_time),
37       enqueue_order_(enqueue_order),
38       is_retransmission_(packet->packet_type() ==
39                          RtpPacketMediaType::kRetransmission),
40       enqueue_time_it_(enqueue_time_it),
41       owned_packet_(packet.release()) {}
42 
operator <(const RoundRobinPacketQueue::QueuedPacket & other) const43 bool RoundRobinPacketQueue::QueuedPacket::operator<(
44     const RoundRobinPacketQueue::QueuedPacket& other) const {
45   if (priority_ != other.priority_)
46     return priority_ > other.priority_;
47   if (is_retransmission_ != other.is_retransmission_)
48     return other.is_retransmission_;
49 
50   return enqueue_order_ > other.enqueue_order_;
51 }
52 
Priority() const53 int RoundRobinPacketQueue::QueuedPacket::Priority() const {
54   return priority_;
55 }
56 
Type() const57 RtpPacketMediaType RoundRobinPacketQueue::QueuedPacket::Type() const {
58   return *owned_packet_->packet_type();
59 }
60 
Ssrc() const61 uint32_t RoundRobinPacketQueue::QueuedPacket::Ssrc() const {
62   return owned_packet_->Ssrc();
63 }
64 
EnqueueTime() const65 Timestamp RoundRobinPacketQueue::QueuedPacket::EnqueueTime() const {
66   return enqueue_time_;
67 }
68 
IsRetransmission() const69 bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const {
70   return Type() == RtpPacketMediaType::kRetransmission;
71 }
72 
EnqueueOrder() const73 uint64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const {
74   return enqueue_order_;
75 }
76 
RtpPacket() const77 RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const {
78   return owned_packet_;
79 }
80 
UpdateEnqueueTimeIterator(std::multiset<Timestamp>::iterator it)81 void RoundRobinPacketQueue::QueuedPacket::UpdateEnqueueTimeIterator(
82     std::multiset<Timestamp>::iterator it) {
83   enqueue_time_it_ = it;
84 }
85 
86 std::multiset<Timestamp>::iterator
EnqueueTimeIterator() const87 RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const {
88   return enqueue_time_it_;
89 }
90 
SubtractPauseTime(TimeDelta pause_time_sum)91 void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
92     TimeDelta pause_time_sum) {
93   enqueue_time_ -= pause_time_sum;
94 }
95 
96 RoundRobinPacketQueue::PriorityPacketQueue::const_iterator
begin() const97 RoundRobinPacketQueue::PriorityPacketQueue::begin() const {
98   return c.begin();
99 }
100 
101 RoundRobinPacketQueue::PriorityPacketQueue::const_iterator
end() const102 RoundRobinPacketQueue::PriorityPacketQueue::end() const {
103   return c.end();
104 }
105 
Stream()106 RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
107 RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
108 RoundRobinPacketQueue::Stream::~Stream() = default;
109 
IsEnabled(const WebRtcKeyValueConfig * field_trials,const char * name)110 bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
111   if (!field_trials) {
112     return false;
113   }
114   return absl::StartsWith(field_trials->Lookup(name), "Enabled");
115 }
116 
RoundRobinPacketQueue(Timestamp start_time,const WebRtcKeyValueConfig * field_trials)117 RoundRobinPacketQueue::RoundRobinPacketQueue(
118     Timestamp start_time,
119     const WebRtcKeyValueConfig* field_trials)
120     : transport_overhead_per_packet_(DataSize::Zero()),
121       time_last_updated_(start_time),
122       paused_(false),
123       size_packets_(0),
124       size_(DataSize::Zero()),
125       max_size_(kMaxLeadingSize),
126       queue_time_sum_(TimeDelta::Zero()),
127       pause_time_sum_(TimeDelta::Zero()),
128       include_overhead_(false) {}
129 
~RoundRobinPacketQueue()130 RoundRobinPacketQueue::~RoundRobinPacketQueue() {
131   // Make sure to release any packets owned by raw pointer in QueuedPacket.
132   while (!Empty()) {
133     Pop();
134   }
135 }
136 
Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet)137 void RoundRobinPacketQueue::Push(int priority,
138                                  Timestamp enqueue_time,
139                                  uint64_t enqueue_order,
140                                  std::unique_ptr<RtpPacketToSend> packet) {
141   RTC_DCHECK(packet->packet_type().has_value());
142   if (size_packets_ == 0) {
143     // Single packet fast-path.
144     single_packet_queue_.emplace(
145         QueuedPacket(priority, enqueue_time, enqueue_order,
146                      enqueue_times_.end(), std::move(packet)));
147     UpdateQueueTime(enqueue_time);
148     single_packet_queue_->SubtractPauseTime(pause_time_sum_);
149     size_packets_ = 1;
150     size_ += PacketSize(*single_packet_queue_);
151   } else {
152     MaybePromoteSinglePacketToNormalQueue();
153     Push(QueuedPacket(priority, enqueue_time, enqueue_order,
154                       enqueue_times_.insert(enqueue_time), std::move(packet)));
155   }
156 }
157 
Pop()158 std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
159   if (single_packet_queue_.has_value()) {
160     RTC_DCHECK(stream_priorities_.empty());
161     std::unique_ptr<RtpPacketToSend> rtp_packet(
162         single_packet_queue_->RtpPacket());
163     single_packet_queue_.reset();
164     queue_time_sum_ = TimeDelta::Zero();
165     size_packets_ = 0;
166     size_ = DataSize::Zero();
167     return rtp_packet;
168   }
169 
170   RTC_DCHECK(!Empty());
171   Stream* stream = GetHighestPriorityStream();
172   const QueuedPacket& queued_packet = stream->packet_queue.top();
173 
174   stream_priorities_.erase(stream->priority_it);
175 
176   // Calculate the total amount of time spent by this packet in the queue
177   // while in a non-paused state. Note that the |pause_time_sum_ms_| was
178   // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
179   // by subtracting it now we effectively remove the time spent in in the
180   // queue while in a paused state.
181   TimeDelta time_in_non_paused_state =
182       time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
183   queue_time_sum_ -= time_in_non_paused_state;
184 
185   RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
186   enqueue_times_.erase(queued_packet.EnqueueTimeIterator());
187 
188   // Update |bytes| of this stream. The general idea is that the stream that
189   // has sent the least amount of bytes should have the highest priority.
190   // The problem with that is if streams send with different rates, in which
191   // case a "budget" will be built up for the stream sending at the lower
192   // rate. To avoid building a too large budget we limit |bytes| to be within
193   // kMaxLeading bytes of the stream that has sent the most amount of bytes.
194   DataSize packet_size = PacketSize(queued_packet);
195   stream->size =
196       std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
197   max_size_ = std::max(max_size_, stream->size);
198 
199   size_ -= packet_size;
200   size_packets_ -= 1;
201   RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
202 
203   std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
204   stream->packet_queue.pop();
205 
206   // If there are packets left to be sent, schedule the stream again.
207   RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
208   if (stream->packet_queue.empty()) {
209     stream->priority_it = stream_priorities_.end();
210   } else {
211     int priority = stream->packet_queue.top().Priority();
212     stream->priority_it = stream_priorities_.emplace(
213         StreamPrioKey(priority, stream->size), stream->ssrc);
214   }
215 
216   return rtp_packet;
217 }
218 
Empty() const219 bool RoundRobinPacketQueue::Empty() const {
220   if (size_packets_ == 0) {
221     RTC_DCHECK(!single_packet_queue_.has_value() && stream_priorities_.empty());
222     return true;
223   }
224   RTC_DCHECK(single_packet_queue_.has_value() || !stream_priorities_.empty());
225   return false;
226 }
227 
SizeInPackets() const228 size_t RoundRobinPacketQueue::SizeInPackets() const {
229   return size_packets_;
230 }
231 
Size() const232 DataSize RoundRobinPacketQueue::Size() const {
233   return size_;
234 }
235 
LeadingAudioPacketEnqueueTime() const236 absl::optional<Timestamp> RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime()
237     const {
238   if (single_packet_queue_.has_value()) {
239     if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) {
240       return single_packet_queue_->EnqueueTime();
241     }
242     return absl::nullopt;
243   }
244 
245   if (stream_priorities_.empty()) {
246     return absl::nullopt;
247   }
248   uint32_t ssrc = stream_priorities_.begin()->second;
249 
250   const auto& top_packet = streams_.find(ssrc)->second.packet_queue.top();
251   if (top_packet.Type() == RtpPacketMediaType::kAudio) {
252     return top_packet.EnqueueTime();
253   }
254   return absl::nullopt;
255 }
256 
OldestEnqueueTime() const257 Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
258   if (single_packet_queue_.has_value()) {
259     return single_packet_queue_->EnqueueTime();
260   }
261 
262   if (Empty())
263     return Timestamp::MinusInfinity();
264   RTC_CHECK(!enqueue_times_.empty());
265   return *enqueue_times_.begin();
266 }
267 
UpdateQueueTime(Timestamp now)268 void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {
269   RTC_CHECK_GE(now, time_last_updated_);
270   if (now == time_last_updated_)
271     return;
272 
273   TimeDelta delta = now - time_last_updated_;
274 
275   if (paused_) {
276     pause_time_sum_ += delta;
277   } else {
278     queue_time_sum_ += TimeDelta::Micros(delta.us() * size_packets_);
279   }
280 
281   time_last_updated_ = now;
282 }
283 
SetPauseState(bool paused,Timestamp now)284 void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
285   if (paused_ == paused)
286     return;
287   UpdateQueueTime(now);
288   paused_ = paused;
289 }
290 
SetIncludeOverhead()291 void RoundRobinPacketQueue::SetIncludeOverhead() {
292   MaybePromoteSinglePacketToNormalQueue();
293   include_overhead_ = true;
294   // We need to update the size to reflect overhead for existing packets.
295   for (const auto& stream : streams_) {
296     for (const QueuedPacket& packet : stream.second.packet_queue) {
297       size_ += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
298                transport_overhead_per_packet_;
299     }
300   }
301 }
302 
SetTransportOverhead(DataSize overhead_per_packet)303 void RoundRobinPacketQueue::SetTransportOverhead(DataSize overhead_per_packet) {
304   MaybePromoteSinglePacketToNormalQueue();
305   if (include_overhead_) {
306     DataSize previous_overhead = transport_overhead_per_packet_;
307     // We need to update the size to reflect overhead for existing packets.
308     for (const auto& stream : streams_) {
309       int packets = stream.second.packet_queue.size();
310       size_ -= packets * previous_overhead;
311       size_ += packets * overhead_per_packet;
312     }
313   }
314   transport_overhead_per_packet_ = overhead_per_packet;
315 }
316 
AverageQueueTime() const317 TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
318   if (Empty())
319     return TimeDelta::Zero();
320   return queue_time_sum_ / size_packets_;
321 }
322 
Push(QueuedPacket packet)323 void RoundRobinPacketQueue::Push(QueuedPacket packet) {
324   auto stream_info_it = streams_.find(packet.Ssrc());
325   if (stream_info_it == streams_.end()) {
326     stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
327     stream_info_it->second.priority_it = stream_priorities_.end();
328     stream_info_it->second.ssrc = packet.Ssrc();
329   }
330 
331   Stream* stream = &stream_info_it->second;
332 
333   if (stream->priority_it == stream_priorities_.end()) {
334     // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
335     RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
336     stream->priority_it = stream_priorities_.emplace(
337         StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
338   } else if (packet.Priority() < stream->priority_it->first.priority) {
339     // If the priority of this SSRC increased, remove the outdated StreamPrioKey
340     // and insert a new one with the new priority. Note that |priority_| uses
341     // lower ordinal for higher priority.
342     stream_priorities_.erase(stream->priority_it);
343     stream->priority_it = stream_priorities_.emplace(
344         StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
345   }
346   RTC_CHECK(stream->priority_it != stream_priorities_.end());
347 
348   if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
349     // Promotion from single-packet queue. Just add to enqueue times.
350     packet.UpdateEnqueueTimeIterator(
351         enqueue_times_.insert(packet.EnqueueTime()));
352   } else {
353     // In order to figure out how much time a packet has spent in the queue
354     // while not in a paused state, we subtract the total amount of time the
355     // queue has been paused so far, and when the packet is popped we subtract
356     // the total amount of time the queue has been paused at that moment. This
357     // way we subtract the total amount of time the packet has spent in the
358     // queue while in a paused state.
359     UpdateQueueTime(packet.EnqueueTime());
360     packet.SubtractPauseTime(pause_time_sum_);
361 
362     size_packets_ += 1;
363     size_ += PacketSize(packet);
364   }
365 
366   stream->packet_queue.push(packet);
367 }
368 
PacketSize(const QueuedPacket & packet) const369 DataSize RoundRobinPacketQueue::PacketSize(const QueuedPacket& packet) const {
370   DataSize packet_size = DataSize::Bytes(packet.RtpPacket()->payload_size() +
371                                          packet.RtpPacket()->padding_size());
372   if (include_overhead_) {
373     packet_size += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
374                    transport_overhead_per_packet_;
375   }
376   return packet_size;
377 }
378 
MaybePromoteSinglePacketToNormalQueue()379 void RoundRobinPacketQueue::MaybePromoteSinglePacketToNormalQueue() {
380   if (single_packet_queue_.has_value()) {
381     Push(*single_packet_queue_);
382     single_packet_queue_.reset();
383   }
384 }
385 
386 RoundRobinPacketQueue::Stream*
GetHighestPriorityStream()387 RoundRobinPacketQueue::GetHighestPriorityStream() {
388   RTC_CHECK(!stream_priorities_.empty());
389   uint32_t ssrc = stream_priorities_.begin()->second;
390 
391   auto stream_info_it = streams_.find(ssrc);
392   RTC_CHECK(stream_info_it != streams_.end());
393   RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
394   RTC_CHECK(!stream_info_it->second.packet_queue.empty());
395   return &stream_info_it->second;
396 }
397 
IsSsrcScheduled(uint32_t ssrc) const398 bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
399   for (const auto& scheduled_stream : stream_priorities_) {
400     if (scheduled_stream.second == ssrc)
401       return true;
402   }
403   return false;
404 }
405 
406 }  // namespace webrtc
407