1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtp_sender_egress.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17
18 #include "absl/strings/match.h"
19 #include "api/transport/field_trial_based_config.h"
20 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
21 #include "rtc_base/logging.h"
22 #include "rtc_base/task_utils/to_queued_task.h"
23
24 namespace webrtc {
25 namespace {
26 constexpr uint32_t kTimestampTicksPerMs = 90;
27 constexpr int kSendSideDelayWindowMs = 1000;
28 constexpr int kBitrateStatisticsWindowMs = 1000;
29 constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
30 constexpr TimeDelta kUpdateInterval =
31 TimeDelta::Millis(kBitrateStatisticsWindowMs);
32
IsEnabled(absl::string_view name,const WebRtcKeyValueConfig * field_trials)33 bool IsEnabled(absl::string_view name,
34 const WebRtcKeyValueConfig* field_trials) {
35 FieldTrialBasedConfig default_trials;
36 auto& trials = field_trials ? *field_trials : default_trials;
37 return absl::StartsWith(trials.Lookup(name), "Enabled");
38 }
39 } // namespace
40
NonPacedPacketSender(RtpSenderEgress * sender,SequenceNumberAssigner * sequence_number_assigner)41 RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
42 RtpSenderEgress* sender,
43 SequenceNumberAssigner* sequence_number_assigner)
44 : transport_sequence_number_(0),
45 sender_(sender),
46 sequence_number_assigner_(sequence_number_assigner) {
47 RTC_DCHECK(sequence_number_assigner_);
48 }
49 RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default;
50
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)51 void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
52 std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
53 for (auto& packet : packets) {
54 PrepareForSend(packet.get());
55 sender_->SendPacket(packet.get(), PacedPacketInfo());
56 }
57 auto fec_packets = sender_->FetchFecPackets();
58 if (!fec_packets.empty()) {
59 // Don't generate sequence numbers for flexfec, they are already running on
60 // an internally maintained sequence.
61 const bool generate_sequence_numbers = !sender_->FlexFecSsrc().has_value();
62
63 for (auto& packet : fec_packets) {
64 if (generate_sequence_numbers) {
65 sequence_number_assigner_->AssignSequenceNumber(packet.get());
66 }
67 PrepareForSend(packet.get());
68 }
69 EnqueuePackets(std::move(fec_packets));
70 }
71 }
72
PrepareForSend(RtpPacketToSend * packet)73 void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
74 RtpPacketToSend* packet) {
75 if (!packet->SetExtension<TransportSequenceNumber>(
76 ++transport_sequence_number_)) {
77 --transport_sequence_number_;
78 }
79 packet->ReserveExtension<TransmissionOffset>();
80 packet->ReserveExtension<AbsoluteSendTime>();
81 }
82
RtpSenderEgress(const RtpRtcpInterface::Configuration & config,RtpPacketHistory * packet_history)83 RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
84 RtpPacketHistory* packet_history)
85 : worker_queue_(TaskQueueBase::Current()),
86 ssrc_(config.local_media_ssrc),
87 rtx_ssrc_(config.rtx_send_ssrc),
88 flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
89 : absl::nullopt),
90 populate_network2_timestamp_(config.populate_network2_timestamp),
91 send_side_bwe_with_overhead_(
92 IsEnabled("WebRTC-SendSideBwe-WithOverhead", config.field_trials)),
93 clock_(config.clock),
94 packet_history_(packet_history),
95 transport_(config.outgoing_transport),
96 event_log_(config.event_log),
97 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
98 is_audio_(config.audio),
99 #endif
100 need_rtp_packet_infos_(config.need_rtp_packet_infos),
101 fec_generator_(
102 IsEnabled("WebRTC-DeferredFecGeneration", config.field_trials)
103 ? config.fec_generator
104 : nullptr),
105 transport_feedback_observer_(config.transport_feedback_callback),
106 send_side_delay_observer_(config.send_side_delay_observer),
107 send_packet_observer_(config.send_packet_observer),
108 rtp_stats_callback_(config.rtp_stats_callback),
109 bitrate_callback_(config.send_bitrate_observer),
110 media_has_been_sent_(false),
111 force_part_of_allocation_(false),
112 timestamp_offset_(0),
113 max_delay_it_(send_delays_.end()),
114 sum_delays_ms_(0),
115 total_packet_send_delay_ms_(0),
116 send_rates_(kNumMediaTypes,
117 {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
118 rtp_sequence_number_map_(need_rtp_packet_infos_
119 ? std::make_unique<RtpSequenceNumberMap>(
120 kRtpSequenceNumberMapMaxEntries)
121 : nullptr) {
122 RTC_DCHECK(worker_queue_);
123 pacer_checker_.Detach();
124 if (bitrate_callback_) {
125 update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
__anonf455135e0202() 126 kUpdateInterval, [this]() {
127 PeriodicUpdate();
128 return kUpdateInterval;
129 });
130 }
131 }
132
~RtpSenderEgress()133 RtpSenderEgress::~RtpSenderEgress() {
134 RTC_DCHECK_RUN_ON(worker_queue_);
135 update_task_.Stop();
136 }
137
SendPacket(RtpPacketToSend * packet,const PacedPacketInfo & pacing_info)138 void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
139 const PacedPacketInfo& pacing_info) {
140 RTC_DCHECK_RUN_ON(&pacer_checker_);
141 RTC_DCHECK(packet);
142
143 RTC_DCHECK(packet->packet_type().has_value());
144 RTC_DCHECK(HasCorrectSsrc(*packet));
145
146 const uint32_t packet_ssrc = packet->Ssrc();
147 const int64_t now_ms = clock_->TimeInMilliseconds();
148
149 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
150 worker_queue_->PostTask(
151 ToQueuedTask(task_safety_, [this, now_ms, packet_ssrc]() {
152 BweTestLoggingPlot(now_ms, packet_ssrc);
153 }));
154 #endif
155
156 if (need_rtp_packet_infos_ &&
157 packet->packet_type() == RtpPacketToSend::Type::kVideo) {
158 worker_queue_->PostTask(ToQueuedTask(
159 task_safety_,
160 [this, packet_timestamp = packet->Timestamp(),
161 is_first_packet_of_frame = packet->is_first_packet_of_frame(),
162 is_last_packet_of_frame = packet->Marker(),
163 sequence_number = packet->SequenceNumber()]() {
164 RTC_DCHECK_RUN_ON(worker_queue_);
165 // Last packet of a frame, add it to sequence number info map.
166 const uint32_t timestamp = packet_timestamp - timestamp_offset_;
167 rtp_sequence_number_map_->InsertPacket(
168 sequence_number,
169 RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
170 is_last_packet_of_frame));
171 }));
172 }
173
174 if (fec_generator_ && packet->fec_protect_packet()) {
175 // Deferred fec generation is used, add packet to generator.
176 RTC_DCHECK(fec_generator_);
177 RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
178 absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
179 new_fec_params;
180 {
181 MutexLock lock(&lock_);
182 new_fec_params.swap(pending_fec_params_);
183 }
184 if (new_fec_params) {
185 fec_generator_->SetProtectionParameters(new_fec_params->first,
186 new_fec_params->second);
187 }
188 if (packet->is_red()) {
189 RtpPacketToSend unpacked_packet(*packet);
190
191 const rtc::CopyOnWriteBuffer buffer = packet->Buffer();
192 // Grab media payload type from RED header.
193 const size_t headers_size = packet->headers_size();
194 unpacked_packet.SetPayloadType(buffer[headers_size]);
195
196 // Copy the media payload into the unpacked buffer.
197 uint8_t* payload_buffer =
198 unpacked_packet.SetPayloadSize(packet->payload_size() - 1);
199 std::copy(&packet->payload()[0] + 1,
200 &packet->payload()[0] + packet->payload_size(), payload_buffer);
201
202 fec_generator_->AddPacketAndGenerateFec(unpacked_packet);
203 } else {
204 // If not RED encapsulated - we can just insert packet directly.
205 fec_generator_->AddPacketAndGenerateFec(*packet);
206 }
207 }
208
209 // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
210 // the pacer, these modifications of the header below are happening after the
211 // FEC protection packets are calculated. This will corrupt recovered packets
212 // at the same place. It's not an issue for extensions, which are present in
213 // all the packets (their content just may be incorrect on recovered packets).
214 // In case of VideoTimingExtension, since it's present not in every packet,
215 // data after rtp header may be corrupted if these packets are protected by
216 // the FEC.
217 int64_t diff_ms = now_ms - packet->capture_time_ms();
218 if (packet->HasExtension<TransmissionOffset>()) {
219 packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff_ms);
220 }
221 if (packet->HasExtension<AbsoluteSendTime>()) {
222 packet->SetExtension<AbsoluteSendTime>(
223 AbsoluteSendTime::MsTo24Bits(now_ms));
224 }
225
226 if (packet->HasExtension<VideoTimingExtension>()) {
227 if (populate_network2_timestamp_) {
228 packet->set_network2_time_ms(now_ms);
229 } else {
230 packet->set_pacer_exit_time_ms(now_ms);
231 }
232 }
233
234 const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
235 packet->packet_type() == RtpPacketMediaType::kVideo;
236
237 PacketOptions options;
238 {
239 MutexLock lock(&lock_);
240 options.included_in_allocation = force_part_of_allocation_;
241 }
242
243 // Downstream code actually uses this flag to distinguish between media and
244 // everything else.
245 options.is_retransmit = !is_media;
246 if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
247 options.packet_id = *packet_id;
248 options.included_in_feedback = true;
249 options.included_in_allocation = true;
250 AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
251 }
252
253 options.application_data.assign(packet->application_data().begin(),
254 packet->application_data().end());
255
256 if (packet->packet_type() != RtpPacketMediaType::kPadding &&
257 packet->packet_type() != RtpPacketMediaType::kRetransmission) {
258 UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
259 UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
260 packet_ssrc);
261 }
262
263 const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
264
265 // Put packet in retransmission history or update pending status even if
266 // actual sending fails.
267 if (is_media && packet->allow_retransmission()) {
268 packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
269 now_ms);
270 } else if (packet->retransmitted_sequence_number()) {
271 packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
272 }
273
274 if (send_success) {
275 // |media_has_been_sent_| is used by RTPSender to figure out if it can send
276 // padding in the absence of transport-cc or abs-send-time.
277 // In those cases media must be sent first to set a reference timestamp.
278 media_has_been_sent_ = true;
279
280 // TODO(sprang): Add support for FEC protecting all header extensions, add
281 // media packet to generator here instead.
282
283 RTC_DCHECK(packet->packet_type().has_value());
284 RtpPacketMediaType packet_type = *packet->packet_type();
285 RtpPacketCounter counter(*packet);
286 size_t size = packet->size();
287 worker_queue_->PostTask(
288 ToQueuedTask(task_safety_, [this, now_ms, packet_ssrc, packet_type,
289 counter = std::move(counter), size]() {
290 RTC_DCHECK_RUN_ON(worker_queue_);
291 UpdateRtpStats(now_ms, packet_ssrc, packet_type, std::move(counter),
292 size);
293 }));
294 }
295 }
296
GetSendRates() const297 RtpSendRates RtpSenderEgress::GetSendRates() const {
298 MutexLock lock(&lock_);
299 const int64_t now_ms = clock_->TimeInMilliseconds();
300 return GetSendRatesLocked(now_ms);
301 }
302
GetSendRatesLocked(int64_t now_ms) const303 RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const {
304 RtpSendRates current_rates;
305 for (size_t i = 0; i < kNumMediaTypes; ++i) {
306 RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
307 current_rates[type] =
308 DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0));
309 }
310 return current_rates;
311 }
312
GetDataCounters(StreamDataCounters * rtp_stats,StreamDataCounters * rtx_stats) const313 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
314 StreamDataCounters* rtx_stats) const {
315 // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
316 // only touched on the worker thread.
317 MutexLock lock(&lock_);
318 *rtp_stats = rtp_stats_;
319 *rtx_stats = rtx_rtp_stats_;
320 }
321
ForceIncludeSendPacketsInAllocation(bool part_of_allocation)322 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
323 bool part_of_allocation) {
324 MutexLock lock(&lock_);
325 force_part_of_allocation_ = part_of_allocation;
326 }
327
MediaHasBeenSent() const328 bool RtpSenderEgress::MediaHasBeenSent() const {
329 RTC_DCHECK_RUN_ON(&pacer_checker_);
330 return media_has_been_sent_;
331 }
332
SetMediaHasBeenSent(bool media_sent)333 void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
334 RTC_DCHECK_RUN_ON(&pacer_checker_);
335 media_has_been_sent_ = media_sent;
336 }
337
SetTimestampOffset(uint32_t timestamp)338 void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
339 RTC_DCHECK_RUN_ON(worker_queue_);
340 timestamp_offset_ = timestamp;
341 }
342
GetSentRtpPacketInfos(rtc::ArrayView<const uint16_t> sequence_numbers) const343 std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
344 rtc::ArrayView<const uint16_t> sequence_numbers) const {
345 RTC_DCHECK_RUN_ON(worker_queue_);
346 RTC_DCHECK(!sequence_numbers.empty());
347 if (!need_rtp_packet_infos_) {
348 return std::vector<RtpSequenceNumberMap::Info>();
349 }
350
351 std::vector<RtpSequenceNumberMap::Info> results;
352 results.reserve(sequence_numbers.size());
353
354 for (uint16_t sequence_number : sequence_numbers) {
355 const auto& info = rtp_sequence_number_map_->Get(sequence_number);
356 if (!info) {
357 // The empty vector will be returned. We can delay the clearing
358 // of the vector until after we exit the critical section.
359 return std::vector<RtpSequenceNumberMap::Info>();
360 }
361 results.push_back(*info);
362 }
363
364 return results;
365 }
366
SetFecProtectionParameters(const FecProtectionParams & delta_params,const FecProtectionParams & key_params)367 void RtpSenderEgress::SetFecProtectionParameters(
368 const FecProtectionParams& delta_params,
369 const FecProtectionParams& key_params) {
370 // TODO(sprang): Post task to pacer queue instead, one pacer is fully
371 // migrated to a task queue.
372 MutexLock lock(&lock_);
373 pending_fec_params_.emplace(delta_params, key_params);
374 }
375
376 std::vector<std::unique_ptr<RtpPacketToSend>>
FetchFecPackets()377 RtpSenderEgress::FetchFecPackets() {
378 RTC_DCHECK_RUN_ON(&pacer_checker_);
379 if (fec_generator_) {
380 return fec_generator_->GetFecPackets();
381 }
382 return {};
383 }
384
HasCorrectSsrc(const RtpPacketToSend & packet) const385 bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const {
386 switch (*packet.packet_type()) {
387 case RtpPacketMediaType::kAudio:
388 case RtpPacketMediaType::kVideo:
389 return packet.Ssrc() == ssrc_;
390 case RtpPacketMediaType::kRetransmission:
391 case RtpPacketMediaType::kPadding:
392 // Both padding and retransmission must be on either the media or the
393 // RTX stream.
394 return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
395 case RtpPacketMediaType::kForwardErrorCorrection:
396 // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
397 return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
398 }
399 return false;
400 }
401
AddPacketToTransportFeedback(uint16_t packet_id,const RtpPacketToSend & packet,const PacedPacketInfo & pacing_info)402 void RtpSenderEgress::AddPacketToTransportFeedback(
403 uint16_t packet_id,
404 const RtpPacketToSend& packet,
405 const PacedPacketInfo& pacing_info) {
406 if (transport_feedback_observer_) {
407 size_t packet_size = packet.payload_size() + packet.padding_size();
408 if (send_side_bwe_with_overhead_) {
409 packet_size = packet.size();
410 }
411
412 RtpPacketSendInfo packet_info;
413 packet_info.ssrc = ssrc_;
414 packet_info.transport_sequence_number = packet_id;
415 packet_info.rtp_sequence_number = packet.SequenceNumber();
416 packet_info.length = packet_size;
417 packet_info.pacing_info = pacing_info;
418 packet_info.packet_type = packet.packet_type();
419 transport_feedback_observer_->OnAddPacket(packet_info);
420 }
421 }
422
UpdateDelayStatistics(int64_t capture_time_ms,int64_t now_ms,uint32_t ssrc)423 void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
424 int64_t now_ms,
425 uint32_t ssrc) {
426 if (!send_side_delay_observer_ || capture_time_ms <= 0)
427 return;
428
429 int avg_delay_ms = 0;
430 int max_delay_ms = 0;
431 uint64_t total_packet_send_delay_ms = 0;
432 {
433 MutexLock lock(&lock_);
434 // Compute the max and average of the recent capture-to-send delays.
435 // The time complexity of the current approach depends on the distribution
436 // of the delay values. This could be done more efficiently.
437
438 // Remove elements older than kSendSideDelayWindowMs.
439 auto lower_bound =
440 send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
441 for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
442 if (max_delay_it_ == it) {
443 max_delay_it_ = send_delays_.end();
444 }
445 sum_delays_ms_ -= it->second;
446 }
447 send_delays_.erase(send_delays_.begin(), lower_bound);
448 if (max_delay_it_ == send_delays_.end()) {
449 // Removed the previous max. Need to recompute.
450 RecomputeMaxSendDelay();
451 }
452
453 // Add the new element.
454 RTC_DCHECK_GE(now_ms, 0);
455 RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2);
456 RTC_DCHECK_GE(capture_time_ms, 0);
457 RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
458 int64_t diff_ms = now_ms - capture_time_ms;
459 RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
460 RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
461 int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
462 SendDelayMap::iterator it;
463 bool inserted;
464 std::tie(it, inserted) =
465 send_delays_.insert(std::make_pair(now_ms, new_send_delay));
466 if (!inserted) {
467 // TODO(terelius): If we have multiple delay measurements during the same
468 // millisecond then we keep the most recent one. It is not clear that this
469 // is the right decision, but it preserves an earlier behavior.
470 int previous_send_delay = it->second;
471 sum_delays_ms_ -= previous_send_delay;
472 it->second = new_send_delay;
473 if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
474 RecomputeMaxSendDelay();
475 }
476 }
477 if (max_delay_it_ == send_delays_.end() ||
478 it->second >= max_delay_it_->second) {
479 max_delay_it_ = it;
480 }
481 sum_delays_ms_ += new_send_delay;
482 total_packet_send_delay_ms_ += new_send_delay;
483 total_packet_send_delay_ms = total_packet_send_delay_ms_;
484
485 size_t num_delays = send_delays_.size();
486 RTC_DCHECK(max_delay_it_ != send_delays_.end());
487 max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second);
488 int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays;
489 RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
490 RTC_DCHECK_LE(avg_ms,
491 static_cast<int64_t>(std::numeric_limits<int>::max()));
492 avg_delay_ms =
493 rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
494 }
495 send_side_delay_observer_->SendSideDelayUpdated(
496 avg_delay_ms, max_delay_ms, total_packet_send_delay_ms, ssrc);
497 }
498
RecomputeMaxSendDelay()499 void RtpSenderEgress::RecomputeMaxSendDelay() {
500 max_delay_it_ = send_delays_.begin();
501 for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
502 if (it->second >= max_delay_it_->second) {
503 max_delay_it_ = it;
504 }
505 }
506 }
507
UpdateOnSendPacket(int packet_id,int64_t capture_time_ms,uint32_t ssrc)508 void RtpSenderEgress::UpdateOnSendPacket(int packet_id,
509 int64_t capture_time_ms,
510 uint32_t ssrc) {
511 if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) {
512 return;
513 }
514
515 send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc);
516 }
517
SendPacketToNetwork(const RtpPacketToSend & packet,const PacketOptions & options,const PacedPacketInfo & pacing_info)518 bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
519 const PacketOptions& options,
520 const PacedPacketInfo& pacing_info) {
521 int bytes_sent = -1;
522 if (transport_) {
523 bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
524 ? static_cast<int>(packet.size())
525 : -1;
526 if (event_log_ && bytes_sent > 0) {
527 event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
528 packet, pacing_info.probe_cluster_id));
529 }
530 }
531
532 if (bytes_sent <= 0) {
533 RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
534 return false;
535 }
536 return true;
537 }
538
UpdateRtpStats(int64_t now_ms,uint32_t packet_ssrc,RtpPacketMediaType packet_type,RtpPacketCounter counter,size_t packet_size)539 void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
540 uint32_t packet_ssrc,
541 RtpPacketMediaType packet_type,
542 RtpPacketCounter counter,
543 size_t packet_size) {
544 RTC_DCHECK_RUN_ON(worker_queue_);
545
546 // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
547 // worker thread.
548 RtpSendRates send_rates;
549 {
550 MutexLock lock(&lock_);
551
552 // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
553 // only touched on the worker thread.
554 StreamDataCounters* counters =
555 packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
556
557 if (counters->first_packet_time_ms == -1) {
558 counters->first_packet_time_ms = now_ms;
559 }
560
561 if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
562 counters->fec.Add(counter);
563 } else if (packet_type == RtpPacketMediaType::kRetransmission) {
564 counters->retransmitted.Add(counter);
565 }
566 counters->transmitted.Add(counter);
567
568 send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now_ms);
569 if (bitrate_callback_) {
570 send_rates = GetSendRatesLocked(now_ms);
571 }
572
573 if (rtp_stats_callback_) {
574 rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
575 }
576 }
577
578 // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
579 // to the same object, so these callbacks could be consolidated into one.
580 if (bitrate_callback_) {
581 bitrate_callback_->Notify(
582 send_rates.Sum().bps(),
583 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
584 }
585 }
586
PeriodicUpdate()587 void RtpSenderEgress::PeriodicUpdate() {
588 RTC_DCHECK_RUN_ON(worker_queue_);
589 RTC_DCHECK(bitrate_callback_);
590 RtpSendRates send_rates = GetSendRates();
591 bitrate_callback_->Notify(
592 send_rates.Sum().bps(),
593 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
594 }
595
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
// Plots the current total and retransmission ("Nack") send rates, in kbps, to
// the BWE test log. Audio and video senders use differently named series.
// Must be called on the worker queue.
void RtpSenderEgress::BweTestLoggingPlot(int64_t now_ms, uint32_t packet_ssrc) {
  RTC_DCHECK_RUN_ON(worker_queue_);

  const auto rates = GetSendRates();
  const auto total_kbps = rates.Sum().kbps();
  const auto nack_kbps = rates[RtpPacketMediaType::kRetransmission].kbps();

  if (is_audio_) {
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms,
                                    total_kbps, packet_ssrc);
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioNackBitrate_kbps", now_ms,
                                    nack_kbps, packet_ssrc);
  } else {
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms,
                                    total_kbps, packet_ssrc);
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoNackBitrate_kbps", now_ms,
                                    nack_kbps, packet_ssrc);
  }
}
#endif  // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
616
617 } // namespace webrtc
618