1 /*
2  *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "call/rtp_transport_controller_send.h"
11 
12 #include <memory>
13 #include <utility>
14 #include <vector>
15 
16 #include "absl/strings/match.h"
17 #include "absl/types/optional.h"
18 #include "api/transport/goog_cc_factory.h"
19 #include "api/transport/network_types.h"
20 #include "api/units/data_rate.h"
21 #include "api/units/time_delta.h"
22 #include "api/units/timestamp.h"
23 #include "call/rtp_video_sender.h"
24 #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h"
25 #include "logging/rtc_event_log/events/rtc_event_route_change.h"
26 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
27 #include "rtc_base/checks.h"
28 #include "rtc_base/logging.h"
29 #include "rtc_base/rate_limiter.h"
30 
31 namespace webrtc {
32 namespace {
33 static const int64_t kRetransmitWindowSizeMs = 500;
34 static const size_t kMaxOverheadBytes = 500;
35 
36 constexpr TimeDelta kPacerQueueUpdateInterval = TimeDelta::Millis(25);
37 
ConvertConstraints(int min_bitrate_bps,int max_bitrate_bps,int start_bitrate_bps,Clock * clock)38 TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
39                                          int max_bitrate_bps,
40                                          int start_bitrate_bps,
41                                          Clock* clock) {
42   TargetRateConstraints msg;
43   msg.at_time = Timestamp::Millis(clock->TimeInMilliseconds());
44   msg.min_data_rate = min_bitrate_bps >= 0
45                           ? DataRate::BitsPerSec(min_bitrate_bps)
46                           : DataRate::Zero();
47   msg.max_data_rate = max_bitrate_bps > 0
48                           ? DataRate::BitsPerSec(max_bitrate_bps)
49                           : DataRate::Infinity();
50   if (start_bitrate_bps > 0)
51     msg.starting_rate = DataRate::BitsPerSec(start_bitrate_bps);
52   return msg;
53 }
54 
ConvertConstraints(const BitrateConstraints & contraints,Clock * clock)55 TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints,
56                                          Clock* clock) {
57   return ConvertConstraints(contraints.min_bitrate_bps,
58                             contraints.max_bitrate_bps,
59                             contraints.start_bitrate_bps, clock);
60 }
61 
IsEnabled(const WebRtcKeyValueConfig * trials,absl::string_view key)62 bool IsEnabled(const WebRtcKeyValueConfig* trials, absl::string_view key) {
63   RTC_DCHECK(trials != nullptr);
64   return absl::StartsWith(trials->Lookup(key), "Enabled");
65 }
66 
IsRelayed(const rtc::NetworkRoute & route)67 bool IsRelayed(const rtc::NetworkRoute& route) {
68   return route.local.uses_turn() || route.remote.uses_turn();
69 }
70 
71 }  // namespace
72 
RtpTransportControllerSend(Clock * clock,webrtc::RtcEventLog * event_log,NetworkStatePredictorFactoryInterface * predictor_factory,NetworkControllerFactoryInterface * controller_factory,const BitrateConstraints & bitrate_config,std::unique_ptr<ProcessThread> process_thread,TaskQueueFactory * task_queue_factory,const WebRtcKeyValueConfig * trials)73 RtpTransportControllerSend::RtpTransportControllerSend(
74     Clock* clock,
75     webrtc::RtcEventLog* event_log,
76     NetworkStatePredictorFactoryInterface* predictor_factory,
77     NetworkControllerFactoryInterface* controller_factory,
78     const BitrateConstraints& bitrate_config,
79     std::unique_ptr<ProcessThread> process_thread,
80     TaskQueueFactory* task_queue_factory,
81     const WebRtcKeyValueConfig* trials)
82     : clock_(clock),
83       event_log_(event_log),
84       bitrate_configurator_(bitrate_config),
85       process_thread_(std::move(process_thread)),
86       use_task_queue_pacer_(IsEnabled(trials, "WebRTC-TaskQueuePacer")),
87       process_thread_pacer_(use_task_queue_pacer_
88                                 ? nullptr
89                                 : new PacedSender(clock,
90                                                   &packet_router_,
91                                                   event_log,
92                                                   trials,
93                                                   process_thread_.get())),
94       task_queue_pacer_(
95           use_task_queue_pacer_
96               ? new TaskQueuePacedSender(
97                     clock,
98                     &packet_router_,
99                     event_log,
100                     trials,
101                     task_queue_factory,
102                     /*hold_back_window = */ PacingController::kMinSleepTime)
103               : nullptr),
104       observer_(nullptr),
105       controller_factory_override_(controller_factory),
106       controller_factory_fallback_(
107           std::make_unique<GoogCcNetworkControllerFactory>(predictor_factory)),
108       process_interval_(controller_factory_fallback_->GetProcessInterval()),
109       last_report_block_time_(Timestamp::Millis(clock_->TimeInMilliseconds())),
110       reset_feedback_on_route_change_(
111           !IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),
112       send_side_bwe_with_overhead_(
113           IsEnabled(trials, "WebRTC-SendSideBwe-WithOverhead")),
114       add_pacing_to_cwin_(
115           IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),
116       relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),
117       transport_overhead_bytes_per_packet_(0),
118       network_available_(false),
119       retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
120       task_queue_(task_queue_factory->CreateTaskQueue(
121           "rtp_send_controller",
122           TaskQueueFactory::Priority::NORMAL)) {
123   ParseFieldTrial({&relay_bandwidth_cap_},
124                   trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));
125   initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
126   initial_config_.event_log = event_log;
127   initial_config_.key_value_config = trials;
128   RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
129 
130   pacer()->SetPacingRates(
131       DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());
132 
133   if (!use_task_queue_pacer_) {
134     process_thread_->Start();
135   }
136 }
137 
~RtpTransportControllerSend()138 RtpTransportControllerSend::~RtpTransportControllerSend() {
139   if (!use_task_queue_pacer_) {
140     process_thread_->Stop();
141   }
142 }
143 
CreateRtpVideoSender(std::map<uint32_t,RtpState> suspended_ssrcs,const std::map<uint32_t,RtpPayloadState> & states,const RtpConfig & rtp_config,int rtcp_report_interval_ms,Transport * send_transport,const RtpSenderObservers & observers,RtcEventLog * event_log,std::unique_ptr<FecController> fec_controller,const RtpSenderFrameEncryptionConfig & frame_encryption_config,rtc::scoped_refptr<FrameTransformerInterface> frame_transformer)144 RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
145     std::map<uint32_t, RtpState> suspended_ssrcs,
146     const std::map<uint32_t, RtpPayloadState>& states,
147     const RtpConfig& rtp_config,
148     int rtcp_report_interval_ms,
149     Transport* send_transport,
150     const RtpSenderObservers& observers,
151     RtcEventLog* event_log,
152     std::unique_ptr<FecController> fec_controller,
153     const RtpSenderFrameEncryptionConfig& frame_encryption_config,
154     rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) {
155   video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>(
156       clock_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms,
157       send_transport, observers,
158       // TODO(holmer): Remove this circular dependency by injecting
159       // the parts of RtpTransportControllerSendInterface that are really used.
160       this, event_log, &retransmission_rate_limiter_, std::move(fec_controller),
161       frame_encryption_config.frame_encryptor,
162       frame_encryption_config.crypto_options, std::move(frame_transformer)));
163   return video_rtp_senders_.back().get();
164 }
165 
DestroyRtpVideoSender(RtpVideoSenderInterface * rtp_video_sender)166 void RtpTransportControllerSend::DestroyRtpVideoSender(
167     RtpVideoSenderInterface* rtp_video_sender) {
168   std::vector<std::unique_ptr<RtpVideoSenderInterface>>::iterator it =
169       video_rtp_senders_.end();
170   for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) {
171     if (it->get() == rtp_video_sender) {
172       break;
173     }
174   }
175   RTC_DCHECK(it != video_rtp_senders_.end());
176   video_rtp_senders_.erase(it);
177 }
178 
UpdateControlState()179 void RtpTransportControllerSend::UpdateControlState() {
180   absl::optional<TargetTransferRate> update = control_handler_->GetUpdate();
181   if (!update)
182     return;
183   retransmission_rate_limiter_.SetMaxRate(update->target_rate.bps());
184   // We won't create control_handler_ until we have an observers.
185   RTC_DCHECK(observer_ != nullptr);
186   observer_->OnTargetTransferRate(*update);
187 }
188 
pacer()189 RtpPacketPacer* RtpTransportControllerSend::pacer() {
190   if (use_task_queue_pacer_) {
191     return task_queue_pacer_.get();
192   }
193   return process_thread_pacer_.get();
194 }
195 
pacer() const196 const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
197   if (use_task_queue_pacer_) {
198     return task_queue_pacer_.get();
199   }
200   return process_thread_pacer_.get();
201 }
202 
GetWorkerQueue()203 rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
204   return &task_queue_;
205 }
206 
packet_router()207 PacketRouter* RtpTransportControllerSend::packet_router() {
208   return &packet_router_;
209 }
210 
211 NetworkStateEstimateObserver*
network_state_estimate_observer()212 RtpTransportControllerSend::network_state_estimate_observer() {
213   return this;
214 }
215 
216 TransportFeedbackObserver*
transport_feedback_observer()217 RtpTransportControllerSend::transport_feedback_observer() {
218   return this;
219 }
220 
packet_sender()221 RtpPacketSender* RtpTransportControllerSend::packet_sender() {
222   if (use_task_queue_pacer_) {
223     return task_queue_pacer_.get();
224   }
225   return process_thread_pacer_.get();
226 }
227 
SetAllocatedSendBitrateLimits(BitrateAllocationLimits limits)228 void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
229     BitrateAllocationLimits limits) {
230   RTC_DCHECK_RUN_ON(&task_queue_);
231   streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate;
232   streams_config_.max_padding_rate = limits.max_padding_rate;
233   streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate;
234   UpdateStreamsConfig();
235 }
SetPacingFactor(float pacing_factor)236 void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
237   RTC_DCHECK_RUN_ON(&task_queue_);
238   streams_config_.pacing_factor = pacing_factor;
239   UpdateStreamsConfig();
240 }
SetQueueTimeLimit(int limit_ms)241 void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
242   pacer()->SetQueueTimeLimit(TimeDelta::Millis(limit_ms));
243 }
244 StreamFeedbackProvider*
GetStreamFeedbackProvider()245 RtpTransportControllerSend::GetStreamFeedbackProvider() {
246   return &feedback_demuxer_;
247 }
248 
RegisterTargetTransferRateObserver(TargetTransferRateObserver * observer)249 void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
250     TargetTransferRateObserver* observer) {
251   task_queue_.PostTask([this, observer] {
252     RTC_DCHECK_RUN_ON(&task_queue_);
253     RTC_DCHECK(observer_ == nullptr);
254     observer_ = observer;
255     observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate);
256     MaybeCreateControllers();
257   });
258 }
259 
IsRelevantRouteChange(const rtc::NetworkRoute & old_route,const rtc::NetworkRoute & new_route) const260 bool RtpTransportControllerSend::IsRelevantRouteChange(
261     const rtc::NetworkRoute& old_route,
262     const rtc::NetworkRoute& new_route) const {
263   // TODO(bugs.webrtc.org/11438): Experiment with using more information/
264   // other conditions.
265   bool connected_changed = old_route.connected != new_route.connected;
266   bool route_ids_changed =
267       old_route.local.network_id() != new_route.local.network_id() ||
268       old_route.remote.network_id() != new_route.remote.network_id();
269   if (relay_bandwidth_cap_->IsFinite()) {
270     bool relaying_changed = IsRelayed(old_route) != IsRelayed(new_route);
271     return connected_changed || route_ids_changed || relaying_changed;
272   } else {
273     return connected_changed || route_ids_changed;
274   }
275 }
276 
OnNetworkRouteChanged(const std::string & transport_name,const rtc::NetworkRoute & network_route)277 void RtpTransportControllerSend::OnNetworkRouteChanged(
278     const std::string& transport_name,
279     const rtc::NetworkRoute& network_route) {
280   // Check if the network route is connected.
281 
282   if (!network_route.connected) {
283     // TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and
284     // consider merging these two methods.
285     return;
286   }
287 
288   absl::optional<BitrateConstraints> relay_constraint_update =
289       ApplyOrLiftRelayCap(IsRelayed(network_route));
290 
291   // Check whether the network route has changed on each transport.
292   auto result =
293       network_routes_.insert(std::make_pair(transport_name, network_route));
294   auto kv = result.first;
295   bool inserted = result.second;
296   if (inserted || !(kv->second == network_route)) {
297     RTC_LOG(LS_INFO) << "Network route changed on transport " << transport_name
298                      << ": new_route = " << network_route.DebugString();
299     if (!inserted) {
300       RTC_LOG(LS_INFO) << "old_route = " << kv->second.DebugString();
301     }
302   }
303 
304   if (inserted) {
305     if (relay_constraint_update.has_value()) {
306       UpdateBitrateConstraints(*relay_constraint_update);
307     }
308     task_queue_.PostTask([this, network_route] {
309       RTC_DCHECK_RUN_ON(&task_queue_);
310       transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
311     });
312     // No need to reset BWE if this is the first time the network connects.
313     return;
314   }
315 
316   const rtc::NetworkRoute old_route = kv->second;
317   kv->second = network_route;
318 
319   // Check if enough conditions of the new/old route has changed
320   // to trigger resetting of bitrates (and a probe).
321   if (IsRelevantRouteChange(old_route, network_route)) {
322     BitrateConstraints bitrate_config = bitrate_configurator_.GetConfig();
323     RTC_LOG(LS_INFO) << "Reset bitrates to min: "
324                      << bitrate_config.min_bitrate_bps
325                      << " bps, start: " << bitrate_config.start_bitrate_bps
326                      << " bps,  max: " << bitrate_config.max_bitrate_bps
327                      << " bps.";
328     RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
329 
330     if (event_log_) {
331       event_log_->Log(std::make_unique<RtcEventRouteChange>(
332           network_route.connected, network_route.packet_overhead));
333     }
334     NetworkRouteChange msg;
335     msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
336     msg.constraints = ConvertConstraints(bitrate_config, clock_);
337     task_queue_.PostTask([this, msg, network_route] {
338       RTC_DCHECK_RUN_ON(&task_queue_);
339       transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
340       if (reset_feedback_on_route_change_) {
341         transport_feedback_adapter_.SetNetworkRoute(network_route);
342       }
343       if (controller_) {
344         PostUpdates(controller_->OnNetworkRouteChange(msg));
345       } else {
346         UpdateInitialConstraints(msg.constraints);
347       }
348       pacer()->UpdateOutstandingData(DataSize::Zero());
349     });
350   }
351 }
OnNetworkAvailability(bool network_available)352 void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
353   RTC_LOG(LS_VERBOSE) << "SignalNetworkState "
354                       << (network_available ? "Up" : "Down");
355   NetworkAvailability msg;
356   msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
357   msg.network_available = network_available;
358   task_queue_.PostTask([this, msg]() {
359     RTC_DCHECK_RUN_ON(&task_queue_);
360     if (network_available_ == msg.network_available)
361       return;
362     network_available_ = msg.network_available;
363     if (network_available_) {
364       pacer()->Resume();
365     } else {
366       pacer()->Pause();
367     }
368     pacer()->UpdateOutstandingData(DataSize::Zero());
369 
370     if (controller_) {
371       control_handler_->SetNetworkAvailability(network_available_);
372       PostUpdates(controller_->OnNetworkAvailability(msg));
373       UpdateControlState();
374     } else {
375       MaybeCreateControllers();
376     }
377   });
378 
379   for (auto& rtp_sender : video_rtp_senders_) {
380     rtp_sender->OnNetworkAvailability(network_available);
381   }
382 }
GetBandwidthObserver()383 RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
384   return this;
385 }
GetPacerQueuingDelayMs() const386 int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
387   return pacer()->OldestPacketWaitTime().ms();
388 }
GetFirstPacketTime() const389 absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
390     const {
391   return pacer()->FirstSentPacketTime();
392 }
EnablePeriodicAlrProbing(bool enable)393 void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
394   task_queue_.PostTask([this, enable]() {
395     RTC_DCHECK_RUN_ON(&task_queue_);
396     streams_config_.requests_alr_probing = enable;
397     UpdateStreamsConfig();
398   });
399 }
OnSentPacket(const rtc::SentPacket & sent_packet)400 void RtpTransportControllerSend::OnSentPacket(
401     const rtc::SentPacket& sent_packet) {
402   task_queue_.PostTask([this, sent_packet]() {
403     RTC_DCHECK_RUN_ON(&task_queue_);
404     absl::optional<SentPacket> packet_msg =
405         transport_feedback_adapter_.ProcessSentPacket(sent_packet);
406     pacer()->UpdateOutstandingData(
407         transport_feedback_adapter_.GetOutstandingData());
408     if (packet_msg && controller_)
409       PostUpdates(controller_->OnSentPacket(*packet_msg));
410   });
411 }
412 
OnReceivedPacket(const ReceivedPacket & packet_msg)413 void RtpTransportControllerSend::OnReceivedPacket(
414     const ReceivedPacket& packet_msg) {
415   task_queue_.PostTask([this, packet_msg]() {
416     RTC_DCHECK_RUN_ON(&task_queue_);
417     if (controller_)
418       PostUpdates(controller_->OnReceivedPacket(packet_msg));
419   });
420 }
421 
UpdateBitrateConstraints(const BitrateConstraints & updated)422 void RtpTransportControllerSend::UpdateBitrateConstraints(
423     const BitrateConstraints& updated) {
424   TargetRateConstraints msg = ConvertConstraints(updated, clock_);
425   task_queue_.PostTask([this, msg]() {
426     RTC_DCHECK_RUN_ON(&task_queue_);
427     if (controller_) {
428       PostUpdates(controller_->OnTargetRateConstraints(msg));
429     } else {
430       UpdateInitialConstraints(msg);
431     }
432   });
433 }
434 
SetSdpBitrateParameters(const BitrateConstraints & constraints)435 void RtpTransportControllerSend::SetSdpBitrateParameters(
436     const BitrateConstraints& constraints) {
437   absl::optional<BitrateConstraints> updated =
438       bitrate_configurator_.UpdateWithSdpParameters(constraints);
439   if (updated.has_value()) {
440     UpdateBitrateConstraints(*updated);
441   } else {
442     RTC_LOG(LS_VERBOSE)
443         << "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: "
444            "nothing to update";
445   }
446 }
447 
SetClientBitratePreferences(const BitrateSettings & preferences)448 void RtpTransportControllerSend::SetClientBitratePreferences(
449     const BitrateSettings& preferences) {
450   absl::optional<BitrateConstraints> updated =
451       bitrate_configurator_.UpdateWithClientPreferences(preferences);
452   if (updated.has_value()) {
453     UpdateBitrateConstraints(*updated);
454   } else {
455     RTC_LOG(LS_VERBOSE)
456         << "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: "
457            "nothing to update";
458   }
459 }
460 
461 absl::optional<BitrateConstraints>
ApplyOrLiftRelayCap(bool is_relayed)462 RtpTransportControllerSend::ApplyOrLiftRelayCap(bool is_relayed) {
463   DataRate cap = is_relayed ? relay_bandwidth_cap_ : DataRate::PlusInfinity();
464   return bitrate_configurator_.UpdateWithRelayCap(cap);
465 }
466 
OnTransportOverheadChanged(size_t transport_overhead_bytes_per_packet)467 void RtpTransportControllerSend::OnTransportOverheadChanged(
468     size_t transport_overhead_bytes_per_packet) {
469   if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) {
470     RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes;
471     return;
472   }
473 
474   pacer()->SetTransportOverhead(
475       DataSize::Bytes(transport_overhead_bytes_per_packet));
476 
477   // TODO(holmer): Call AudioRtpSenders when they have been moved to
478   // RtpTransportControllerSend.
479   for (auto& rtp_video_sender : video_rtp_senders_) {
480     rtp_video_sender->OnTransportOverheadChanged(
481         transport_overhead_bytes_per_packet);
482   }
483 }
484 
AccountForAudioPacketsInPacedSender(bool account_for_audio)485 void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender(
486     bool account_for_audio) {
487   pacer()->SetAccountForAudioPackets(account_for_audio);
488 }
489 
IncludeOverheadInPacedSender()490 void RtpTransportControllerSend::IncludeOverheadInPacedSender() {
491   pacer()->SetIncludeOverhead();
492 }
493 
OnReceivedEstimatedBitrate(uint32_t bitrate)494 void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
495   RemoteBitrateReport msg;
496   msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds());
497   msg.bandwidth = DataRate::BitsPerSec(bitrate);
498   task_queue_.PostTask([this, msg]() {
499     RTC_DCHECK_RUN_ON(&task_queue_);
500     if (controller_)
501       PostUpdates(controller_->OnRemoteBitrateReport(msg));
502   });
503 }
504 
OnReceivedRtcpReceiverReport(const ReportBlockList & report_blocks,int64_t rtt_ms,int64_t now_ms)505 void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
506     const ReportBlockList& report_blocks,
507     int64_t rtt_ms,
508     int64_t now_ms) {
509   task_queue_.PostTask([this, report_blocks, now_ms]() {
510     RTC_DCHECK_RUN_ON(&task_queue_);
511     OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
512   });
513 
514   task_queue_.PostTask([this, now_ms, rtt_ms]() {
515     RTC_DCHECK_RUN_ON(&task_queue_);
516     RoundTripTimeUpdate report;
517     report.receive_time = Timestamp::Millis(now_ms);
518     report.round_trip_time = TimeDelta::Millis(rtt_ms);
519     report.smoothed = false;
520     if (controller_ && !report.round_trip_time.IsZero())
521       PostUpdates(controller_->OnRoundTripTimeUpdate(report));
522   });
523 }
524 
OnAddPacket(const RtpPacketSendInfo & packet_info)525 void RtpTransportControllerSend::OnAddPacket(
526     const RtpPacketSendInfo& packet_info) {
527   feedback_demuxer_.AddPacket(packet_info);
528 
529   Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
530   task_queue_.PostTask([this, packet_info, creation_time]() {
531     RTC_DCHECK_RUN_ON(&task_queue_);
532     transport_feedback_adapter_.AddPacket(
533         packet_info,
534         send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0,
535         creation_time);
536   });
537 }
538 
OnTransportFeedback(const rtcp::TransportFeedback & feedback)539 void RtpTransportControllerSend::OnTransportFeedback(
540     const rtcp::TransportFeedback& feedback) {
541   feedback_demuxer_.OnTransportFeedback(feedback);
542   auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
543   task_queue_.PostTask([this, feedback, feedback_time]() {
544     RTC_DCHECK_RUN_ON(&task_queue_);
545     absl::optional<TransportPacketsFeedback> feedback_msg =
546         transport_feedback_adapter_.ProcessTransportFeedback(feedback,
547                                                              feedback_time);
548     if (feedback_msg && controller_) {
549       PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
550     }
551     pacer()->UpdateOutstandingData(
552         transport_feedback_adapter_.GetOutstandingData());
553   });
554 }
555 
OnRemoteNetworkEstimate(NetworkStateEstimate estimate)556 void RtpTransportControllerSend::OnRemoteNetworkEstimate(
557     NetworkStateEstimate estimate) {
558   if (event_log_) {
559     event_log_->Log(std::make_unique<RtcEventRemoteEstimate>(
560         estimate.link_capacity_lower, estimate.link_capacity_upper));
561   }
562   estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds());
563   task_queue_.PostTask([this, estimate] {
564     RTC_DCHECK_RUN_ON(&task_queue_);
565     if (controller_)
566       PostUpdates(controller_->OnNetworkStateEstimate(estimate));
567   });
568 }
569 
MaybeCreateControllers()570 void RtpTransportControllerSend::MaybeCreateControllers() {
571   RTC_DCHECK(!controller_);
572   RTC_DCHECK(!control_handler_);
573 
574   if (!network_available_ || !observer_)
575     return;
576   control_handler_ = std::make_unique<CongestionControlHandler>();
577 
578   initial_config_.constraints.at_time =
579       Timestamp::Millis(clock_->TimeInMilliseconds());
580   initial_config_.stream_based_config = streams_config_;
581 
582   // TODO(srte): Use fallback controller if no feedback is available.
583   if (controller_factory_override_) {
584     RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
585     controller_ = controller_factory_override_->Create(initial_config_);
586     process_interval_ = controller_factory_override_->GetProcessInterval();
587   } else {
588     RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
589     controller_ = controller_factory_fallback_->Create(initial_config_);
590     process_interval_ = controller_factory_fallback_->GetProcessInterval();
591   }
592   UpdateControllerWithTimeInterval();
593   StartProcessPeriodicTasks();
594 }
595 
UpdateInitialConstraints(TargetRateConstraints new_contraints)596 void RtpTransportControllerSend::UpdateInitialConstraints(
597     TargetRateConstraints new_contraints) {
598   if (!new_contraints.starting_rate)
599     new_contraints.starting_rate = initial_config_.constraints.starting_rate;
600   RTC_DCHECK(new_contraints.starting_rate);
601   initial_config_.constraints = new_contraints;
602 }
603 
StartProcessPeriodicTasks()604 void RtpTransportControllerSend::StartProcessPeriodicTasks() {
605   if (!pacer_queue_update_task_.Running()) {
606     pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
607         task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
608           RTC_DCHECK_RUN_ON(&task_queue_);
609           TimeDelta expected_queue_time = pacer()->ExpectedQueueTime();
610           control_handler_->SetPacerQueue(expected_queue_time);
611           UpdateControlState();
612           return kPacerQueueUpdateInterval;
613         });
614   }
615   controller_task_.Stop();
616   if (process_interval_.IsFinite()) {
617     controller_task_ = RepeatingTaskHandle::DelayedStart(
618         task_queue_.Get(), process_interval_, [this]() {
619           RTC_DCHECK_RUN_ON(&task_queue_);
620           UpdateControllerWithTimeInterval();
621           return process_interval_;
622         });
623   }
624 }
625 
UpdateControllerWithTimeInterval()626 void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
627   RTC_DCHECK(controller_);
628   ProcessInterval msg;
629   msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
630   if (add_pacing_to_cwin_)
631     msg.pacer_queue = pacer()->QueueSizeData();
632   PostUpdates(controller_->OnProcessInterval(msg));
633 }
634 
UpdateStreamsConfig()635 void RtpTransportControllerSend::UpdateStreamsConfig() {
636   streams_config_.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
637   if (controller_)
638     PostUpdates(controller_->OnStreamsConfig(streams_config_));
639 }
640 
PostUpdates(NetworkControlUpdate update)641 void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
642   if (update.congestion_window) {
643     pacer()->SetCongestionWindow(*update.congestion_window);
644   }
645   if (update.pacer_config) {
646     pacer()->SetPacingRates(update.pacer_config->data_rate(),
647                             update.pacer_config->pad_rate());
648   }
649   for (const auto& probe : update.probe_cluster_configs) {
650     pacer()->CreateProbeCluster(probe.target_data_rate, probe.id);
651   }
652   if (update.target_rate) {
653     control_handler_->SetTargetRate(*update.target_rate);
654     UpdateControlState();
655   }
656 }
657 
OnReceivedRtcpReceiverReportBlocks(const ReportBlockList & report_blocks,int64_t now_ms)658 void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
659     const ReportBlockList& report_blocks,
660     int64_t now_ms) {
661   if (report_blocks.empty())
662     return;
663 
664   int total_packets_lost_delta = 0;
665   int total_packets_delta = 0;
666 
667   // Compute the packet loss from all report blocks.
668   for (const RTCPReportBlock& report_block : report_blocks) {
669     auto it = last_report_blocks_.find(report_block.source_ssrc);
670     if (it != last_report_blocks_.end()) {
671       auto number_of_packets = report_block.extended_highest_sequence_number -
672                                it->second.extended_highest_sequence_number;
673       total_packets_delta += number_of_packets;
674       auto lost_delta = report_block.packets_lost - it->second.packets_lost;
675       total_packets_lost_delta += lost_delta;
676     }
677     last_report_blocks_[report_block.source_ssrc] = report_block;
678   }
679   // Can only compute delta if there has been previous blocks to compare to. If
680   // not, total_packets_delta will be unchanged and there's nothing more to do.
681   if (!total_packets_delta)
682     return;
683   int packets_received_delta = total_packets_delta - total_packets_lost_delta;
684   // To detect lost packets, at least one packet has to be received. This check
685   // is needed to avoid bandwith detection update in
686   // VideoSendStreamTest.SuspendBelowMinBitrate
687 
688   if (packets_received_delta < 1)
689     return;
690   Timestamp now = Timestamp::Millis(now_ms);
691   TransportLossReport msg;
692   msg.packets_lost_delta = total_packets_lost_delta;
693   msg.packets_received_delta = packets_received_delta;
694   msg.receive_time = now;
695   msg.start_time = last_report_block_time_;
696   msg.end_time = now;
697   if (controller_)
698     PostUpdates(controller_->OnTransportLossReport(msg));
699   last_report_block_time_ = now;
700 }
701 
702 }  // namespace webrtc
703