1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/rtp_sender_egress.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17 
18 #include "absl/strings/match.h"
19 #include "api/transport/field_trial_based_config.h"
20 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
21 #include "rtc_base/logging.h"
22 #include "rtc_base/task_utils/to_queued_task.h"
23 
24 namespace webrtc {
25 namespace {
26 constexpr uint32_t kTimestampTicksPerMs = 90;
27 constexpr int kSendSideDelayWindowMs = 1000;
28 constexpr int kBitrateStatisticsWindowMs = 1000;
29 constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
30 constexpr TimeDelta kUpdateInterval =
31     TimeDelta::Millis(kBitrateStatisticsWindowMs);
32 
IsEnabled(absl::string_view name,const WebRtcKeyValueConfig * field_trials)33 bool IsEnabled(absl::string_view name,
34                const WebRtcKeyValueConfig* field_trials) {
35   FieldTrialBasedConfig default_trials;
36   auto& trials = field_trials ? *field_trials : default_trials;
37   return absl::StartsWith(trials.Lookup(name), "Enabled");
38 }
39 }  // namespace
40 
NonPacedPacketSender(RtpSenderEgress * sender,SequenceNumberAssigner * sequence_number_assigner)41 RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
42     RtpSenderEgress* sender,
43     SequenceNumberAssigner* sequence_number_assigner)
44     : transport_sequence_number_(0),
45       sender_(sender),
46       sequence_number_assigner_(sequence_number_assigner) {
47   RTC_DCHECK(sequence_number_assigner_);
48 }
49 RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default;
50 
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)51 void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
52     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
53   for (auto& packet : packets) {
54     PrepareForSend(packet.get());
55     sender_->SendPacket(packet.get(), PacedPacketInfo());
56   }
57   auto fec_packets = sender_->FetchFecPackets();
58   if (!fec_packets.empty()) {
59     // Don't generate sequence numbers for flexfec, they are already running on
60     // an internally maintained sequence.
61     const bool generate_sequence_numbers = !sender_->FlexFecSsrc().has_value();
62 
63     for (auto& packet : fec_packets) {
64       if (generate_sequence_numbers) {
65         sequence_number_assigner_->AssignSequenceNumber(packet.get());
66       }
67       PrepareForSend(packet.get());
68     }
69     EnqueuePackets(std::move(fec_packets));
70   }
71 }
72 
PrepareForSend(RtpPacketToSend * packet)73 void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
74     RtpPacketToSend* packet) {
75   if (!packet->SetExtension<TransportSequenceNumber>(
76           ++transport_sequence_number_)) {
77     --transport_sequence_number_;
78   }
79   packet->ReserveExtension<TransmissionOffset>();
80   packet->ReserveExtension<AbsoluteSendTime>();
81 }
82 
RtpSenderEgress(const RtpRtcpInterface::Configuration & config,RtpPacketHistory * packet_history)83 RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
84                                  RtpPacketHistory* packet_history)
85     : worker_queue_(TaskQueueBase::Current()),
86       ssrc_(config.local_media_ssrc),
87       rtx_ssrc_(config.rtx_send_ssrc),
88       flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
89                                          : absl::nullopt),
90       populate_network2_timestamp_(config.populate_network2_timestamp),
91       send_side_bwe_with_overhead_(
92           IsEnabled("WebRTC-SendSideBwe-WithOverhead", config.field_trials)),
93       clock_(config.clock),
94       packet_history_(packet_history),
95       transport_(config.outgoing_transport),
96       event_log_(config.event_log),
97 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
98       is_audio_(config.audio),
99 #endif
100       need_rtp_packet_infos_(config.need_rtp_packet_infos),
101       fec_generator_(
102           IsEnabled("WebRTC-DeferredFecGeneration", config.field_trials)
103               ? config.fec_generator
104               : nullptr),
105       transport_feedback_observer_(config.transport_feedback_callback),
106       send_side_delay_observer_(config.send_side_delay_observer),
107       send_packet_observer_(config.send_packet_observer),
108       rtp_stats_callback_(config.rtp_stats_callback),
109       bitrate_callback_(config.send_bitrate_observer),
110       media_has_been_sent_(false),
111       force_part_of_allocation_(false),
112       timestamp_offset_(0),
113       max_delay_it_(send_delays_.end()),
114       sum_delays_ms_(0),
115       total_packet_send_delay_ms_(0),
116       send_rates_(kNumMediaTypes,
117                   {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
118       rtp_sequence_number_map_(need_rtp_packet_infos_
119                                    ? std::make_unique<RtpSequenceNumberMap>(
120                                          kRtpSequenceNumberMapMaxEntries)
121                                    : nullptr) {
122   RTC_DCHECK(worker_queue_);
123   pacer_checker_.Detach();
124   if (bitrate_callback_) {
125     update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
__anonf455135e0202() 126                                                      kUpdateInterval, [this]() {
127                                                        PeriodicUpdate();
128                                                        return kUpdateInterval;
129                                                      });
130   }
131 }
132 
~RtpSenderEgress()133 RtpSenderEgress::~RtpSenderEgress() {
134   RTC_DCHECK_RUN_ON(worker_queue_);
135   update_task_.Stop();
136 }
137 
SendPacket(RtpPacketToSend * packet,const PacedPacketInfo & pacing_info)138 void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
139                                  const PacedPacketInfo& pacing_info) {
140   RTC_DCHECK_RUN_ON(&pacer_checker_);
141   RTC_DCHECK(packet);
142 
143   RTC_DCHECK(packet->packet_type().has_value());
144   RTC_DCHECK(HasCorrectSsrc(*packet));
145 
146   const uint32_t packet_ssrc = packet->Ssrc();
147   const int64_t now_ms = clock_->TimeInMilliseconds();
148 
149 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
150   worker_queue_->PostTask(
151       ToQueuedTask(task_safety_, [this, now_ms, packet_ssrc]() {
152         BweTestLoggingPlot(now_ms, packet_ssrc);
153       }));
154 #endif
155 
156   if (need_rtp_packet_infos_ &&
157       packet->packet_type() == RtpPacketToSend::Type::kVideo) {
158     worker_queue_->PostTask(ToQueuedTask(
159         task_safety_,
160         [this, packet_timestamp = packet->Timestamp(),
161          is_first_packet_of_frame = packet->is_first_packet_of_frame(),
162          is_last_packet_of_frame = packet->Marker(),
163          sequence_number = packet->SequenceNumber()]() {
164           RTC_DCHECK_RUN_ON(worker_queue_);
165           // Last packet of a frame, add it to sequence number info map.
166           const uint32_t timestamp = packet_timestamp - timestamp_offset_;
167           rtp_sequence_number_map_->InsertPacket(
168               sequence_number,
169               RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
170                                          is_last_packet_of_frame));
171         }));
172   }
173 
174   if (fec_generator_ && packet->fec_protect_packet()) {
175     // Deferred fec generation is used, add packet to generator.
176     RTC_DCHECK(fec_generator_);
177     RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
178     absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
179         new_fec_params;
180     {
181       MutexLock lock(&lock_);
182       new_fec_params.swap(pending_fec_params_);
183     }
184     if (new_fec_params) {
185       fec_generator_->SetProtectionParameters(new_fec_params->first,
186                                               new_fec_params->second);
187     }
188     if (packet->is_red()) {
189       RtpPacketToSend unpacked_packet(*packet);
190 
191       const rtc::CopyOnWriteBuffer buffer = packet->Buffer();
192       // Grab media payload type from RED header.
193       const size_t headers_size = packet->headers_size();
194       unpacked_packet.SetPayloadType(buffer[headers_size]);
195 
196       // Copy the media payload into the unpacked buffer.
197       uint8_t* payload_buffer =
198           unpacked_packet.SetPayloadSize(packet->payload_size() - 1);
199       std::copy(&packet->payload()[0] + 1,
200                 &packet->payload()[0] + packet->payload_size(), payload_buffer);
201 
202       fec_generator_->AddPacketAndGenerateFec(unpacked_packet);
203     } else {
204       // If not RED encapsulated - we can just insert packet directly.
205       fec_generator_->AddPacketAndGenerateFec(*packet);
206     }
207   }
208 
209   // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
210   // the pacer, these modifications of the header below are happening after the
211   // FEC protection packets are calculated. This will corrupt recovered packets
212   // at the same place. It's not an issue for extensions, which are present in
213   // all the packets (their content just may be incorrect on recovered packets).
214   // In case of VideoTimingExtension, since it's present not in every packet,
215   // data after rtp header may be corrupted if these packets are protected by
216   // the FEC.
217   int64_t diff_ms = now_ms - packet->capture_time_ms();
218   if (packet->HasExtension<TransmissionOffset>()) {
219     packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff_ms);
220   }
221   if (packet->HasExtension<AbsoluteSendTime>()) {
222     packet->SetExtension<AbsoluteSendTime>(
223         AbsoluteSendTime::MsTo24Bits(now_ms));
224   }
225 
226   if (packet->HasExtension<VideoTimingExtension>()) {
227     if (populate_network2_timestamp_) {
228       packet->set_network2_time_ms(now_ms);
229     } else {
230       packet->set_pacer_exit_time_ms(now_ms);
231     }
232   }
233 
234   const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
235                         packet->packet_type() == RtpPacketMediaType::kVideo;
236 
237   PacketOptions options;
238   {
239     MutexLock lock(&lock_);
240     options.included_in_allocation = force_part_of_allocation_;
241   }
242 
243   // Downstream code actually uses this flag to distinguish between media and
244   // everything else.
245   options.is_retransmit = !is_media;
246   if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
247     options.packet_id = *packet_id;
248     options.included_in_feedback = true;
249     options.included_in_allocation = true;
250     AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
251   }
252 
253   options.application_data.assign(packet->application_data().begin(),
254                                   packet->application_data().end());
255 
256   if (packet->packet_type() != RtpPacketMediaType::kPadding &&
257       packet->packet_type() != RtpPacketMediaType::kRetransmission) {
258     UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
259     UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
260                        packet_ssrc);
261   }
262 
263   const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
264 
265   // Put packet in retransmission history or update pending status even if
266   // actual sending fails.
267   if (is_media && packet->allow_retransmission()) {
268     packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
269                                   now_ms);
270   } else if (packet->retransmitted_sequence_number()) {
271     packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
272   }
273 
274   if (send_success) {
275     // |media_has_been_sent_| is used by RTPSender to figure out if it can send
276     // padding in the absence of transport-cc or abs-send-time.
277     // In those cases media must be sent first to set a reference timestamp.
278     media_has_been_sent_ = true;
279 
280     // TODO(sprang): Add support for FEC protecting all header extensions, add
281     // media packet to generator here instead.
282 
283     RTC_DCHECK(packet->packet_type().has_value());
284     RtpPacketMediaType packet_type = *packet->packet_type();
285     RtpPacketCounter counter(*packet);
286     size_t size = packet->size();
287     worker_queue_->PostTask(
288         ToQueuedTask(task_safety_, [this, now_ms, packet_ssrc, packet_type,
289                                     counter = std::move(counter), size]() {
290           RTC_DCHECK_RUN_ON(worker_queue_);
291           UpdateRtpStats(now_ms, packet_ssrc, packet_type, std::move(counter),
292                          size);
293         }));
294   }
295 }
296 
GetSendRates() const297 RtpSendRates RtpSenderEgress::GetSendRates() const {
298   MutexLock lock(&lock_);
299   const int64_t now_ms = clock_->TimeInMilliseconds();
300   return GetSendRatesLocked(now_ms);
301 }
302 
GetSendRatesLocked(int64_t now_ms) const303 RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const {
304   RtpSendRates current_rates;
305   for (size_t i = 0; i < kNumMediaTypes; ++i) {
306     RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
307     current_rates[type] =
308         DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0));
309   }
310   return current_rates;
311 }
312 
GetDataCounters(StreamDataCounters * rtp_stats,StreamDataCounters * rtx_stats) const313 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
314                                       StreamDataCounters* rtx_stats) const {
315   // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
316   // only touched on the worker thread.
317   MutexLock lock(&lock_);
318   *rtp_stats = rtp_stats_;
319   *rtx_stats = rtx_rtp_stats_;
320 }
321 
ForceIncludeSendPacketsInAllocation(bool part_of_allocation)322 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
323     bool part_of_allocation) {
324   MutexLock lock(&lock_);
325   force_part_of_allocation_ = part_of_allocation;
326 }
327 
MediaHasBeenSent() const328 bool RtpSenderEgress::MediaHasBeenSent() const {
329   RTC_DCHECK_RUN_ON(&pacer_checker_);
330   return media_has_been_sent_;
331 }
332 
SetMediaHasBeenSent(bool media_sent)333 void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
334   RTC_DCHECK_RUN_ON(&pacer_checker_);
335   media_has_been_sent_ = media_sent;
336 }
337 
SetTimestampOffset(uint32_t timestamp)338 void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
339   RTC_DCHECK_RUN_ON(worker_queue_);
340   timestamp_offset_ = timestamp;
341 }
342 
GetSentRtpPacketInfos(rtc::ArrayView<const uint16_t> sequence_numbers) const343 std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
344     rtc::ArrayView<const uint16_t> sequence_numbers) const {
345   RTC_DCHECK_RUN_ON(worker_queue_);
346   RTC_DCHECK(!sequence_numbers.empty());
347   if (!need_rtp_packet_infos_) {
348     return std::vector<RtpSequenceNumberMap::Info>();
349   }
350 
351   std::vector<RtpSequenceNumberMap::Info> results;
352   results.reserve(sequence_numbers.size());
353 
354   for (uint16_t sequence_number : sequence_numbers) {
355     const auto& info = rtp_sequence_number_map_->Get(sequence_number);
356     if (!info) {
357       // The empty vector will be returned. We can delay the clearing
358       // of the vector until after we exit the critical section.
359       return std::vector<RtpSequenceNumberMap::Info>();
360     }
361     results.push_back(*info);
362   }
363 
364   return results;
365 }
366 
SetFecProtectionParameters(const FecProtectionParams & delta_params,const FecProtectionParams & key_params)367 void RtpSenderEgress::SetFecProtectionParameters(
368     const FecProtectionParams& delta_params,
369     const FecProtectionParams& key_params) {
370   // TODO(sprang): Post task to pacer queue instead, one pacer is fully
371   // migrated to a task queue.
372   MutexLock lock(&lock_);
373   pending_fec_params_.emplace(delta_params, key_params);
374 }
375 
376 std::vector<std::unique_ptr<RtpPacketToSend>>
FetchFecPackets()377 RtpSenderEgress::FetchFecPackets() {
378   RTC_DCHECK_RUN_ON(&pacer_checker_);
379   if (fec_generator_) {
380     return fec_generator_->GetFecPackets();
381   }
382   return {};
383 }
384 
HasCorrectSsrc(const RtpPacketToSend & packet) const385 bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const {
386   switch (*packet.packet_type()) {
387     case RtpPacketMediaType::kAudio:
388     case RtpPacketMediaType::kVideo:
389       return packet.Ssrc() == ssrc_;
390     case RtpPacketMediaType::kRetransmission:
391     case RtpPacketMediaType::kPadding:
392       // Both padding and retransmission must be on either the media or the
393       // RTX stream.
394       return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
395     case RtpPacketMediaType::kForwardErrorCorrection:
396       // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
397       return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
398   }
399   return false;
400 }
401 
AddPacketToTransportFeedback(uint16_t packet_id,const RtpPacketToSend & packet,const PacedPacketInfo & pacing_info)402 void RtpSenderEgress::AddPacketToTransportFeedback(
403     uint16_t packet_id,
404     const RtpPacketToSend& packet,
405     const PacedPacketInfo& pacing_info) {
406   if (transport_feedback_observer_) {
407     size_t packet_size = packet.payload_size() + packet.padding_size();
408     if (send_side_bwe_with_overhead_) {
409       packet_size = packet.size();
410     }
411 
412     RtpPacketSendInfo packet_info;
413     packet_info.ssrc = ssrc_;
414     packet_info.transport_sequence_number = packet_id;
415     packet_info.rtp_sequence_number = packet.SequenceNumber();
416     packet_info.length = packet_size;
417     packet_info.pacing_info = pacing_info;
418     packet_info.packet_type = packet.packet_type();
419     transport_feedback_observer_->OnAddPacket(packet_info);
420   }
421 }
422 
UpdateDelayStatistics(int64_t capture_time_ms,int64_t now_ms,uint32_t ssrc)423 void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
424                                             int64_t now_ms,
425                                             uint32_t ssrc) {
426   if (!send_side_delay_observer_ || capture_time_ms <= 0)
427     return;
428 
429   int avg_delay_ms = 0;
430   int max_delay_ms = 0;
431   uint64_t total_packet_send_delay_ms = 0;
432   {
433     MutexLock lock(&lock_);
434     // Compute the max and average of the recent capture-to-send delays.
435     // The time complexity of the current approach depends on the distribution
436     // of the delay values. This could be done more efficiently.
437 
438     // Remove elements older than kSendSideDelayWindowMs.
439     auto lower_bound =
440         send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
441     for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
442       if (max_delay_it_ == it) {
443         max_delay_it_ = send_delays_.end();
444       }
445       sum_delays_ms_ -= it->second;
446     }
447     send_delays_.erase(send_delays_.begin(), lower_bound);
448     if (max_delay_it_ == send_delays_.end()) {
449       // Removed the previous max. Need to recompute.
450       RecomputeMaxSendDelay();
451     }
452 
453     // Add the new element.
454     RTC_DCHECK_GE(now_ms, 0);
455     RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2);
456     RTC_DCHECK_GE(capture_time_ms, 0);
457     RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
458     int64_t diff_ms = now_ms - capture_time_ms;
459     RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
460     RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
461     int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
462     SendDelayMap::iterator it;
463     bool inserted;
464     std::tie(it, inserted) =
465         send_delays_.insert(std::make_pair(now_ms, new_send_delay));
466     if (!inserted) {
467       // TODO(terelius): If we have multiple delay measurements during the same
468       // millisecond then we keep the most recent one. It is not clear that this
469       // is the right decision, but it preserves an earlier behavior.
470       int previous_send_delay = it->second;
471       sum_delays_ms_ -= previous_send_delay;
472       it->second = new_send_delay;
473       if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
474         RecomputeMaxSendDelay();
475       }
476     }
477     if (max_delay_it_ == send_delays_.end() ||
478         it->second >= max_delay_it_->second) {
479       max_delay_it_ = it;
480     }
481     sum_delays_ms_ += new_send_delay;
482     total_packet_send_delay_ms_ += new_send_delay;
483     total_packet_send_delay_ms = total_packet_send_delay_ms_;
484 
485     size_t num_delays = send_delays_.size();
486     RTC_DCHECK(max_delay_it_ != send_delays_.end());
487     max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second);
488     int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays;
489     RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
490     RTC_DCHECK_LE(avg_ms,
491                   static_cast<int64_t>(std::numeric_limits<int>::max()));
492     avg_delay_ms =
493         rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
494   }
495   send_side_delay_observer_->SendSideDelayUpdated(
496       avg_delay_ms, max_delay_ms, total_packet_send_delay_ms, ssrc);
497 }
498 
RecomputeMaxSendDelay()499 void RtpSenderEgress::RecomputeMaxSendDelay() {
500   max_delay_it_ = send_delays_.begin();
501   for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
502     if (it->second >= max_delay_it_->second) {
503       max_delay_it_ = it;
504     }
505   }
506 }
507 
UpdateOnSendPacket(int packet_id,int64_t capture_time_ms,uint32_t ssrc)508 void RtpSenderEgress::UpdateOnSendPacket(int packet_id,
509                                          int64_t capture_time_ms,
510                                          uint32_t ssrc) {
511   if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) {
512     return;
513   }
514 
515   send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc);
516 }
517 
SendPacketToNetwork(const RtpPacketToSend & packet,const PacketOptions & options,const PacedPacketInfo & pacing_info)518 bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
519                                           const PacketOptions& options,
520                                           const PacedPacketInfo& pacing_info) {
521   int bytes_sent = -1;
522   if (transport_) {
523     bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
524                      ? static_cast<int>(packet.size())
525                      : -1;
526     if (event_log_ && bytes_sent > 0) {
527       event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
528           packet, pacing_info.probe_cluster_id));
529     }
530   }
531 
532   if (bytes_sent <= 0) {
533     RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
534     return false;
535   }
536   return true;
537 }
538 
UpdateRtpStats(int64_t now_ms,uint32_t packet_ssrc,RtpPacketMediaType packet_type,RtpPacketCounter counter,size_t packet_size)539 void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
540                                      uint32_t packet_ssrc,
541                                      RtpPacketMediaType packet_type,
542                                      RtpPacketCounter counter,
543                                      size_t packet_size) {
544   RTC_DCHECK_RUN_ON(worker_queue_);
545 
546   // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
547   // worker thread.
548   RtpSendRates send_rates;
549   {
550     MutexLock lock(&lock_);
551 
552     // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
553     // only touched on the worker thread.
554     StreamDataCounters* counters =
555         packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
556 
557     if (counters->first_packet_time_ms == -1) {
558       counters->first_packet_time_ms = now_ms;
559     }
560 
561     if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
562       counters->fec.Add(counter);
563     } else if (packet_type == RtpPacketMediaType::kRetransmission) {
564       counters->retransmitted.Add(counter);
565     }
566     counters->transmitted.Add(counter);
567 
568     send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now_ms);
569     if (bitrate_callback_) {
570       send_rates = GetSendRatesLocked(now_ms);
571     }
572 
573     if (rtp_stats_callback_) {
574       rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
575     }
576   }
577 
578   // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
579   // to the same object, so these callbacks could be consolidated into one.
580   if (bitrate_callback_) {
581     bitrate_callback_->Notify(
582         send_rates.Sum().bps(),
583         send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
584   }
585 }
586 
PeriodicUpdate()587 void RtpSenderEgress::PeriodicUpdate() {
588   RTC_DCHECK_RUN_ON(worker_queue_);
589   RTC_DCHECK(bitrate_callback_);
590   RtpSendRates send_rates = GetSendRates();
591   bitrate_callback_->Notify(
592       send_rates.Sum().bps(),
593       send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
594 }
595 
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
// Plots the current total and retransmission send rates, in kbps, for
// |packet_ssrc| at time |now_ms|. The plot names are prefixed with "Audio" or
// "Video" depending on |is_audio_|. Worker queue only.
void RtpSenderEgress::BweTestLoggingPlot(int64_t now_ms, uint32_t packet_ssrc) {
  RTC_DCHECK_RUN_ON(worker_queue_);

  const auto rates = GetSendRates();
  const auto total_kbps = rates.Sum().kbps();
  const auto nack_kbps = rates[RtpPacketMediaType::kRetransmission].kbps();

  if (!is_audio_) {
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms,
                                    total_kbps, packet_ssrc);
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoNackBitrate_kbps", now_ms,
                                    nack_kbps, packet_ssrc);
    return;
  }

  BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms,
                                  total_kbps, packet_ssrc);
  BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioNackBitrate_kbps", now_ms,
                                  nack_kbps, packet_ssrc);
}
#endif  // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
616 
617 }  // namespace webrtc
618