1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/paced_sender.h"
12 
13 #include <algorithm>
14 #include <utility>
15 #include <vector>
16 
17 #include "absl/memory/memory.h"
18 #include "absl/strings/match.h"
19 #include "api/rtc_event_log/rtc_event_log.h"
20 #include "modules/utility/include/process_thread.h"
21 #include "rtc_base/checks.h"
22 #include "rtc_base/location.h"
23 #include "rtc_base/logging.h"
24 #include "rtc_base/time_utils.h"
25 #include "rtc_base/trace_event.h"
26 #include "system_wrappers/include/clock.h"
27 
28 namespace webrtc {
29 const int64_t PacedSender::kMaxQueueLengthMs = 2000;
30 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
31 
PacedSender(Clock * clock,PacketRouter * packet_router,RtcEventLog * event_log,const WebRtcKeyValueConfig * field_trials,ProcessThread * process_thread)32 PacedSender::PacedSender(Clock* clock,
33                          PacketRouter* packet_router,
34                          RtcEventLog* event_log,
35                          const WebRtcKeyValueConfig* field_trials,
36                          ProcessThread* process_thread)
37     : process_mode_(
38           (field_trials != nullptr &&
39            absl::StartsWith(field_trials->Lookup("WebRTC-Pacer-DynamicProcess"),
40                             "Enabled"))
41               ? PacingController::ProcessMode::kDynamic
42               : PacingController::ProcessMode::kPeriodic),
43       pacing_controller_(clock,
44                          packet_router,
45                          event_log,
46                          field_trials,
47                          process_mode_),
48       clock_(clock),
49       process_thread_(process_thread) {
50   if (process_thread_)
51     process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
52 }
53 
~PacedSender()54 PacedSender::~PacedSender() {
55   if (process_thread_) {
56     process_thread_->DeRegisterModule(&module_proxy_);
57   }
58 }
59 
CreateProbeCluster(DataRate bitrate,int cluster_id)60 void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
61   rtc::CritScope cs(&critsect_);
62   return pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
63 }
64 
Pause()65 void PacedSender::Pause() {
66   {
67     rtc::CritScope cs(&critsect_);
68     pacing_controller_.Pause();
69   }
70 
71   // Tell the process thread to call our TimeUntilNextProcess() method to get
72   // a new (longer) estimate for when to call Process().
73   if (process_thread_) {
74     process_thread_->WakeUp(&module_proxy_);
75   }
76 }
77 
Resume()78 void PacedSender::Resume() {
79   {
80     rtc::CritScope cs(&critsect_);
81     pacing_controller_.Resume();
82   }
83 
84   // Tell the process thread to call our TimeUntilNextProcess() method to
85   // refresh the estimate for when to call Process().
86   if (process_thread_) {
87     process_thread_->WakeUp(&module_proxy_);
88   }
89 }
90 
SetCongestionWindow(DataSize congestion_window_size)91 void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
92   {
93     rtc::CritScope cs(&critsect_);
94     pacing_controller_.SetCongestionWindow(congestion_window_size);
95   }
96   MaybeWakupProcessThread();
97 }
98 
UpdateOutstandingData(DataSize outstanding_data)99 void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
100   {
101     rtc::CritScope cs(&critsect_);
102     pacing_controller_.UpdateOutstandingData(outstanding_data);
103   }
104   MaybeWakupProcessThread();
105 }
106 
SetPacingRates(DataRate pacing_rate,DataRate padding_rate)107 void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
108   {
109     rtc::CritScope cs(&critsect_);
110     pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
111   }
112   MaybeWakupProcessThread();
113 }
114 
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)115 void PacedSender::EnqueuePackets(
116     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
117   {
118     TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
119                  "PacedSender::EnqueuePackets");
120     rtc::CritScope cs(&critsect_);
121     for (auto& packet : packets) {
122       TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
123                    "PacedSender::EnqueuePackets::Loop", "sequence_number",
124                    packet->SequenceNumber(), "rtp_timestamp",
125                    packet->Timestamp());
126 
127       pacing_controller_.EnqueuePacket(std::move(packet));
128     }
129   }
130   MaybeWakupProcessThread();
131 }
132 
SetAccountForAudioPackets(bool account_for_audio)133 void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
134   rtc::CritScope cs(&critsect_);
135   pacing_controller_.SetAccountForAudioPackets(account_for_audio);
136 }
137 
SetIncludeOverhead()138 void PacedSender::SetIncludeOverhead() {
139   rtc::CritScope cs(&critsect_);
140   pacing_controller_.SetIncludeOverhead();
141 }
142 
SetTransportOverhead(DataSize overhead_per_packet)143 void PacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
144   rtc::CritScope cs(&critsect_);
145   pacing_controller_.SetTransportOverhead(overhead_per_packet);
146 }
147 
ExpectedQueueTime() const148 TimeDelta PacedSender::ExpectedQueueTime() const {
149   rtc::CritScope cs(&critsect_);
150   return pacing_controller_.ExpectedQueueTime();
151 }
152 
QueueSizeData() const153 DataSize PacedSender::QueueSizeData() const {
154   rtc::CritScope cs(&critsect_);
155   return pacing_controller_.QueueSizeData();
156 }
157 
FirstSentPacketTime() const158 absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
159   rtc::CritScope cs(&critsect_);
160   return pacing_controller_.FirstSentPacketTime();
161 }
162 
OldestPacketWaitTime() const163 TimeDelta PacedSender::OldestPacketWaitTime() const {
164   rtc::CritScope cs(&critsect_);
165   return pacing_controller_.OldestPacketWaitTime();
166 }
167 
TimeUntilNextProcess()168 int64_t PacedSender::TimeUntilNextProcess() {
169   rtc::CritScope cs(&critsect_);
170 
171   Timestamp next_send_time = pacing_controller_.NextSendTime();
172   TimeDelta sleep_time =
173       std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime());
174   if (process_mode_ == PacingController::ProcessMode::kDynamic) {
175     return std::max(sleep_time, PacingController::kMinSleepTime).ms();
176   }
177   return sleep_time.ms();
178 }
179 
Process()180 void PacedSender::Process() {
181   rtc::CritScope cs(&critsect_);
182   pacing_controller_.ProcessPackets();
183 }
184 
ProcessThreadAttached(ProcessThread * process_thread)185 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
186   RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << process_thread;
187   RTC_DCHECK(!process_thread || process_thread == process_thread_);
188 }
189 
MaybeWakupProcessThread()190 void PacedSender::MaybeWakupProcessThread() {
191   // Tell the process thread to call our TimeUntilNextProcess() method to get
192   // a new time for when to call Process().
193   if (process_thread_ &&
194       process_mode_ == PacingController::ProcessMode::kDynamic) {
195     process_thread_->WakeUp(&module_proxy_);
196   }
197 }
198 
SetQueueTimeLimit(TimeDelta limit)199 void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
200   {
201     rtc::CritScope cs(&critsect_);
202     pacing_controller_.SetQueueTimeLimit(limit);
203   }
204   MaybeWakupProcessThread();
205 }
206 
207 }  // namespace webrtc
208