1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/rtp_packet_history.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17 
18 #include "modules/include/module_common_types_public.h"
19 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
20 #include "rtc_base/checks.h"
21 #include "rtc_base/logging.h"
22 #include "system_wrappers/include/clock.h"
23 
24 namespace webrtc {
25 
26 constexpr size_t RtpPacketHistory::kMaxCapacity;
27 constexpr size_t RtpPacketHistory::kMaxPaddingtHistory;
28 constexpr int64_t RtpPacketHistory::kMinPacketDurationMs;
29 constexpr int RtpPacketHistory::kMinPacketDurationRtt;
30 constexpr int RtpPacketHistory::kPacketCullingDelayFactor;
31 
32 RtpPacketHistory::PacketState::PacketState() = default;
33 RtpPacketHistory::PacketState::PacketState(const PacketState&) = default;
34 RtpPacketHistory::PacketState::~PacketState() = default;
35 
StoredPacket(std::unique_ptr<RtpPacketToSend> packet,absl::optional<int64_t> send_time_ms,uint64_t insert_order)36 RtpPacketHistory::StoredPacket::StoredPacket(
37     std::unique_ptr<RtpPacketToSend> packet,
38     absl::optional<int64_t> send_time_ms,
39     uint64_t insert_order)
40     : send_time_ms_(send_time_ms),
41       packet_(std::move(packet)),
42       // No send time indicates packet is not sent immediately, but instead will
43       // be put in the pacer queue and later retrieved via
44       // GetPacketAndSetSendTime().
45       pending_transmission_(!send_time_ms.has_value()),
46       insert_order_(insert_order),
47       times_retransmitted_(0) {}
48 
49 RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default;
50 RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=(
51     RtpPacketHistory::StoredPacket&&) = default;
52 RtpPacketHistory::StoredPacket::~StoredPacket() = default;
53 
IncrementTimesRetransmitted(PacketPrioritySet * priority_set)54 void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted(
55     PacketPrioritySet* priority_set) {
56   // Check if this StoredPacket is in the priority set. If so, we need to remove
57   // it before updating |times_retransmitted_| since that is used in sorting,
58   // and then add it back.
59   const bool in_priority_set = priority_set && priority_set->erase(this) > 0;
60   ++times_retransmitted_;
61   if (in_priority_set) {
62     auto it = priority_set->insert(this);
63     RTC_DCHECK(it.second)
64         << "ERROR: Priority set already contains matching packet! In set: "
65            "insert order = "
66         << (*it.first)->insert_order_
67         << ", times retransmitted = " << (*it.first)->times_retransmitted_
68         << ". Trying to add: insert order = " << insert_order_
69         << ", times retransmitted = " << times_retransmitted_;
70   }
71 }
72 
operator ()(StoredPacket * lhs,StoredPacket * rhs) const73 bool RtpPacketHistory::MoreUseful::operator()(StoredPacket* lhs,
74                                               StoredPacket* rhs) const {
75   // Prefer to send packets we haven't already sent as padding.
76   if (lhs->times_retransmitted() != rhs->times_retransmitted()) {
77     return lhs->times_retransmitted() < rhs->times_retransmitted();
78   }
79   // All else being equal, prefer newer packets.
80   return lhs->insert_order() > rhs->insert_order();
81 }
82 
RtpPacketHistory(Clock * clock,bool enable_padding_prio)83 RtpPacketHistory::RtpPacketHistory(Clock* clock, bool enable_padding_prio)
84     : clock_(clock),
85       enable_padding_prio_(enable_padding_prio),
86       number_to_store_(0),
87       mode_(StorageMode::kDisabled),
88       rtt_ms_(-1),
89       packets_inserted_(0) {}
90 
~RtpPacketHistory()91 RtpPacketHistory::~RtpPacketHistory() {}
92 
SetStorePacketsStatus(StorageMode mode,size_t number_to_store)93 void RtpPacketHistory::SetStorePacketsStatus(StorageMode mode,
94                                              size_t number_to_store) {
95   RTC_DCHECK_LE(number_to_store, kMaxCapacity);
96   MutexLock lock(&lock_);
97   if (mode != StorageMode::kDisabled && mode_ != StorageMode::kDisabled) {
98     RTC_LOG(LS_WARNING) << "Purging packet history in order to re-set status.";
99   }
100   Reset();
101   mode_ = mode;
102   number_to_store_ = std::min(kMaxCapacity, number_to_store);
103 }
104 
GetStorageMode() const105 RtpPacketHistory::StorageMode RtpPacketHistory::GetStorageMode() const {
106   MutexLock lock(&lock_);
107   return mode_;
108 }
109 
SetRtt(int64_t rtt_ms)110 void RtpPacketHistory::SetRtt(int64_t rtt_ms) {
111   MutexLock lock(&lock_);
112   RTC_DCHECK_GE(rtt_ms, 0);
113   rtt_ms_ = rtt_ms;
114   // If storage is not disabled,  packets will be removed after a timeout
115   // that depends on the RTT. Changing the RTT may thus cause some packets
116   // become "old" and subject to removal.
117   if (mode_ != StorageMode::kDisabled) {
118     CullOldPackets(clock_->TimeInMilliseconds());
119   }
120 }
121 
PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,absl::optional<int64_t> send_time_ms)122 void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
123                                     absl::optional<int64_t> send_time_ms) {
124   RTC_DCHECK(packet);
125   MutexLock lock(&lock_);
126   int64_t now_ms = clock_->TimeInMilliseconds();
127   if (mode_ == StorageMode::kDisabled) {
128     return;
129   }
130 
131   RTC_DCHECK(packet->allow_retransmission());
132   CullOldPackets(now_ms);
133 
134   // Store packet.
135   const uint16_t rtp_seq_no = packet->SequenceNumber();
136   int packet_index = GetPacketIndex(rtp_seq_no);
137   if (packet_index >= 0u &&
138       static_cast<size_t>(packet_index) < packet_history_.size() &&
139       packet_history_[packet_index].packet_ != nullptr) {
140     RTC_LOG(LS_WARNING) << "Duplicate packet inserted: " << rtp_seq_no;
141     // Remove previous packet to avoid inconsistent state.
142     RemovePacket(packet_index);
143     packet_index = GetPacketIndex(rtp_seq_no);
144   }
145 
146   // Packet to be inserted ahead of first packet, expand front.
147   for (; packet_index < 0; ++packet_index) {
148     packet_history_.emplace_front(nullptr, absl::nullopt, 0);
149   }
150   // Packet to be inserted behind last packet, expand back.
151   while (static_cast<int>(packet_history_.size()) <= packet_index) {
152     packet_history_.emplace_back(nullptr, absl::nullopt, 0);
153   }
154 
155   RTC_DCHECK_GE(packet_index, 0);
156   RTC_DCHECK_LT(packet_index, packet_history_.size());
157   RTC_DCHECK(packet_history_[packet_index].packet_ == nullptr);
158 
159   packet_history_[packet_index] =
160       StoredPacket(std::move(packet), send_time_ms, packets_inserted_++);
161 
162   if (enable_padding_prio_) {
163     if (padding_priority_.size() >= kMaxPaddingtHistory - 1) {
164       padding_priority_.erase(std::prev(padding_priority_.end()));
165     }
166     auto prio_it = padding_priority_.insert(&packet_history_[packet_index]);
167     RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set.";
168   }
169 }
170 
GetPacketAndSetSendTime(uint16_t sequence_number)171 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndSetSendTime(
172     uint16_t sequence_number) {
173   MutexLock lock(&lock_);
174   if (mode_ == StorageMode::kDisabled) {
175     return nullptr;
176   }
177 
178   StoredPacket* packet = GetStoredPacket(sequence_number);
179   if (packet == nullptr) {
180     return nullptr;
181   }
182 
183   int64_t now_ms = clock_->TimeInMilliseconds();
184   if (!VerifyRtt(*packet, now_ms)) {
185     return nullptr;
186   }
187 
188   if (packet->send_time_ms_) {
189     packet->IncrementTimesRetransmitted(
190         enable_padding_prio_ ? &padding_priority_ : nullptr);
191   }
192 
193   // Update send-time and mark as no long in pacer queue.
194   packet->send_time_ms_ = now_ms;
195   packet->pending_transmission_ = false;
196 
197   // Return copy of packet instance since it may need to be retransmitted.
198   return std::make_unique<RtpPacketToSend>(*packet->packet_);
199 }
200 
GetPacketAndMarkAsPending(uint16_t sequence_number)201 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
202     uint16_t sequence_number) {
203   return GetPacketAndMarkAsPending(
204       sequence_number, [](const RtpPacketToSend& packet) {
205         return std::make_unique<RtpPacketToSend>(packet);
206       });
207 }
208 
GetPacketAndMarkAsPending(uint16_t sequence_number,rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)209 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
210     uint16_t sequence_number,
211     rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
212         encapsulate) {
213   MutexLock lock(&lock_);
214   if (mode_ == StorageMode::kDisabled) {
215     return nullptr;
216   }
217 
218   StoredPacket* packet = GetStoredPacket(sequence_number);
219   if (packet == nullptr) {
220     return nullptr;
221   }
222 
223   if (packet->pending_transmission_) {
224     // Packet already in pacer queue, ignore this request.
225     return nullptr;
226   }
227 
228   if (!VerifyRtt(*packet, clock_->TimeInMilliseconds())) {
229     // Packet already resent within too short a time window, ignore.
230     return nullptr;
231   }
232 
233   // Copy and/or encapsulate packet.
234   std::unique_ptr<RtpPacketToSend> encapsulated_packet =
235       encapsulate(*packet->packet_);
236   if (encapsulated_packet) {
237     packet->pending_transmission_ = true;
238   }
239 
240   return encapsulated_packet;
241 }
242 
MarkPacketAsSent(uint16_t sequence_number)243 void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) {
244   MutexLock lock(&lock_);
245   if (mode_ == StorageMode::kDisabled) {
246     return;
247   }
248 
249   StoredPacket* packet = GetStoredPacket(sequence_number);
250   if (packet == nullptr) {
251     return;
252   }
253 
254   RTC_DCHECK(packet->send_time_ms_);
255 
256   // Update send-time, mark as no longer in pacer queue, and increment
257   // transmission count.
258   packet->send_time_ms_ = clock_->TimeInMilliseconds();
259   packet->pending_transmission_ = false;
260   packet->IncrementTimesRetransmitted(enable_padding_prio_ ? &padding_priority_
261                                                            : nullptr);
262 }
263 
GetPacketState(uint16_t sequence_number) const264 absl::optional<RtpPacketHistory::PacketState> RtpPacketHistory::GetPacketState(
265     uint16_t sequence_number) const {
266   MutexLock lock(&lock_);
267   if (mode_ == StorageMode::kDisabled) {
268     return absl::nullopt;
269   }
270 
271   int packet_index = GetPacketIndex(sequence_number);
272   if (packet_index < 0 ||
273       static_cast<size_t>(packet_index) >= packet_history_.size()) {
274     return absl::nullopt;
275   }
276   const StoredPacket& packet = packet_history_[packet_index];
277   if (packet.packet_ == nullptr) {
278     return absl::nullopt;
279   }
280 
281   if (!VerifyRtt(packet, clock_->TimeInMilliseconds())) {
282     return absl::nullopt;
283   }
284 
285   return StoredPacketToPacketState(packet);
286 }
287 
VerifyRtt(const RtpPacketHistory::StoredPacket & packet,int64_t now_ms) const288 bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet,
289                                  int64_t now_ms) const {
290   if (packet.send_time_ms_) {
291     // Send-time already set, this check must be for a retransmission.
292     if (packet.times_retransmitted() > 0 &&
293         now_ms < *packet.send_time_ms_ + rtt_ms_) {
294       // This packet has already been retransmitted once, and the time since
295       // that even is lower than on RTT. Ignore request as this packet is
296       // likely already in the network pipe.
297       return false;
298     }
299   }
300 
301   return true;
302 }
303 
GetPayloadPaddingPacket()304 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket() {
305   // Default implementation always just returns a copy of the packet.
306   return GetPayloadPaddingPacket([](const RtpPacketToSend& packet) {
307     return std::make_unique<RtpPacketToSend>(packet);
308   });
309 }
310 
GetPayloadPaddingPacket(rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)311 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket(
312     rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
313         encapsulate) {
314   MutexLock lock(&lock_);
315   if (mode_ == StorageMode::kDisabled) {
316     return nullptr;
317   }
318 
319   StoredPacket* best_packet = nullptr;
320   if (enable_padding_prio_ && !padding_priority_.empty()) {
321     auto best_packet_it = padding_priority_.begin();
322     best_packet = *best_packet_it;
323   } else if (!enable_padding_prio_ && !packet_history_.empty()) {
324     // Prioritization not available, pick the last packet.
325     for (auto it = packet_history_.rbegin(); it != packet_history_.rend();
326          ++it) {
327       if (it->packet_ != nullptr) {
328         best_packet = &(*it);
329         break;
330       }
331     }
332   }
333   if (best_packet == nullptr) {
334     return nullptr;
335   }
336 
337   if (best_packet->pending_transmission_) {
338     // Because PacedSender releases it's lock when it calls
339     // GeneratePadding() there is the potential for a race where a new
340     // packet ends up here instead of the regular transmit path. In such a
341     // case, just return empty and it will be picked up on the next
342     // Process() call.
343     return nullptr;
344   }
345 
346   auto padding_packet = encapsulate(*best_packet->packet_);
347   if (!padding_packet) {
348     return nullptr;
349   }
350 
351   best_packet->send_time_ms_ = clock_->TimeInMilliseconds();
352   best_packet->IncrementTimesRetransmitted(
353       enable_padding_prio_ ? &padding_priority_ : nullptr);
354 
355   return padding_packet;
356 }
357 
CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers)358 void RtpPacketHistory::CullAcknowledgedPackets(
359     rtc::ArrayView<const uint16_t> sequence_numbers) {
360   MutexLock lock(&lock_);
361   for (uint16_t sequence_number : sequence_numbers) {
362     int packet_index = GetPacketIndex(sequence_number);
363     if (packet_index < 0 ||
364         static_cast<size_t>(packet_index) >= packet_history_.size()) {
365       continue;
366     }
367     RemovePacket(packet_index);
368   }
369 }
370 
SetPendingTransmission(uint16_t sequence_number)371 bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) {
372   MutexLock lock(&lock_);
373   if (mode_ == StorageMode::kDisabled) {
374     return false;
375   }
376 
377   StoredPacket* packet = GetStoredPacket(sequence_number);
378   if (packet == nullptr) {
379     return false;
380   }
381 
382   packet->pending_transmission_ = true;
383   return true;
384 }
385 
Clear()386 void RtpPacketHistory::Clear() {
387   MutexLock lock(&lock_);
388   Reset();
389 }
390 
Reset()391 void RtpPacketHistory::Reset() {
392   packet_history_.clear();
393   padding_priority_.clear();
394 }
395 
CullOldPackets(int64_t now_ms)396 void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
397   int64_t packet_duration_ms =
398       std::max(kMinPacketDurationRtt * rtt_ms_, kMinPacketDurationMs);
399   while (!packet_history_.empty()) {
400     if (packet_history_.size() >= kMaxCapacity) {
401       // We have reached the absolute max capacity, remove one packet
402       // unconditionally.
403       RemovePacket(0);
404       continue;
405     }
406 
407     const StoredPacket& stored_packet = packet_history_.front();
408     if (stored_packet.pending_transmission_) {
409       // Don't remove packets in the pacer queue, pending tranmission.
410       return;
411     }
412 
413     if (*stored_packet.send_time_ms_ + packet_duration_ms > now_ms) {
414       // Don't cull packets too early to avoid failed retransmission requests.
415       return;
416     }
417 
418     if (packet_history_.size() >= number_to_store_ ||
419         *stored_packet.send_time_ms_ +
420                 (packet_duration_ms * kPacketCullingDelayFactor) <=
421             now_ms) {
422       // Too many packets in history, or this packet has timed out. Remove it
423       // and continue.
424       RemovePacket(0);
425     } else {
426       // No more packets can be removed right now.
427       return;
428     }
429   }
430 }
431 
RemovePacket(int packet_index)432 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
433     int packet_index) {
434   // Move the packet out from the StoredPacket container.
435   std::unique_ptr<RtpPacketToSend> rtp_packet =
436       std::move(packet_history_[packet_index].packet_);
437 
438   // Erase from padding priority set, if eligible.
439   if (enable_padding_prio_) {
440     padding_priority_.erase(&packet_history_[packet_index]);
441   }
442 
443   if (packet_index == 0) {
444     while (!packet_history_.empty() &&
445            packet_history_.front().packet_ == nullptr) {
446       packet_history_.pop_front();
447     }
448   }
449 
450   return rtp_packet;
451 }
452 
GetPacketIndex(uint16_t sequence_number) const453 int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const {
454   if (packet_history_.empty()) {
455     return 0;
456   }
457 
458   RTC_DCHECK(packet_history_.front().packet_ != nullptr);
459   int first_seq = packet_history_.front().packet_->SequenceNumber();
460   if (first_seq == sequence_number) {
461     return 0;
462   }
463 
464   int packet_index = sequence_number - first_seq;
465   constexpr int kSeqNumSpan = std::numeric_limits<uint16_t>::max() + 1;
466 
467   if (IsNewerSequenceNumber(sequence_number, first_seq)) {
468     if (sequence_number < first_seq) {
469       // Forward wrap.
470       packet_index += kSeqNumSpan;
471     }
472   } else if (sequence_number > first_seq) {
473     // Backwards wrap.
474     packet_index -= kSeqNumSpan;
475   }
476 
477   return packet_index;
478 }
479 
GetStoredPacket(uint16_t sequence_number)480 RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket(
481     uint16_t sequence_number) {
482   int index = GetPacketIndex(sequence_number);
483   if (index < 0 || static_cast<size_t>(index) >= packet_history_.size() ||
484       packet_history_[index].packet_ == nullptr) {
485     return nullptr;
486   }
487   return &packet_history_[index];
488 }
489 
// Builds a PacketState snapshot from |stored_packet|, which must hold a
// packet (|packet_| is dereferenced without a null check).
RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState(
    const RtpPacketHistory::StoredPacket& stored_packet) {
  const RtpPacketToSend& rtp_packet = *stored_packet.packet_;

  RtpPacketHistory::PacketState snapshot;
  // Fields read from the RTP packet itself.
  snapshot.rtp_sequence_number = rtp_packet.SequenceNumber();
  snapshot.capture_time_ms = rtp_packet.capture_time_ms();
  snapshot.ssrc = rtp_packet.Ssrc();
  snapshot.packet_size = rtp_packet.size();
  // Fields tracked by the history entry.
  snapshot.send_time_ms = stored_packet.send_time_ms_;
  snapshot.times_retransmitted = stored_packet.times_retransmitted();
  snapshot.pending_transmission = stored_packet.pending_transmission_;
  return snapshot;
}
502 
503 }  // namespace webrtc
504