1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtp_packet_history.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17
18 #include "modules/include/module_common_types_public.h"
19 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
20 #include "rtc_base/checks.h"
21 #include "rtc_base/logging.h"
22 #include "system_wrappers/include/clock.h"
23
24 namespace webrtc {
25
26 constexpr size_t RtpPacketHistory::kMaxCapacity;
27 constexpr size_t RtpPacketHistory::kMaxPaddingtHistory;
28 constexpr int64_t RtpPacketHistory::kMinPacketDurationMs;
29 constexpr int RtpPacketHistory::kMinPacketDurationRtt;
30 constexpr int RtpPacketHistory::kPacketCullingDelayFactor;
31
32 RtpPacketHistory::PacketState::PacketState() = default;
33 RtpPacketHistory::PacketState::PacketState(const PacketState&) = default;
34 RtpPacketHistory::PacketState::~PacketState() = default;
35
StoredPacket(std::unique_ptr<RtpPacketToSend> packet,absl::optional<int64_t> send_time_ms,uint64_t insert_order)36 RtpPacketHistory::StoredPacket::StoredPacket(
37 std::unique_ptr<RtpPacketToSend> packet,
38 absl::optional<int64_t> send_time_ms,
39 uint64_t insert_order)
40 : send_time_ms_(send_time_ms),
41 packet_(std::move(packet)),
42 // No send time indicates packet is not sent immediately, but instead will
43 // be put in the pacer queue and later retrieved via
44 // GetPacketAndSetSendTime().
45 pending_transmission_(!send_time_ms.has_value()),
46 insert_order_(insert_order),
47 times_retransmitted_(0) {}
48
49 RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default;
50 RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=(
51 RtpPacketHistory::StoredPacket&&) = default;
52 RtpPacketHistory::StoredPacket::~StoredPacket() = default;
53
IncrementTimesRetransmitted(PacketPrioritySet * priority_set)54 void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted(
55 PacketPrioritySet* priority_set) {
56 // Check if this StoredPacket is in the priority set. If so, we need to remove
57 // it before updating |times_retransmitted_| since that is used in sorting,
58 // and then add it back.
59 const bool in_priority_set = priority_set && priority_set->erase(this) > 0;
60 ++times_retransmitted_;
61 if (in_priority_set) {
62 auto it = priority_set->insert(this);
63 RTC_DCHECK(it.second)
64 << "ERROR: Priority set already contains matching packet! In set: "
65 "insert order = "
66 << (*it.first)->insert_order_
67 << ", times retransmitted = " << (*it.first)->times_retransmitted_
68 << ". Trying to add: insert order = " << insert_order_
69 << ", times retransmitted = " << times_retransmitted_;
70 }
71 }
72
operator ()(StoredPacket * lhs,StoredPacket * rhs) const73 bool RtpPacketHistory::MoreUseful::operator()(StoredPacket* lhs,
74 StoredPacket* rhs) const {
75 // Prefer to send packets we haven't already sent as padding.
76 if (lhs->times_retransmitted() != rhs->times_retransmitted()) {
77 return lhs->times_retransmitted() < rhs->times_retransmitted();
78 }
79 // All else being equal, prefer newer packets.
80 return lhs->insert_order() > rhs->insert_order();
81 }
82
RtpPacketHistory(Clock * clock,bool enable_padding_prio)83 RtpPacketHistory::RtpPacketHistory(Clock* clock, bool enable_padding_prio)
84 : clock_(clock),
85 enable_padding_prio_(enable_padding_prio),
86 number_to_store_(0),
87 mode_(StorageMode::kDisabled),
88 rtt_ms_(-1),
89 packets_inserted_(0) {}
90
~RtpPacketHistory()91 RtpPacketHistory::~RtpPacketHistory() {}
92
SetStorePacketsStatus(StorageMode mode,size_t number_to_store)93 void RtpPacketHistory::SetStorePacketsStatus(StorageMode mode,
94 size_t number_to_store) {
95 RTC_DCHECK_LE(number_to_store, kMaxCapacity);
96 MutexLock lock(&lock_);
97 if (mode != StorageMode::kDisabled && mode_ != StorageMode::kDisabled) {
98 RTC_LOG(LS_WARNING) << "Purging packet history in order to re-set status.";
99 }
100 Reset();
101 mode_ = mode;
102 number_to_store_ = std::min(kMaxCapacity, number_to_store);
103 }
104
GetStorageMode() const105 RtpPacketHistory::StorageMode RtpPacketHistory::GetStorageMode() const {
106 MutexLock lock(&lock_);
107 return mode_;
108 }
109
SetRtt(int64_t rtt_ms)110 void RtpPacketHistory::SetRtt(int64_t rtt_ms) {
111 MutexLock lock(&lock_);
112 RTC_DCHECK_GE(rtt_ms, 0);
113 rtt_ms_ = rtt_ms;
114 // If storage is not disabled, packets will be removed after a timeout
115 // that depends on the RTT. Changing the RTT may thus cause some packets
116 // become "old" and subject to removal.
117 if (mode_ != StorageMode::kDisabled) {
118 CullOldPackets(clock_->TimeInMilliseconds());
119 }
120 }
121
PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,absl::optional<int64_t> send_time_ms)122 void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
123 absl::optional<int64_t> send_time_ms) {
124 RTC_DCHECK(packet);
125 MutexLock lock(&lock_);
126 int64_t now_ms = clock_->TimeInMilliseconds();
127 if (mode_ == StorageMode::kDisabled) {
128 return;
129 }
130
131 RTC_DCHECK(packet->allow_retransmission());
132 CullOldPackets(now_ms);
133
134 // Store packet.
135 const uint16_t rtp_seq_no = packet->SequenceNumber();
136 int packet_index = GetPacketIndex(rtp_seq_no);
137 if (packet_index >= 0u &&
138 static_cast<size_t>(packet_index) < packet_history_.size() &&
139 packet_history_[packet_index].packet_ != nullptr) {
140 RTC_LOG(LS_WARNING) << "Duplicate packet inserted: " << rtp_seq_no;
141 // Remove previous packet to avoid inconsistent state.
142 RemovePacket(packet_index);
143 packet_index = GetPacketIndex(rtp_seq_no);
144 }
145
146 // Packet to be inserted ahead of first packet, expand front.
147 for (; packet_index < 0; ++packet_index) {
148 packet_history_.emplace_front(nullptr, absl::nullopt, 0);
149 }
150 // Packet to be inserted behind last packet, expand back.
151 while (static_cast<int>(packet_history_.size()) <= packet_index) {
152 packet_history_.emplace_back(nullptr, absl::nullopt, 0);
153 }
154
155 RTC_DCHECK_GE(packet_index, 0);
156 RTC_DCHECK_LT(packet_index, packet_history_.size());
157 RTC_DCHECK(packet_history_[packet_index].packet_ == nullptr);
158
159 packet_history_[packet_index] =
160 StoredPacket(std::move(packet), send_time_ms, packets_inserted_++);
161
162 if (enable_padding_prio_) {
163 if (padding_priority_.size() >= kMaxPaddingtHistory - 1) {
164 padding_priority_.erase(std::prev(padding_priority_.end()));
165 }
166 auto prio_it = padding_priority_.insert(&packet_history_[packet_index]);
167 RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set.";
168 }
169 }
170
GetPacketAndSetSendTime(uint16_t sequence_number)171 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndSetSendTime(
172 uint16_t sequence_number) {
173 MutexLock lock(&lock_);
174 if (mode_ == StorageMode::kDisabled) {
175 return nullptr;
176 }
177
178 StoredPacket* packet = GetStoredPacket(sequence_number);
179 if (packet == nullptr) {
180 return nullptr;
181 }
182
183 int64_t now_ms = clock_->TimeInMilliseconds();
184 if (!VerifyRtt(*packet, now_ms)) {
185 return nullptr;
186 }
187
188 if (packet->send_time_ms_) {
189 packet->IncrementTimesRetransmitted(
190 enable_padding_prio_ ? &padding_priority_ : nullptr);
191 }
192
193 // Update send-time and mark as no long in pacer queue.
194 packet->send_time_ms_ = now_ms;
195 packet->pending_transmission_ = false;
196
197 // Return copy of packet instance since it may need to be retransmitted.
198 return std::make_unique<RtpPacketToSend>(*packet->packet_);
199 }
200
GetPacketAndMarkAsPending(uint16_t sequence_number)201 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
202 uint16_t sequence_number) {
203 return GetPacketAndMarkAsPending(
204 sequence_number, [](const RtpPacketToSend& packet) {
205 return std::make_unique<RtpPacketToSend>(packet);
206 });
207 }
208
GetPacketAndMarkAsPending(uint16_t sequence_number,rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)209 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
210 uint16_t sequence_number,
211 rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
212 encapsulate) {
213 MutexLock lock(&lock_);
214 if (mode_ == StorageMode::kDisabled) {
215 return nullptr;
216 }
217
218 StoredPacket* packet = GetStoredPacket(sequence_number);
219 if (packet == nullptr) {
220 return nullptr;
221 }
222
223 if (packet->pending_transmission_) {
224 // Packet already in pacer queue, ignore this request.
225 return nullptr;
226 }
227
228 if (!VerifyRtt(*packet, clock_->TimeInMilliseconds())) {
229 // Packet already resent within too short a time window, ignore.
230 return nullptr;
231 }
232
233 // Copy and/or encapsulate packet.
234 std::unique_ptr<RtpPacketToSend> encapsulated_packet =
235 encapsulate(*packet->packet_);
236 if (encapsulated_packet) {
237 packet->pending_transmission_ = true;
238 }
239
240 return encapsulated_packet;
241 }
242
MarkPacketAsSent(uint16_t sequence_number)243 void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) {
244 MutexLock lock(&lock_);
245 if (mode_ == StorageMode::kDisabled) {
246 return;
247 }
248
249 StoredPacket* packet = GetStoredPacket(sequence_number);
250 if (packet == nullptr) {
251 return;
252 }
253
254 RTC_DCHECK(packet->send_time_ms_);
255
256 // Update send-time, mark as no longer in pacer queue, and increment
257 // transmission count.
258 packet->send_time_ms_ = clock_->TimeInMilliseconds();
259 packet->pending_transmission_ = false;
260 packet->IncrementTimesRetransmitted(enable_padding_prio_ ? &padding_priority_
261 : nullptr);
262 }
263
GetPacketState(uint16_t sequence_number) const264 absl::optional<RtpPacketHistory::PacketState> RtpPacketHistory::GetPacketState(
265 uint16_t sequence_number) const {
266 MutexLock lock(&lock_);
267 if (mode_ == StorageMode::kDisabled) {
268 return absl::nullopt;
269 }
270
271 int packet_index = GetPacketIndex(sequence_number);
272 if (packet_index < 0 ||
273 static_cast<size_t>(packet_index) >= packet_history_.size()) {
274 return absl::nullopt;
275 }
276 const StoredPacket& packet = packet_history_[packet_index];
277 if (packet.packet_ == nullptr) {
278 return absl::nullopt;
279 }
280
281 if (!VerifyRtt(packet, clock_->TimeInMilliseconds())) {
282 return absl::nullopt;
283 }
284
285 return StoredPacketToPacketState(packet);
286 }
287
VerifyRtt(const RtpPacketHistory::StoredPacket & packet,int64_t now_ms) const288 bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet,
289 int64_t now_ms) const {
290 if (packet.send_time_ms_) {
291 // Send-time already set, this check must be for a retransmission.
292 if (packet.times_retransmitted() > 0 &&
293 now_ms < *packet.send_time_ms_ + rtt_ms_) {
294 // This packet has already been retransmitted once, and the time since
295 // that even is lower than on RTT. Ignore request as this packet is
296 // likely already in the network pipe.
297 return false;
298 }
299 }
300
301 return true;
302 }
303
GetPayloadPaddingPacket()304 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket() {
305 // Default implementation always just returns a copy of the packet.
306 return GetPayloadPaddingPacket([](const RtpPacketToSend& packet) {
307 return std::make_unique<RtpPacketToSend>(packet);
308 });
309 }
310
GetPayloadPaddingPacket(rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)311 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket(
312 rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
313 encapsulate) {
314 MutexLock lock(&lock_);
315 if (mode_ == StorageMode::kDisabled) {
316 return nullptr;
317 }
318
319 StoredPacket* best_packet = nullptr;
320 if (enable_padding_prio_ && !padding_priority_.empty()) {
321 auto best_packet_it = padding_priority_.begin();
322 best_packet = *best_packet_it;
323 } else if (!enable_padding_prio_ && !packet_history_.empty()) {
324 // Prioritization not available, pick the last packet.
325 for (auto it = packet_history_.rbegin(); it != packet_history_.rend();
326 ++it) {
327 if (it->packet_ != nullptr) {
328 best_packet = &(*it);
329 break;
330 }
331 }
332 }
333 if (best_packet == nullptr) {
334 return nullptr;
335 }
336
337 if (best_packet->pending_transmission_) {
338 // Because PacedSender releases it's lock when it calls
339 // GeneratePadding() there is the potential for a race where a new
340 // packet ends up here instead of the regular transmit path. In such a
341 // case, just return empty and it will be picked up on the next
342 // Process() call.
343 return nullptr;
344 }
345
346 auto padding_packet = encapsulate(*best_packet->packet_);
347 if (!padding_packet) {
348 return nullptr;
349 }
350
351 best_packet->send_time_ms_ = clock_->TimeInMilliseconds();
352 best_packet->IncrementTimesRetransmitted(
353 enable_padding_prio_ ? &padding_priority_ : nullptr);
354
355 return padding_packet;
356 }
357
CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers)358 void RtpPacketHistory::CullAcknowledgedPackets(
359 rtc::ArrayView<const uint16_t> sequence_numbers) {
360 MutexLock lock(&lock_);
361 for (uint16_t sequence_number : sequence_numbers) {
362 int packet_index = GetPacketIndex(sequence_number);
363 if (packet_index < 0 ||
364 static_cast<size_t>(packet_index) >= packet_history_.size()) {
365 continue;
366 }
367 RemovePacket(packet_index);
368 }
369 }
370
SetPendingTransmission(uint16_t sequence_number)371 bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) {
372 MutexLock lock(&lock_);
373 if (mode_ == StorageMode::kDisabled) {
374 return false;
375 }
376
377 StoredPacket* packet = GetStoredPacket(sequence_number);
378 if (packet == nullptr) {
379 return false;
380 }
381
382 packet->pending_transmission_ = true;
383 return true;
384 }
385
Clear()386 void RtpPacketHistory::Clear() {
387 MutexLock lock(&lock_);
388 Reset();
389 }
390
Reset()391 void RtpPacketHistory::Reset() {
392 packet_history_.clear();
393 padding_priority_.clear();
394 }
395
CullOldPackets(int64_t now_ms)396 void RtpPacketHistory::CullOldPackets(int64_t now_ms) {
397 int64_t packet_duration_ms =
398 std::max(kMinPacketDurationRtt * rtt_ms_, kMinPacketDurationMs);
399 while (!packet_history_.empty()) {
400 if (packet_history_.size() >= kMaxCapacity) {
401 // We have reached the absolute max capacity, remove one packet
402 // unconditionally.
403 RemovePacket(0);
404 continue;
405 }
406
407 const StoredPacket& stored_packet = packet_history_.front();
408 if (stored_packet.pending_transmission_) {
409 // Don't remove packets in the pacer queue, pending tranmission.
410 return;
411 }
412
413 if (*stored_packet.send_time_ms_ + packet_duration_ms > now_ms) {
414 // Don't cull packets too early to avoid failed retransmission requests.
415 return;
416 }
417
418 if (packet_history_.size() >= number_to_store_ ||
419 *stored_packet.send_time_ms_ +
420 (packet_duration_ms * kPacketCullingDelayFactor) <=
421 now_ms) {
422 // Too many packets in history, or this packet has timed out. Remove it
423 // and continue.
424 RemovePacket(0);
425 } else {
426 // No more packets can be removed right now.
427 return;
428 }
429 }
430 }
431
RemovePacket(int packet_index)432 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
433 int packet_index) {
434 // Move the packet out from the StoredPacket container.
435 std::unique_ptr<RtpPacketToSend> rtp_packet =
436 std::move(packet_history_[packet_index].packet_);
437
438 // Erase from padding priority set, if eligible.
439 if (enable_padding_prio_) {
440 padding_priority_.erase(&packet_history_[packet_index]);
441 }
442
443 if (packet_index == 0) {
444 while (!packet_history_.empty() &&
445 packet_history_.front().packet_ == nullptr) {
446 packet_history_.pop_front();
447 }
448 }
449
450 return rtp_packet;
451 }
452
GetPacketIndex(uint16_t sequence_number) const453 int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const {
454 if (packet_history_.empty()) {
455 return 0;
456 }
457
458 RTC_DCHECK(packet_history_.front().packet_ != nullptr);
459 int first_seq = packet_history_.front().packet_->SequenceNumber();
460 if (first_seq == sequence_number) {
461 return 0;
462 }
463
464 int packet_index = sequence_number - first_seq;
465 constexpr int kSeqNumSpan = std::numeric_limits<uint16_t>::max() + 1;
466
467 if (IsNewerSequenceNumber(sequence_number, first_seq)) {
468 if (sequence_number < first_seq) {
469 // Forward wrap.
470 packet_index += kSeqNumSpan;
471 }
472 } else if (sequence_number > first_seq) {
473 // Backwards wrap.
474 packet_index -= kSeqNumSpan;
475 }
476
477 return packet_index;
478 }
479
GetStoredPacket(uint16_t sequence_number)480 RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket(
481 uint16_t sequence_number) {
482 int index = GetPacketIndex(sequence_number);
483 if (index < 0 || static_cast<size_t>(index) >= packet_history_.size() ||
484 packet_history_[index].packet_ == nullptr) {
485 return nullptr;
486 }
487 return &packet_history_[index];
488 }
489
// Builds a PacketState snapshot from |stored_packet|, combining fields read
// from the RTP packet itself with the bookkeeping kept alongside it.
// |stored_packet| must hold a packet.
RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState(
    const RtpPacketHistory::StoredPacket& stored_packet) {
  auto& rtp_packet = *stored_packet.packet_;

  PacketState state;
  // Taken from the RTP packet.
  state.rtp_sequence_number = rtp_packet.SequenceNumber();
  state.ssrc = rtp_packet.Ssrc();
  state.capture_time_ms = rtp_packet.capture_time_ms();
  state.packet_size = rtp_packet.size();
  // Taken from the history bookkeeping.
  state.send_time_ms = stored_packet.send_time_ms_;
  state.times_retransmitted = stored_packet.times_retransmitted();
  state.pending_transmission = stored_packet.pending_transmission_;
  return state;
}
502
503 } // namespace webrtc
504