1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/call/congestion_controller.h"
12 
13 #include "webrtc/base/checks.h"
14 #include "webrtc/base/logging.h"
15 #include "webrtc/base/thread_annotations.h"
16 #include "webrtc/common.h"
17 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h"
18 #include "webrtc/modules/pacing/paced_sender.h"
19 #include "webrtc/modules/pacing/packet_router.h"
20 #include "webrtc/modules/remote_bitrate_estimator/include/send_time_history.h"
21 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
22 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_single_stream.h"
23 #include "webrtc/modules/remote_bitrate_estimator/remote_estimator_proxy.h"
24 #include "webrtc/modules/remote_bitrate_estimator/transport_feedback_adapter.h"
25 #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
26 #include "webrtc/modules/utility/include/process_thread.h"
27 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
28 #include "webrtc/video/call_stats.h"
29 #include "webrtc/video/payload_router.h"
30 #include "webrtc/video/vie_encoder.h"
31 #include "webrtc/video/vie_remb.h"
32 #include "webrtc/voice_engine/include/voe_video_sync.h"
33 
34 namespace webrtc {
35 namespace {
36 
// Number of consecutive packets without the absolute send time extension
// that must be seen before the estimator falls back to the transmission time
// offset based implementation.
const uint32_t kTimeOffsetSwitchThreshold = 30;
38 
39 class WrappingBitrateEstimator : public RemoteBitrateEstimator {
40  public:
WrappingBitrateEstimator(RemoteBitrateObserver * observer,Clock * clock)41   WrappingBitrateEstimator(RemoteBitrateObserver* observer, Clock* clock)
42       : observer_(observer),
43         clock_(clock),
44         crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
45         rbe_(new RemoteBitrateEstimatorSingleStream(observer_, clock_)),
46         using_absolute_send_time_(false),
47         packets_since_absolute_send_time_(0),
48         min_bitrate_bps_(RemoteBitrateEstimator::kDefaultMinBitrateBps) {}
49 
~WrappingBitrateEstimator()50   virtual ~WrappingBitrateEstimator() {}
51 
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header,bool was_paced)52   void IncomingPacket(int64_t arrival_time_ms,
53                       size_t payload_size,
54                       const RTPHeader& header,
55                       bool was_paced) override {
56     CriticalSectionScoped cs(crit_sect_.get());
57     PickEstimatorFromHeader(header);
58     rbe_->IncomingPacket(arrival_time_ms, payload_size, header, was_paced);
59   }
60 
Process()61   int32_t Process() override {
62     CriticalSectionScoped cs(crit_sect_.get());
63     return rbe_->Process();
64   }
65 
TimeUntilNextProcess()66   int64_t TimeUntilNextProcess() override {
67     CriticalSectionScoped cs(crit_sect_.get());
68     return rbe_->TimeUntilNextProcess();
69   }
70 
OnRttUpdate(int64_t avg_rtt_ms,int64_t max_rtt_ms)71   void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override {
72     CriticalSectionScoped cs(crit_sect_.get());
73     rbe_->OnRttUpdate(avg_rtt_ms, max_rtt_ms);
74   }
75 
RemoveStream(unsigned int ssrc)76   void RemoveStream(unsigned int ssrc) override {
77     CriticalSectionScoped cs(crit_sect_.get());
78     rbe_->RemoveStream(ssrc);
79   }
80 
LatestEstimate(std::vector<unsigned int> * ssrcs,unsigned int * bitrate_bps) const81   bool LatestEstimate(std::vector<unsigned int>* ssrcs,
82                       unsigned int* bitrate_bps) const override {
83     CriticalSectionScoped cs(crit_sect_.get());
84     return rbe_->LatestEstimate(ssrcs, bitrate_bps);
85   }
86 
GetStats(ReceiveBandwidthEstimatorStats * output) const87   bool GetStats(ReceiveBandwidthEstimatorStats* output) const override {
88     CriticalSectionScoped cs(crit_sect_.get());
89     return rbe_->GetStats(output);
90   }
91 
SetMinBitrate(int min_bitrate_bps)92   void SetMinBitrate(int min_bitrate_bps) {
93     CriticalSectionScoped cs(crit_sect_.get());
94     rbe_->SetMinBitrate(min_bitrate_bps);
95     min_bitrate_bps_ = min_bitrate_bps;
96   }
97 
98  private:
PickEstimatorFromHeader(const RTPHeader & header)99   void PickEstimatorFromHeader(const RTPHeader& header)
100       EXCLUSIVE_LOCKS_REQUIRED(crit_sect_.get()) {
101     if (header.extension.hasAbsoluteSendTime) {
102       // If we see AST in header, switch RBE strategy immediately.
103       if (!using_absolute_send_time_) {
104         LOG(LS_INFO) <<
105             "WrappingBitrateEstimator: Switching to absolute send time RBE.";
106         using_absolute_send_time_ = true;
107         PickEstimator();
108       }
109       packets_since_absolute_send_time_ = 0;
110     } else {
111       // When we don't see AST, wait for a few packets before going back to TOF.
112       if (using_absolute_send_time_) {
113         ++packets_since_absolute_send_time_;
114         if (packets_since_absolute_send_time_ >= kTimeOffsetSwitchThreshold) {
115           LOG(LS_INFO) << "WrappingBitrateEstimator: Switching to transmission "
116                        << "time offset RBE.";
117           using_absolute_send_time_ = false;
118           PickEstimator();
119         }
120       }
121     }
122   }
123 
124   // Instantiate RBE for Time Offset or Absolute Send Time extensions.
PickEstimator()125   void PickEstimator() EXCLUSIVE_LOCKS_REQUIRED(crit_sect_.get()) {
126     if (using_absolute_send_time_) {
127       rbe_.reset(new RemoteBitrateEstimatorAbsSendTime(observer_, clock_));
128     } else {
129       rbe_.reset(new RemoteBitrateEstimatorSingleStream(observer_, clock_));
130     }
131     rbe_->SetMinBitrate(min_bitrate_bps_);
132   }
133 
134   RemoteBitrateObserver* observer_;
135   Clock* clock_;
136   rtc::scoped_ptr<CriticalSectionWrapper> crit_sect_;
137   rtc::scoped_ptr<RemoteBitrateEstimator> rbe_;
138   bool using_absolute_send_time_;
139   uint32_t packets_since_absolute_send_time_;
140   int min_bitrate_bps_;
141 
142   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(WrappingBitrateEstimator);
143 };
144 
145 }  // namespace
146 
CongestionController(ProcessThread * process_thread,CallStats * call_stats,BitrateObserver * bitrate_observer)147 CongestionController::CongestionController(ProcessThread* process_thread,
148                                            CallStats* call_stats,
149                                            BitrateObserver* bitrate_observer)
150     : remb_(new VieRemb(Clock::GetRealTimeClock())),
151       packet_router_(new PacketRouter()),
152       pacer_(new PacedSender(Clock::GetRealTimeClock(),
153                              packet_router_.get(),
154                              BitrateController::kDefaultStartBitrateKbps,
155                              PacedSender::kDefaultPaceMultiplier *
156                                  BitrateController::kDefaultStartBitrateKbps,
157                              0)),
158       remote_bitrate_estimator_(
159           new WrappingBitrateEstimator(remb_.get(), Clock::GetRealTimeClock())),
160       remote_estimator_proxy_(
161           new RemoteEstimatorProxy(Clock::GetRealTimeClock(),
162                                    packet_router_.get())),
163       process_thread_(process_thread),
164       call_stats_(call_stats),
165       pacer_thread_(ProcessThread::Create("PacerThread")),
166       // Constructed last as this object calls the provided callback on
167       // construction.
168       bitrate_controller_(
169           BitrateController::CreateBitrateController(Clock::GetRealTimeClock(),
170                                                      bitrate_observer)),
171       min_bitrate_bps_(RemoteBitrateEstimator::kDefaultMinBitrateBps) {
172   call_stats_->RegisterStatsObserver(remote_bitrate_estimator_.get());
173 
174   pacer_thread_->RegisterModule(pacer_.get());
175   pacer_thread_->Start();
176 
177   process_thread->RegisterModule(remote_estimator_proxy_.get());
178   process_thread->RegisterModule(remote_bitrate_estimator_.get());
179   process_thread->RegisterModule(bitrate_controller_.get());
180 }
181 
~CongestionController()182 CongestionController::~CongestionController() {
183   pacer_thread_->Stop();
184   pacer_thread_->DeRegisterModule(pacer_.get());
185   process_thread_->DeRegisterModule(bitrate_controller_.get());
186   process_thread_->DeRegisterModule(remote_bitrate_estimator_.get());
187   process_thread_->DeRegisterModule(remote_estimator_proxy_.get());
188   call_stats_->DeregisterStatsObserver(remote_bitrate_estimator_.get());
189   if (transport_feedback_adapter_.get())
190     call_stats_->DeregisterStatsObserver(transport_feedback_adapter_.get());
191   RTC_DCHECK(!remb_->InUse());
192   RTC_DCHECK(encoders_.empty());
193 }
194 
AddEncoder(ViEEncoder * encoder)195 void CongestionController::AddEncoder(ViEEncoder* encoder) {
196   rtc::CritScope lock(&encoder_crit_);
197   encoders_.push_back(encoder);
198 }
199 
RemoveEncoder(ViEEncoder * encoder)200 void CongestionController::RemoveEncoder(ViEEncoder* encoder) {
201   rtc::CritScope lock(&encoder_crit_);
202   for (auto it = encoders_.begin(); it != encoders_.end(); ++it) {
203     if (*it == encoder) {
204       encoders_.erase(it);
205       return;
206     }
207   }
208 }
209 
SetBweBitrates(int min_bitrate_bps,int start_bitrate_bps,int max_bitrate_bps)210 void CongestionController::SetBweBitrates(int min_bitrate_bps,
211                                           int start_bitrate_bps,
212                                           int max_bitrate_bps) {
213   if (start_bitrate_bps > 0)
214     bitrate_controller_->SetStartBitrate(start_bitrate_bps);
215   bitrate_controller_->SetMinMaxBitrate(min_bitrate_bps, max_bitrate_bps);
216   if (remote_bitrate_estimator_.get())
217     remote_bitrate_estimator_->SetMinBitrate(min_bitrate_bps);
218   if (transport_feedback_adapter_.get())
219     transport_feedback_adapter_->GetBitrateEstimator()->SetMinBitrate(
220         min_bitrate_bps);
221   min_bitrate_bps_ = min_bitrate_bps;
222 }
223 
GetBitrateController() const224 BitrateController* CongestionController::GetBitrateController() const {
225   return bitrate_controller_.get();
226 }
227 
GetRemoteBitrateEstimator(bool send_side_bwe) const228 RemoteBitrateEstimator* CongestionController::GetRemoteBitrateEstimator(
229     bool send_side_bwe) const {
230 
231   if (send_side_bwe)
232     return remote_estimator_proxy_.get();
233   else
234     return remote_bitrate_estimator_.get();
235 }
236 
237 TransportFeedbackObserver*
GetTransportFeedbackObserver()238 CongestionController::GetTransportFeedbackObserver() {
239   if (transport_feedback_adapter_.get() == nullptr) {
240     transport_feedback_adapter_.reset(new TransportFeedbackAdapter(
241         bitrate_controller_->CreateRtcpBandwidthObserver(),
242         Clock::GetRealTimeClock(), process_thread_));
243     transport_feedback_adapter_->SetBitrateEstimator(
244         new RemoteBitrateEstimatorAbsSendTime(
245             transport_feedback_adapter_.get(), Clock::GetRealTimeClock()));
246     transport_feedback_adapter_->GetBitrateEstimator()->SetMinBitrate(
247         min_bitrate_bps_);
248     call_stats_->RegisterStatsObserver(transport_feedback_adapter_.get());
249   }
250   return transport_feedback_adapter_.get();
251 }
252 
UpdatePacerBitrate(int bitrate_kbps,int max_bitrate_kbps,int min_bitrate_kbps)253 void CongestionController::UpdatePacerBitrate(int bitrate_kbps,
254                                               int max_bitrate_kbps,
255                                               int min_bitrate_kbps) {
256   pacer_->UpdateBitrate(bitrate_kbps, max_bitrate_kbps, min_bitrate_kbps);
257 }
258 
GetPacerQueuingDelayMs() const259 int64_t CongestionController::GetPacerQueuingDelayMs() const {
260   return pacer_->QueueInMs();
261 }
262 
263 // TODO(mflodman): Move out of this class.
SetChannelRembStatus(bool sender,bool receiver,RtpRtcp * rtp_module)264 void CongestionController::SetChannelRembStatus(bool sender,
265                                                 bool receiver,
266                                                 RtpRtcp* rtp_module) {
267   rtp_module->SetREMBStatus(sender || receiver);
268   if (sender) {
269     remb_->AddRembSender(rtp_module);
270   } else {
271     remb_->RemoveRembSender(rtp_module);
272   }
273   if (receiver) {
274     remb_->AddReceiveChannel(rtp_module);
275   } else {
276     remb_->RemoveReceiveChannel(rtp_module);
277   }
278 }
279 
SignalNetworkState(NetworkState state)280 void CongestionController::SignalNetworkState(NetworkState state) {
281   if (state == kNetworkUp) {
282     pacer_->Resume();
283   } else {
284     pacer_->Pause();
285   }
286 }
287 
OnSentPacket(const rtc::SentPacket & sent_packet)288 void CongestionController::OnSentPacket(const rtc::SentPacket& sent_packet) {
289   if (transport_feedback_adapter_) {
290     transport_feedback_adapter_->OnSentPacket(sent_packet.packet_id,
291                                               sent_packet.send_time_ms);
292   }
293 }
294 }  // namespace webrtc
295