1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h"
12
13 #include <limits>
14 #include <memory>
15 #include <utility>
16
17 #include "absl/strings/match.h"
18 #include "api/transport/field_trial_based_config.h"
19 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
20 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
21 #include "rtc_base/logging.h"
22
23 namespace webrtc {
24 namespace {
// Number of RTP timestamp ticks per millisecond; used to scale the
// capture-to-send delay written into the TransmissionOffset extension.
constexpr uint32_t kTimestampTicksPerMs{90};
// Length, in milliseconds, of the window over which send delays are kept.
constexpr int kSendSideDelayWindowMs{1000};
// Window length, in milliseconds, given to each per-media-type rate tracker.
constexpr int kBitrateStatisticsWindowMs{1000};
// Capacity handed to RtpSequenceNumberMap (2^13 entries).
constexpr size_t kRtpSequenceNumberMapMaxEntries{size_t{1} << 13};
29
IsEnabled(absl::string_view name,const WebRtcKeyValueConfig * field_trials)30 bool IsEnabled(absl::string_view name,
31 const WebRtcKeyValueConfig* field_trials) {
32 FieldTrialBasedConfig default_trials;
33 auto& trials = field_trials ? *field_trials : default_trials;
34 return absl::StartsWith(trials.Lookup(name), "Enabled");
35 }
36 } // namespace
37
NonPacedPacketSender(DEPRECATED_RtpSenderEgress * sender)38 DEPRECATED_RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
39 DEPRECATED_RtpSenderEgress* sender)
40 : transport_sequence_number_(0), sender_(sender) {}
41 DEPRECATED_RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() =
42 default;
43
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)44 void DEPRECATED_RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
45 std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
46 for (auto& packet : packets) {
47 if (!packet->SetExtension<TransportSequenceNumber>(
48 ++transport_sequence_number_)) {
49 --transport_sequence_number_;
50 }
51 packet->ReserveExtension<TransmissionOffset>();
52 packet->ReserveExtension<AbsoluteSendTime>();
53 sender_->SendPacket(packet.get(), PacedPacketInfo());
54 }
55 }
56
DEPRECATED_RtpSenderEgress(const RtpRtcpInterface::Configuration & config,RtpPacketHistory * packet_history)57 DEPRECATED_RtpSenderEgress::DEPRECATED_RtpSenderEgress(
58 const RtpRtcpInterface::Configuration& config,
59 RtpPacketHistory* packet_history)
60 : ssrc_(config.local_media_ssrc),
61 rtx_ssrc_(config.rtx_send_ssrc),
62 flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
63 : absl::nullopt),
64 populate_network2_timestamp_(config.populate_network2_timestamp),
65 send_side_bwe_with_overhead_(
66 IsEnabled("WebRTC-SendSideBwe-WithOverhead", config.field_trials)),
67 clock_(config.clock),
68 packet_history_(packet_history),
69 transport_(config.outgoing_transport),
70 event_log_(config.event_log),
71 is_audio_(config.audio),
72 need_rtp_packet_infos_(config.need_rtp_packet_infos),
73 transport_feedback_observer_(config.transport_feedback_callback),
74 send_side_delay_observer_(config.send_side_delay_observer),
75 send_packet_observer_(config.send_packet_observer),
76 rtp_stats_callback_(config.rtp_stats_callback),
77 bitrate_callback_(config.send_bitrate_observer),
78 media_has_been_sent_(false),
79 force_part_of_allocation_(false),
80 timestamp_offset_(0),
81 max_delay_it_(send_delays_.end()),
82 sum_delays_ms_(0),
83 total_packet_send_delay_ms_(0),
84 send_rates_(kNumMediaTypes,
85 {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
86 rtp_sequence_number_map_(need_rtp_packet_infos_
87 ? std::make_unique<RtpSequenceNumberMap>(
88 kRtpSequenceNumberMapMaxEntries)
89 : nullptr) {}
90
SendPacket(RtpPacketToSend * packet,const PacedPacketInfo & pacing_info)91 void DEPRECATED_RtpSenderEgress::SendPacket(
92 RtpPacketToSend* packet,
93 const PacedPacketInfo& pacing_info) {
94 RTC_DCHECK(packet);
95
96 const uint32_t packet_ssrc = packet->Ssrc();
97 RTC_DCHECK(packet->packet_type().has_value());
98 RTC_DCHECK(HasCorrectSsrc(*packet));
99 int64_t now_ms = clock_->TimeInMilliseconds();
100
101 if (is_audio_) {
102 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
103 BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms,
104 GetSendRates().Sum().kbps(), packet_ssrc);
105 BWE_TEST_LOGGING_PLOT_WITH_SSRC(
106 1, "AudioNackBitrate_kbps", now_ms,
107 GetSendRates()[RtpPacketMediaType::kRetransmission].kbps(),
108 packet_ssrc);
109 #endif
110 } else {
111 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
112 BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms,
113 GetSendRates().Sum().kbps(), packet_ssrc);
114 BWE_TEST_LOGGING_PLOT_WITH_SSRC(
115 1, "VideoNackBitrate_kbps", now_ms,
116 GetSendRates()[RtpPacketMediaType::kRetransmission].kbps(),
117 packet_ssrc);
118 #endif
119 }
120
121 PacketOptions options;
122 {
123 MutexLock lock(&lock_);
124 options.included_in_allocation = force_part_of_allocation_;
125
126 if (need_rtp_packet_infos_ &&
127 packet->packet_type() == RtpPacketToSend::Type::kVideo) {
128 RTC_DCHECK(rtp_sequence_number_map_);
129 // Last packet of a frame, add it to sequence number info map.
130 const uint32_t timestamp = packet->Timestamp() - timestamp_offset_;
131 bool is_first_packet_of_frame = packet->is_first_packet_of_frame();
132 bool is_last_packet_of_frame = packet->Marker();
133
134 rtp_sequence_number_map_->InsertPacket(
135 packet->SequenceNumber(),
136 RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
137 is_last_packet_of_frame));
138 }
139 }
140
141 // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
142 // the pacer, these modifications of the header below are happening after the
143 // FEC protection packets are calculated. This will corrupt recovered packets
144 // at the same place. It's not an issue for extensions, which are present in
145 // all the packets (their content just may be incorrect on recovered packets).
146 // In case of VideoTimingExtension, since it's present not in every packet,
147 // data after rtp header may be corrupted if these packets are protected by
148 // the FEC.
149 int64_t diff_ms = now_ms - packet->capture_time_ms();
150 if (packet->HasExtension<TransmissionOffset>()) {
151 packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff_ms);
152 }
153 if (packet->HasExtension<AbsoluteSendTime>()) {
154 packet->SetExtension<AbsoluteSendTime>(
155 AbsoluteSendTime::MsTo24Bits(now_ms));
156 }
157
158 if (packet->HasExtension<VideoTimingExtension>()) {
159 if (populate_network2_timestamp_) {
160 packet->set_network2_time_ms(now_ms);
161 } else {
162 packet->set_pacer_exit_time_ms(now_ms);
163 }
164 }
165
166 const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
167 packet->packet_type() == RtpPacketMediaType::kVideo;
168
169 // Downstream code actually uses this flag to distinguish between media and
170 // everything else.
171 options.is_retransmit = !is_media;
172 if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
173 options.packet_id = *packet_id;
174 options.included_in_feedback = true;
175 options.included_in_allocation = true;
176 AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
177 }
178
179 options.application_data.assign(packet->application_data().begin(),
180 packet->application_data().end());
181
182 if (packet->packet_type() != RtpPacketMediaType::kPadding &&
183 packet->packet_type() != RtpPacketMediaType::kRetransmission) {
184 UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
185 UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
186 packet_ssrc);
187 }
188
189 const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
190
191 // Put packet in retransmission history or update pending status even if
192 // actual sending fails.
193 if (is_media && packet->allow_retransmission()) {
194 packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
195 now_ms);
196 } else if (packet->retransmitted_sequence_number()) {
197 packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
198 }
199
200 if (send_success) {
201 MutexLock lock(&lock_);
202 UpdateRtpStats(*packet);
203 media_has_been_sent_ = true;
204 }
205 }
206
ProcessBitrateAndNotifyObservers()207 void DEPRECATED_RtpSenderEgress::ProcessBitrateAndNotifyObservers() {
208 if (!bitrate_callback_)
209 return;
210
211 MutexLock lock(&lock_);
212 RtpSendRates send_rates = GetSendRatesLocked();
213 bitrate_callback_->Notify(
214 send_rates.Sum().bps(),
215 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
216 }
217
GetSendRates() const218 RtpSendRates DEPRECATED_RtpSenderEgress::GetSendRates() const {
219 MutexLock lock(&lock_);
220 return GetSendRatesLocked();
221 }
222
GetSendRatesLocked() const223 RtpSendRates DEPRECATED_RtpSenderEgress::GetSendRatesLocked() const {
224 const int64_t now_ms = clock_->TimeInMilliseconds();
225 RtpSendRates current_rates;
226 for (size_t i = 0; i < kNumMediaTypes; ++i) {
227 RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
228 current_rates[type] =
229 DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0));
230 }
231 return current_rates;
232 }
233
GetDataCounters(StreamDataCounters * rtp_stats,StreamDataCounters * rtx_stats) const234 void DEPRECATED_RtpSenderEgress::GetDataCounters(
235 StreamDataCounters* rtp_stats,
236 StreamDataCounters* rtx_stats) const {
237 MutexLock lock(&lock_);
238 *rtp_stats = rtp_stats_;
239 *rtx_stats = rtx_rtp_stats_;
240 }
241
ForceIncludeSendPacketsInAllocation(bool part_of_allocation)242 void DEPRECATED_RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
243 bool part_of_allocation) {
244 MutexLock lock(&lock_);
245 force_part_of_allocation_ = part_of_allocation;
246 }
247
MediaHasBeenSent() const248 bool DEPRECATED_RtpSenderEgress::MediaHasBeenSent() const {
249 MutexLock lock(&lock_);
250 return media_has_been_sent_;
251 }
252
SetMediaHasBeenSent(bool media_sent)253 void DEPRECATED_RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
254 MutexLock lock(&lock_);
255 media_has_been_sent_ = media_sent;
256 }
257
SetTimestampOffset(uint32_t timestamp)258 void DEPRECATED_RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
259 MutexLock lock(&lock_);
260 timestamp_offset_ = timestamp;
261 }
262
263 std::vector<RtpSequenceNumberMap::Info>
GetSentRtpPacketInfos(rtc::ArrayView<const uint16_t> sequence_numbers) const264 DEPRECATED_RtpSenderEgress::GetSentRtpPacketInfos(
265 rtc::ArrayView<const uint16_t> sequence_numbers) const {
266 RTC_DCHECK(!sequence_numbers.empty());
267 if (!need_rtp_packet_infos_) {
268 return std::vector<RtpSequenceNumberMap::Info>();
269 }
270
271 std::vector<RtpSequenceNumberMap::Info> results;
272 results.reserve(sequence_numbers.size());
273
274 MutexLock lock(&lock_);
275 for (uint16_t sequence_number : sequence_numbers) {
276 const auto& info = rtp_sequence_number_map_->Get(sequence_number);
277 if (!info) {
278 // The empty vector will be returned. We can delay the clearing
279 // of the vector until after we exit the critical section.
280 return std::vector<RtpSequenceNumberMap::Info>();
281 }
282 results.push_back(*info);
283 }
284
285 return results;
286 }
287
HasCorrectSsrc(const RtpPacketToSend & packet) const288 bool DEPRECATED_RtpSenderEgress::HasCorrectSsrc(
289 const RtpPacketToSend& packet) const {
290 switch (*packet.packet_type()) {
291 case RtpPacketMediaType::kAudio:
292 case RtpPacketMediaType::kVideo:
293 return packet.Ssrc() == ssrc_;
294 case RtpPacketMediaType::kRetransmission:
295 case RtpPacketMediaType::kPadding:
296 // Both padding and retransmission must be on either the media or the
297 // RTX stream.
298 return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
299 case RtpPacketMediaType::kForwardErrorCorrection:
300 // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
301 return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
302 }
303 return false;
304 }
305
AddPacketToTransportFeedback(uint16_t packet_id,const RtpPacketToSend & packet,const PacedPacketInfo & pacing_info)306 void DEPRECATED_RtpSenderEgress::AddPacketToTransportFeedback(
307 uint16_t packet_id,
308 const RtpPacketToSend& packet,
309 const PacedPacketInfo& pacing_info) {
310 if (transport_feedback_observer_) {
311 size_t packet_size = packet.payload_size() + packet.padding_size();
312 if (send_side_bwe_with_overhead_) {
313 packet_size = packet.size();
314 }
315
316 RtpPacketSendInfo packet_info;
317 packet_info.ssrc = ssrc_;
318 packet_info.transport_sequence_number = packet_id;
319 packet_info.rtp_sequence_number = packet.SequenceNumber();
320 packet_info.length = packet_size;
321 packet_info.pacing_info = pacing_info;
322 packet_info.packet_type = packet.packet_type();
323 transport_feedback_observer_->OnAddPacket(packet_info);
324 }
325 }
326
UpdateDelayStatistics(int64_t capture_time_ms,int64_t now_ms,uint32_t ssrc)327 void DEPRECATED_RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
328 int64_t now_ms,
329 uint32_t ssrc) {
330 if (!send_side_delay_observer_ || capture_time_ms <= 0)
331 return;
332
333 int avg_delay_ms = 0;
334 int max_delay_ms = 0;
335 uint64_t total_packet_send_delay_ms = 0;
336 {
337 MutexLock lock(&lock_);
338 // Compute the max and average of the recent capture-to-send delays.
339 // The time complexity of the current approach depends on the distribution
340 // of the delay values. This could be done more efficiently.
341
342 // Remove elements older than kSendSideDelayWindowMs.
343 auto lower_bound =
344 send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
345 for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
346 if (max_delay_it_ == it) {
347 max_delay_it_ = send_delays_.end();
348 }
349 sum_delays_ms_ -= it->second;
350 }
351 send_delays_.erase(send_delays_.begin(), lower_bound);
352 if (max_delay_it_ == send_delays_.end()) {
353 // Removed the previous max. Need to recompute.
354 RecomputeMaxSendDelay();
355 }
356
357 // Add the new element.
358 RTC_DCHECK_GE(now_ms, 0);
359 RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2);
360 RTC_DCHECK_GE(capture_time_ms, 0);
361 RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
362 int64_t diff_ms = now_ms - capture_time_ms;
363 RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
364 RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
365 int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
366 SendDelayMap::iterator it;
367 bool inserted;
368 std::tie(it, inserted) =
369 send_delays_.insert(std::make_pair(now_ms, new_send_delay));
370 if (!inserted) {
371 // TODO(terelius): If we have multiple delay measurements during the same
372 // millisecond then we keep the most recent one. It is not clear that this
373 // is the right decision, but it preserves an earlier behavior.
374 int previous_send_delay = it->second;
375 sum_delays_ms_ -= previous_send_delay;
376 it->second = new_send_delay;
377 if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
378 RecomputeMaxSendDelay();
379 }
380 }
381 if (max_delay_it_ == send_delays_.end() ||
382 it->second >= max_delay_it_->second) {
383 max_delay_it_ = it;
384 }
385 sum_delays_ms_ += new_send_delay;
386 total_packet_send_delay_ms_ += new_send_delay;
387 total_packet_send_delay_ms = total_packet_send_delay_ms_;
388
389 size_t num_delays = send_delays_.size();
390 RTC_DCHECK(max_delay_it_ != send_delays_.end());
391 max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second);
392 int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays;
393 RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
394 RTC_DCHECK_LE(avg_ms,
395 static_cast<int64_t>(std::numeric_limits<int>::max()));
396 avg_delay_ms =
397 rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
398 }
399 send_side_delay_observer_->SendSideDelayUpdated(
400 avg_delay_ms, max_delay_ms, total_packet_send_delay_ms, ssrc);
401 }
402
RecomputeMaxSendDelay()403 void DEPRECATED_RtpSenderEgress::RecomputeMaxSendDelay() {
404 max_delay_it_ = send_delays_.begin();
405 for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
406 if (it->second >= max_delay_it_->second) {
407 max_delay_it_ = it;
408 }
409 }
410 }
411
UpdateOnSendPacket(int packet_id,int64_t capture_time_ms,uint32_t ssrc)412 void DEPRECATED_RtpSenderEgress::UpdateOnSendPacket(int packet_id,
413 int64_t capture_time_ms,
414 uint32_t ssrc) {
415 if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) {
416 return;
417 }
418
419 send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc);
420 }
421
SendPacketToNetwork(const RtpPacketToSend & packet,const PacketOptions & options,const PacedPacketInfo & pacing_info)422 bool DEPRECATED_RtpSenderEgress::SendPacketToNetwork(
423 const RtpPacketToSend& packet,
424 const PacketOptions& options,
425 const PacedPacketInfo& pacing_info) {
426 int bytes_sent = -1;
427 if (transport_) {
428 bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
429 ? static_cast<int>(packet.size())
430 : -1;
431 if (event_log_ && bytes_sent > 0) {
432 event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
433 packet, pacing_info.probe_cluster_id));
434 }
435 }
436
437 if (bytes_sent <= 0) {
438 RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
439 return false;
440 }
441 return true;
442 }
443
UpdateRtpStats(const RtpPacketToSend & packet)444 void DEPRECATED_RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) {
445 int64_t now_ms = clock_->TimeInMilliseconds();
446
447 StreamDataCounters* counters =
448 packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
449
450 if (counters->first_packet_time_ms == -1) {
451 counters->first_packet_time_ms = now_ms;
452 }
453
454 if (packet.packet_type() == RtpPacketMediaType::kForwardErrorCorrection) {
455 counters->fec.AddPacket(packet);
456 }
457
458 if (packet.packet_type() == RtpPacketMediaType::kRetransmission) {
459 counters->retransmitted.AddPacket(packet);
460 }
461 counters->transmitted.AddPacket(packet);
462
463 RTC_DCHECK(packet.packet_type().has_value());
464 send_rates_[static_cast<size_t>(*packet.packet_type())].Update(packet.size(),
465 now_ms);
466
467 if (rtp_stats_callback_) {
468 rtp_stats_callback_->DataCountersUpdated(*counters, packet.Ssrc());
469 }
470 }
471
472 } // namespace webrtc
473