1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h"
12 
13 #include <limits>
14 #include <memory>
15 #include <utility>
16 
17 #include "absl/strings/match.h"
18 #include "api/transport/field_trial_based_config.h"
19 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
20 #include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
21 #include "rtc_base/logging.h"
22 
23 namespace webrtc {
24 namespace {
// Scale factor applied to a millisecond delta before it is written into the
// TransmissionOffset header extension (90 ticks per millisecond).
constexpr uint32_t kTimestampTicksPerMs = 90;
// Send-delay samples older than this many milliseconds are dropped from the
// max/average delay computation.
constexpr int kSendSideDelayWindowMs = 1000;
// Window, in milliseconds, handed to each per-media-type rate tracker.
constexpr int kBitrateStatisticsWindowMs = 1000;
// Capacity passed to the RtpSequenceNumberMap when packet infos are needed.
constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;  // 8192
29 
IsEnabled(absl::string_view name,const WebRtcKeyValueConfig * field_trials)30 bool IsEnabled(absl::string_view name,
31                const WebRtcKeyValueConfig* field_trials) {
32   FieldTrialBasedConfig default_trials;
33   auto& trials = field_trials ? *field_trials : default_trials;
34   return absl::StartsWith(trials.Lookup(name), "Enabled");
35 }
36 }  // namespace
37 
NonPacedPacketSender(DEPRECATED_RtpSenderEgress * sender)38 DEPRECATED_RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
39     DEPRECATED_RtpSenderEgress* sender)
40     : transport_sequence_number_(0), sender_(sender) {}
41 DEPRECATED_RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() =
42     default;
43 
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)44 void DEPRECATED_RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
45     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
46   for (auto& packet : packets) {
47     if (!packet->SetExtension<TransportSequenceNumber>(
48             ++transport_sequence_number_)) {
49       --transport_sequence_number_;
50     }
51     packet->ReserveExtension<TransmissionOffset>();
52     packet->ReserveExtension<AbsoluteSendTime>();
53     sender_->SendPacket(packet.get(), PacedPacketInfo());
54   }
55 }
56 
DEPRECATED_RtpSenderEgress(const RtpRtcpInterface::Configuration & config,RtpPacketHistory * packet_history)57 DEPRECATED_RtpSenderEgress::DEPRECATED_RtpSenderEgress(
58     const RtpRtcpInterface::Configuration& config,
59     RtpPacketHistory* packet_history)
60     : ssrc_(config.local_media_ssrc),
61       rtx_ssrc_(config.rtx_send_ssrc),
62       flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
63                                          : absl::nullopt),
64       populate_network2_timestamp_(config.populate_network2_timestamp),
65       send_side_bwe_with_overhead_(
66           IsEnabled("WebRTC-SendSideBwe-WithOverhead", config.field_trials)),
67       clock_(config.clock),
68       packet_history_(packet_history),
69       transport_(config.outgoing_transport),
70       event_log_(config.event_log),
71       is_audio_(config.audio),
72       need_rtp_packet_infos_(config.need_rtp_packet_infos),
73       transport_feedback_observer_(config.transport_feedback_callback),
74       send_side_delay_observer_(config.send_side_delay_observer),
75       send_packet_observer_(config.send_packet_observer),
76       rtp_stats_callback_(config.rtp_stats_callback),
77       bitrate_callback_(config.send_bitrate_observer),
78       media_has_been_sent_(false),
79       force_part_of_allocation_(false),
80       timestamp_offset_(0),
81       max_delay_it_(send_delays_.end()),
82       sum_delays_ms_(0),
83       total_packet_send_delay_ms_(0),
84       send_rates_(kNumMediaTypes,
85                   {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
86       rtp_sequence_number_map_(need_rtp_packet_infos_
87                                    ? std::make_unique<RtpSequenceNumberMap>(
88                                          kRtpSequenceNumberMapMaxEntries)
89                                    : nullptr) {}
90 
SendPacket(RtpPacketToSend * packet,const PacedPacketInfo & pacing_info)91 void DEPRECATED_RtpSenderEgress::SendPacket(
92     RtpPacketToSend* packet,
93     const PacedPacketInfo& pacing_info) {
94   RTC_DCHECK(packet);
95 
96   const uint32_t packet_ssrc = packet->Ssrc();
97   RTC_DCHECK(packet->packet_type().has_value());
98   RTC_DCHECK(HasCorrectSsrc(*packet));
99   int64_t now_ms = clock_->TimeInMilliseconds();
100 
101   if (is_audio_) {
102 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
103     BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms,
104                                     GetSendRates().Sum().kbps(), packet_ssrc);
105     BWE_TEST_LOGGING_PLOT_WITH_SSRC(
106         1, "AudioNackBitrate_kbps", now_ms,
107         GetSendRates()[RtpPacketMediaType::kRetransmission].kbps(),
108         packet_ssrc);
109 #endif
110   } else {
111 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
112     BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms,
113                                     GetSendRates().Sum().kbps(), packet_ssrc);
114     BWE_TEST_LOGGING_PLOT_WITH_SSRC(
115         1, "VideoNackBitrate_kbps", now_ms,
116         GetSendRates()[RtpPacketMediaType::kRetransmission].kbps(),
117         packet_ssrc);
118 #endif
119   }
120 
121   PacketOptions options;
122   {
123     MutexLock lock(&lock_);
124     options.included_in_allocation = force_part_of_allocation_;
125 
126     if (need_rtp_packet_infos_ &&
127         packet->packet_type() == RtpPacketToSend::Type::kVideo) {
128       RTC_DCHECK(rtp_sequence_number_map_);
129       // Last packet of a frame, add it to sequence number info map.
130       const uint32_t timestamp = packet->Timestamp() - timestamp_offset_;
131       bool is_first_packet_of_frame = packet->is_first_packet_of_frame();
132       bool is_last_packet_of_frame = packet->Marker();
133 
134       rtp_sequence_number_map_->InsertPacket(
135           packet->SequenceNumber(),
136           RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
137                                      is_last_packet_of_frame));
138     }
139   }
140 
141   // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
142   // the pacer, these modifications of the header below are happening after the
143   // FEC protection packets are calculated. This will corrupt recovered packets
144   // at the same place. It's not an issue for extensions, which are present in
145   // all the packets (their content just may be incorrect on recovered packets).
146   // In case of VideoTimingExtension, since it's present not in every packet,
147   // data after rtp header may be corrupted if these packets are protected by
148   // the FEC.
149   int64_t diff_ms = now_ms - packet->capture_time_ms();
150   if (packet->HasExtension<TransmissionOffset>()) {
151     packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff_ms);
152   }
153   if (packet->HasExtension<AbsoluteSendTime>()) {
154     packet->SetExtension<AbsoluteSendTime>(
155         AbsoluteSendTime::MsTo24Bits(now_ms));
156   }
157 
158   if (packet->HasExtension<VideoTimingExtension>()) {
159     if (populate_network2_timestamp_) {
160       packet->set_network2_time_ms(now_ms);
161     } else {
162       packet->set_pacer_exit_time_ms(now_ms);
163     }
164   }
165 
166   const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
167                         packet->packet_type() == RtpPacketMediaType::kVideo;
168 
169   // Downstream code actually uses this flag to distinguish between media and
170   // everything else.
171   options.is_retransmit = !is_media;
172   if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
173     options.packet_id = *packet_id;
174     options.included_in_feedback = true;
175     options.included_in_allocation = true;
176     AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
177   }
178 
179   options.application_data.assign(packet->application_data().begin(),
180                                   packet->application_data().end());
181 
182   if (packet->packet_type() != RtpPacketMediaType::kPadding &&
183       packet->packet_type() != RtpPacketMediaType::kRetransmission) {
184     UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
185     UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
186                        packet_ssrc);
187   }
188 
189   const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
190 
191   // Put packet in retransmission history or update pending status even if
192   // actual sending fails.
193   if (is_media && packet->allow_retransmission()) {
194     packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
195                                   now_ms);
196   } else if (packet->retransmitted_sequence_number()) {
197     packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
198   }
199 
200   if (send_success) {
201     MutexLock lock(&lock_);
202     UpdateRtpStats(*packet);
203     media_has_been_sent_ = true;
204   }
205 }
206 
ProcessBitrateAndNotifyObservers()207 void DEPRECATED_RtpSenderEgress::ProcessBitrateAndNotifyObservers() {
208   if (!bitrate_callback_)
209     return;
210 
211   MutexLock lock(&lock_);
212   RtpSendRates send_rates = GetSendRatesLocked();
213   bitrate_callback_->Notify(
214       send_rates.Sum().bps(),
215       send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
216 }
217 
GetSendRates() const218 RtpSendRates DEPRECATED_RtpSenderEgress::GetSendRates() const {
219   MutexLock lock(&lock_);
220   return GetSendRatesLocked();
221 }
222 
GetSendRatesLocked() const223 RtpSendRates DEPRECATED_RtpSenderEgress::GetSendRatesLocked() const {
224   const int64_t now_ms = clock_->TimeInMilliseconds();
225   RtpSendRates current_rates;
226   for (size_t i = 0; i < kNumMediaTypes; ++i) {
227     RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
228     current_rates[type] =
229         DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0));
230   }
231   return current_rates;
232 }
233 
GetDataCounters(StreamDataCounters * rtp_stats,StreamDataCounters * rtx_stats) const234 void DEPRECATED_RtpSenderEgress::GetDataCounters(
235     StreamDataCounters* rtp_stats,
236     StreamDataCounters* rtx_stats) const {
237   MutexLock lock(&lock_);
238   *rtp_stats = rtp_stats_;
239   *rtx_stats = rtx_rtp_stats_;
240 }
241 
ForceIncludeSendPacketsInAllocation(bool part_of_allocation)242 void DEPRECATED_RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
243     bool part_of_allocation) {
244   MutexLock lock(&lock_);
245   force_part_of_allocation_ = part_of_allocation;
246 }
247 
MediaHasBeenSent() const248 bool DEPRECATED_RtpSenderEgress::MediaHasBeenSent() const {
249   MutexLock lock(&lock_);
250   return media_has_been_sent_;
251 }
252 
SetMediaHasBeenSent(bool media_sent)253 void DEPRECATED_RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
254   MutexLock lock(&lock_);
255   media_has_been_sent_ = media_sent;
256 }
257 
SetTimestampOffset(uint32_t timestamp)258 void DEPRECATED_RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
259   MutexLock lock(&lock_);
260   timestamp_offset_ = timestamp;
261 }
262 
263 std::vector<RtpSequenceNumberMap::Info>
GetSentRtpPacketInfos(rtc::ArrayView<const uint16_t> sequence_numbers) const264 DEPRECATED_RtpSenderEgress::GetSentRtpPacketInfos(
265     rtc::ArrayView<const uint16_t> sequence_numbers) const {
266   RTC_DCHECK(!sequence_numbers.empty());
267   if (!need_rtp_packet_infos_) {
268     return std::vector<RtpSequenceNumberMap::Info>();
269   }
270 
271   std::vector<RtpSequenceNumberMap::Info> results;
272   results.reserve(sequence_numbers.size());
273 
274   MutexLock lock(&lock_);
275   for (uint16_t sequence_number : sequence_numbers) {
276     const auto& info = rtp_sequence_number_map_->Get(sequence_number);
277     if (!info) {
278       // The empty vector will be returned. We can delay the clearing
279       // of the vector until after we exit the critical section.
280       return std::vector<RtpSequenceNumberMap::Info>();
281     }
282     results.push_back(*info);
283   }
284 
285   return results;
286 }
287 
HasCorrectSsrc(const RtpPacketToSend & packet) const288 bool DEPRECATED_RtpSenderEgress::HasCorrectSsrc(
289     const RtpPacketToSend& packet) const {
290   switch (*packet.packet_type()) {
291     case RtpPacketMediaType::kAudio:
292     case RtpPacketMediaType::kVideo:
293       return packet.Ssrc() == ssrc_;
294     case RtpPacketMediaType::kRetransmission:
295     case RtpPacketMediaType::kPadding:
296       // Both padding and retransmission must be on either the media or the
297       // RTX stream.
298       return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
299     case RtpPacketMediaType::kForwardErrorCorrection:
300       // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
301       return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
302   }
303   return false;
304 }
305 
AddPacketToTransportFeedback(uint16_t packet_id,const RtpPacketToSend & packet,const PacedPacketInfo & pacing_info)306 void DEPRECATED_RtpSenderEgress::AddPacketToTransportFeedback(
307     uint16_t packet_id,
308     const RtpPacketToSend& packet,
309     const PacedPacketInfo& pacing_info) {
310   if (transport_feedback_observer_) {
311     size_t packet_size = packet.payload_size() + packet.padding_size();
312     if (send_side_bwe_with_overhead_) {
313       packet_size = packet.size();
314     }
315 
316     RtpPacketSendInfo packet_info;
317     packet_info.ssrc = ssrc_;
318     packet_info.transport_sequence_number = packet_id;
319     packet_info.rtp_sequence_number = packet.SequenceNumber();
320     packet_info.length = packet_size;
321     packet_info.pacing_info = pacing_info;
322     packet_info.packet_type = packet.packet_type();
323     transport_feedback_observer_->OnAddPacket(packet_info);
324   }
325 }
326 
UpdateDelayStatistics(int64_t capture_time_ms,int64_t now_ms,uint32_t ssrc)327 void DEPRECATED_RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
328                                                        int64_t now_ms,
329                                                        uint32_t ssrc) {
330   if (!send_side_delay_observer_ || capture_time_ms <= 0)
331     return;
332 
333   int avg_delay_ms = 0;
334   int max_delay_ms = 0;
335   uint64_t total_packet_send_delay_ms = 0;
336   {
337     MutexLock lock(&lock_);
338     // Compute the max and average of the recent capture-to-send delays.
339     // The time complexity of the current approach depends on the distribution
340     // of the delay values. This could be done more efficiently.
341 
342     // Remove elements older than kSendSideDelayWindowMs.
343     auto lower_bound =
344         send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
345     for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
346       if (max_delay_it_ == it) {
347         max_delay_it_ = send_delays_.end();
348       }
349       sum_delays_ms_ -= it->second;
350     }
351     send_delays_.erase(send_delays_.begin(), lower_bound);
352     if (max_delay_it_ == send_delays_.end()) {
353       // Removed the previous max. Need to recompute.
354       RecomputeMaxSendDelay();
355     }
356 
357     // Add the new element.
358     RTC_DCHECK_GE(now_ms, 0);
359     RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2);
360     RTC_DCHECK_GE(capture_time_ms, 0);
361     RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
362     int64_t diff_ms = now_ms - capture_time_ms;
363     RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
364     RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
365     int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
366     SendDelayMap::iterator it;
367     bool inserted;
368     std::tie(it, inserted) =
369         send_delays_.insert(std::make_pair(now_ms, new_send_delay));
370     if (!inserted) {
371       // TODO(terelius): If we have multiple delay measurements during the same
372       // millisecond then we keep the most recent one. It is not clear that this
373       // is the right decision, but it preserves an earlier behavior.
374       int previous_send_delay = it->second;
375       sum_delays_ms_ -= previous_send_delay;
376       it->second = new_send_delay;
377       if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
378         RecomputeMaxSendDelay();
379       }
380     }
381     if (max_delay_it_ == send_delays_.end() ||
382         it->second >= max_delay_it_->second) {
383       max_delay_it_ = it;
384     }
385     sum_delays_ms_ += new_send_delay;
386     total_packet_send_delay_ms_ += new_send_delay;
387     total_packet_send_delay_ms = total_packet_send_delay_ms_;
388 
389     size_t num_delays = send_delays_.size();
390     RTC_DCHECK(max_delay_it_ != send_delays_.end());
391     max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second);
392     int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays;
393     RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
394     RTC_DCHECK_LE(avg_ms,
395                   static_cast<int64_t>(std::numeric_limits<int>::max()));
396     avg_delay_ms =
397         rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
398   }
399   send_side_delay_observer_->SendSideDelayUpdated(
400       avg_delay_ms, max_delay_ms, total_packet_send_delay_ms, ssrc);
401 }
402 
RecomputeMaxSendDelay()403 void DEPRECATED_RtpSenderEgress::RecomputeMaxSendDelay() {
404   max_delay_it_ = send_delays_.begin();
405   for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
406     if (it->second >= max_delay_it_->second) {
407       max_delay_it_ = it;
408     }
409   }
410 }
411 
UpdateOnSendPacket(int packet_id,int64_t capture_time_ms,uint32_t ssrc)412 void DEPRECATED_RtpSenderEgress::UpdateOnSendPacket(int packet_id,
413                                                     int64_t capture_time_ms,
414                                                     uint32_t ssrc) {
415   if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) {
416     return;
417   }
418 
419   send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc);
420 }
421 
SendPacketToNetwork(const RtpPacketToSend & packet,const PacketOptions & options,const PacedPacketInfo & pacing_info)422 bool DEPRECATED_RtpSenderEgress::SendPacketToNetwork(
423     const RtpPacketToSend& packet,
424     const PacketOptions& options,
425     const PacedPacketInfo& pacing_info) {
426   int bytes_sent = -1;
427   if (transport_) {
428     bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
429                      ? static_cast<int>(packet.size())
430                      : -1;
431     if (event_log_ && bytes_sent > 0) {
432       event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
433           packet, pacing_info.probe_cluster_id));
434     }
435   }
436 
437   if (bytes_sent <= 0) {
438     RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
439     return false;
440   }
441   return true;
442 }
443 
UpdateRtpStats(const RtpPacketToSend & packet)444 void DEPRECATED_RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) {
445   int64_t now_ms = clock_->TimeInMilliseconds();
446 
447   StreamDataCounters* counters =
448       packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
449 
450   if (counters->first_packet_time_ms == -1) {
451     counters->first_packet_time_ms = now_ms;
452   }
453 
454   if (packet.packet_type() == RtpPacketMediaType::kForwardErrorCorrection) {
455     counters->fec.AddPacket(packet);
456   }
457 
458   if (packet.packet_type() == RtpPacketMediaType::kRetransmission) {
459     counters->retransmitted.AddPacket(packet);
460   }
461   counters->transmitted.AddPacket(packet);
462 
463   RTC_DCHECK(packet.packet_type().has_value());
464   send_rates_[static_cast<size_t>(*packet.packet_type())].Update(packet.size(),
465                                                                  now_ms);
466 
467   if (rtp_stats_callback_) {
468     rtp_stats_callback_->DataCountersUpdated(*counters, packet.Ssrc());
469   }
470 }
471 
472 }  // namespace webrtc
473