1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "call/call.h"
12 
13 #include <string.h>
14 
15 #include <algorithm>
16 #include <map>
17 #include <memory>
18 #include <set>
19 #include <utility>
20 #include <vector>
21 
22 #include "absl/types/optional.h"
23 #include "api/rtc_event_log/rtc_event_log.h"
24 #include "api/transport/network_control.h"
25 #include "audio/audio_receive_stream.h"
26 #include "audio/audio_send_stream.h"
27 #include "audio/audio_state.h"
28 #include "call/adaptation/broadcast_resource_listener.h"
29 #include "call/bitrate_allocator.h"
30 #include "call/flexfec_receive_stream_impl.h"
31 #include "call/receive_time_calculator.h"
32 #include "call/rtp_stream_receiver_controller.h"
33 #include "call/rtp_transport_controller_send.h"
34 #include "logging/rtc_event_log/events/rtc_event_audio_receive_stream_config.h"
35 #include "logging/rtc_event_log/events/rtc_event_rtcp_packet_incoming.h"
36 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_incoming.h"
37 #include "logging/rtc_event_log/events/rtc_event_video_receive_stream_config.h"
38 #include "logging/rtc_event_log/events/rtc_event_video_send_stream_config.h"
39 #include "logging/rtc_event_log/rtc_stream_config.h"
40 #include "modules/congestion_controller/include/receive_side_congestion_controller.h"
41 #include "modules/rtp_rtcp/include/flexfec_receiver.h"
42 #include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
43 #include "modules/rtp_rtcp/source/byte_io.h"
44 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
45 #include "modules/rtp_rtcp/source/rtp_utility.h"
46 #include "modules/utility/include/process_thread.h"
47 #include "modules/video_coding/fec_controller_default.h"
48 #include "rtc_base/checks.h"
49 #include "rtc_base/constructor_magic.h"
50 #include "rtc_base/location.h"
51 #include "rtc_base/logging.h"
52 #include "rtc_base/strings/string_builder.h"
53 #include "rtc_base/synchronization/sequence_checker.h"
54 #include "rtc_base/task_utils/pending_task_safety_flag.h"
55 #include "rtc_base/thread_annotations.h"
56 #include "rtc_base/time_utils.h"
57 #include "rtc_base/trace_event.h"
58 #include "system_wrappers/include/clock.h"
59 #include "system_wrappers/include/cpu_info.h"
60 #include "system_wrappers/include/field_trial.h"
61 #include "system_wrappers/include/metrics.h"
62 #include "video/call_stats2.h"
63 #include "video/send_delay_stats.h"
64 #include "video/stats_counter.h"
65 #include "video/video_receive_stream2.h"
66 #include "video/video_send_stream.h"
67 
68 namespace webrtc {
69 
70 namespace {
SendPeriodicFeedback(const std::vector<RtpExtension> & extensions)71 bool SendPeriodicFeedback(const std::vector<RtpExtension>& extensions) {
72   for (const auto& extension : extensions) {
73     if (extension.uri == RtpExtension::kTransportSequenceNumberV2Uri)
74       return false;
75   }
76   return true;
77 }
78 
79 // TODO(nisse): This really begs for a shared context struct.
UseSendSideBwe(const std::vector<RtpExtension> & extensions,bool transport_cc)80 bool UseSendSideBwe(const std::vector<RtpExtension>& extensions,
81                     bool transport_cc) {
82   if (!transport_cc)
83     return false;
84   for (const auto& extension : extensions) {
85     if (extension.uri == RtpExtension::kTransportSequenceNumberUri ||
86         extension.uri == RtpExtension::kTransportSequenceNumberV2Uri)
87       return true;
88   }
89   return false;
90 }
91 
UseSendSideBwe(const VideoReceiveStream::Config & config)92 bool UseSendSideBwe(const VideoReceiveStream::Config& config) {
93   return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc);
94 }
95 
UseSendSideBwe(const AudioReceiveStream::Config & config)96 bool UseSendSideBwe(const AudioReceiveStream::Config& config) {
97   return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc);
98 }
99 
UseSendSideBwe(const FlexfecReceiveStream::Config & config)100 bool UseSendSideBwe(const FlexfecReceiveStream::Config& config) {
101   return UseSendSideBwe(config.rtp_header_extensions, config.transport_cc);
102 }
103 
FindKeyByValue(const std::map<int,int> & m,int v)104 const int* FindKeyByValue(const std::map<int, int>& m, int v) {
105   for (const auto& kv : m) {
106     if (kv.second == v)
107       return &kv.first;
108   }
109   return nullptr;
110 }
111 
CreateRtcLogStreamConfig(const VideoReceiveStream::Config & config)112 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
113     const VideoReceiveStream::Config& config) {
114   auto rtclog_config = std::make_unique<rtclog::StreamConfig>();
115   rtclog_config->remote_ssrc = config.rtp.remote_ssrc;
116   rtclog_config->local_ssrc = config.rtp.local_ssrc;
117   rtclog_config->rtx_ssrc = config.rtp.rtx_ssrc;
118   rtclog_config->rtcp_mode = config.rtp.rtcp_mode;
119   rtclog_config->rtp_extensions = config.rtp.extensions;
120 
121   for (const auto& d : config.decoders) {
122     const int* search =
123         FindKeyByValue(config.rtp.rtx_associated_payload_types, d.payload_type);
124     rtclog_config->codecs.emplace_back(d.video_format.name, d.payload_type,
125                                        search ? *search : 0);
126   }
127   return rtclog_config;
128 }
129 
CreateRtcLogStreamConfig(const VideoSendStream::Config & config,size_t ssrc_index)130 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
131     const VideoSendStream::Config& config,
132     size_t ssrc_index) {
133   auto rtclog_config = std::make_unique<rtclog::StreamConfig>();
134   rtclog_config->local_ssrc = config.rtp.ssrcs[ssrc_index];
135   if (ssrc_index < config.rtp.rtx.ssrcs.size()) {
136     rtclog_config->rtx_ssrc = config.rtp.rtx.ssrcs[ssrc_index];
137   }
138   rtclog_config->rtcp_mode = config.rtp.rtcp_mode;
139   rtclog_config->rtp_extensions = config.rtp.extensions;
140 
141   rtclog_config->codecs.emplace_back(config.rtp.payload_name,
142                                      config.rtp.payload_type,
143                                      config.rtp.rtx.payload_type);
144   return rtclog_config;
145 }
146 
CreateRtcLogStreamConfig(const AudioReceiveStream::Config & config)147 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
148     const AudioReceiveStream::Config& config) {
149   auto rtclog_config = std::make_unique<rtclog::StreamConfig>();
150   rtclog_config->remote_ssrc = config.rtp.remote_ssrc;
151   rtclog_config->local_ssrc = config.rtp.local_ssrc;
152   rtclog_config->rtp_extensions = config.rtp.extensions;
153   return rtclog_config;
154 }
155 
IsRtcp(const uint8_t * packet,size_t length)156 bool IsRtcp(const uint8_t* packet, size_t length) {
157   RtpUtility::RtpHeaderParser rtp_parser(packet, length);
158   return rtp_parser.RTCP();
159 }
160 
GetCurrentTaskQueueOrThread()161 TaskQueueBase* GetCurrentTaskQueueOrThread() {
162   TaskQueueBase* current = TaskQueueBase::Current();
163   if (!current)
164     current = rtc::ThreadManager::Instance()->CurrentThread();
165   return current;
166 }
167 
168 }  // namespace
169 
170 namespace internal {
171 
172 // Wraps an injected resource in a BroadcastResourceListener and handles adding
173 // and removing adapter resources to individual VideoSendStreams.
174 class ResourceVideoSendStreamForwarder {
175  public:
ResourceVideoSendStreamForwarder(rtc::scoped_refptr<webrtc::Resource> resource)176   ResourceVideoSendStreamForwarder(
177       rtc::scoped_refptr<webrtc::Resource> resource)
178       : broadcast_resource_listener_(resource) {
179     broadcast_resource_listener_.StartListening();
180   }
~ResourceVideoSendStreamForwarder()181   ~ResourceVideoSendStreamForwarder() {
182     RTC_DCHECK(adapter_resources_.empty());
183     broadcast_resource_listener_.StopListening();
184   }
185 
Resource() const186   rtc::scoped_refptr<webrtc::Resource> Resource() const {
187     return broadcast_resource_listener_.SourceResource();
188   }
189 
OnCreateVideoSendStream(VideoSendStream * video_send_stream)190   void OnCreateVideoSendStream(VideoSendStream* video_send_stream) {
191     RTC_DCHECK(adapter_resources_.find(video_send_stream) ==
192                adapter_resources_.end());
193     auto adapter_resource =
194         broadcast_resource_listener_.CreateAdapterResource();
195     video_send_stream->AddAdaptationResource(adapter_resource);
196     adapter_resources_.insert(
197         std::make_pair(video_send_stream, adapter_resource));
198   }
199 
OnDestroyVideoSendStream(VideoSendStream * video_send_stream)200   void OnDestroyVideoSendStream(VideoSendStream* video_send_stream) {
201     auto it = adapter_resources_.find(video_send_stream);
202     RTC_DCHECK(it != adapter_resources_.end());
203     broadcast_resource_listener_.RemoveAdapterResource(it->second);
204     adapter_resources_.erase(it);
205   }
206 
207  private:
208   BroadcastResourceListener broadcast_resource_listener_;
209   std::map<VideoSendStream*, rtc::scoped_refptr<webrtc::Resource>>
210       adapter_resources_;
211 };
212 
213 class Call final : public webrtc::Call,
214                    public PacketReceiver,
215                    public RecoveredPacketReceiver,
216                    public TargetTransferRateObserver,
217                    public BitrateAllocator::LimitObserver {
218  public:
219   Call(Clock* clock,
220        const Call::Config& config,
221        std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
222        rtc::scoped_refptr<SharedModuleThread> module_process_thread,
223        TaskQueueFactory* task_queue_factory);
224   ~Call() override;
225 
226   // Implements webrtc::Call.
227   PacketReceiver* Receiver() override;
228 
229   webrtc::AudioSendStream* CreateAudioSendStream(
230       const webrtc::AudioSendStream::Config& config) override;
231   void DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) override;
232 
233   webrtc::AudioReceiveStream* CreateAudioReceiveStream(
234       const webrtc::AudioReceiveStream::Config& config) override;
235   void DestroyAudioReceiveStream(
236       webrtc::AudioReceiveStream* receive_stream) override;
237 
238   webrtc::VideoSendStream* CreateVideoSendStream(
239       webrtc::VideoSendStream::Config config,
240       VideoEncoderConfig encoder_config) override;
241   webrtc::VideoSendStream* CreateVideoSendStream(
242       webrtc::VideoSendStream::Config config,
243       VideoEncoderConfig encoder_config,
244       std::unique_ptr<FecController> fec_controller) override;
245   void DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) override;
246 
247   webrtc::VideoReceiveStream* CreateVideoReceiveStream(
248       webrtc::VideoReceiveStream::Config configuration) override;
249   void DestroyVideoReceiveStream(
250       webrtc::VideoReceiveStream* receive_stream) override;
251 
252   FlexfecReceiveStream* CreateFlexfecReceiveStream(
253       const FlexfecReceiveStream::Config& config) override;
254   void DestroyFlexfecReceiveStream(
255       FlexfecReceiveStream* receive_stream) override;
256 
257   void AddAdaptationResource(rtc::scoped_refptr<Resource> resource) override;
258 
259   RtpTransportControllerSendInterface* GetTransportControllerSend() override;
260 
261   Stats GetStats() const override;
262 
263   // Implements PacketReceiver.
264   DeliveryStatus DeliverPacket(MediaType media_type,
265                                rtc::CopyOnWriteBuffer packet,
266                                int64_t packet_time_us) override;
267 
268   // Implements RecoveredPacketReceiver.
269   void OnRecoveredPacket(const uint8_t* packet, size_t length) override;
270 
271   void SignalChannelNetworkState(MediaType media, NetworkState state) override;
272 
273   void OnAudioTransportOverheadChanged(
274       int transport_overhead_per_packet) override;
275 
276   void OnSentPacket(const rtc::SentPacket& sent_packet) override;
277 
278   // Implements TargetTransferRateObserver,
279   void OnTargetTransferRate(TargetTransferRate msg) override;
280   void OnStartRateUpdate(DataRate start_rate) override;
281 
282   // Implements BitrateAllocator::LimitObserver.
283   void OnAllocationLimitsChanged(BitrateAllocationLimits limits) override;
284 
285   void SetClientBitratePreferences(const BitrateSettings& preferences) override;
286 
287  private:
288   DeliveryStatus DeliverRtcp(MediaType media_type,
289                              const uint8_t* packet,
290                              size_t length)
291       RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
292   DeliveryStatus DeliverRtp(MediaType media_type,
293                             rtc::CopyOnWriteBuffer packet,
294                             int64_t packet_time_us)
295       RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
296   void ConfigureSync(const std::string& sync_group)
297       RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
298 
299   void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
300                                  MediaType media_type)
301       RTC_SHARED_LOCKS_REQUIRED(worker_thread_);
302 
303   void UpdateSendHistograms(Timestamp first_sent_packet)
304       RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
305   void UpdateReceiveHistograms();
306   void UpdateHistograms();
307   void UpdateAggregateNetworkState();
308 
309   void RegisterRateObserver();
310 
send_transport_queue() const311   rtc::TaskQueue* send_transport_queue() const {
312     return transport_send_ptr_->GetWorkerQueue();
313   }
314 
315   Clock* const clock_;
316   TaskQueueFactory* const task_queue_factory_;
317   TaskQueueBase* const worker_thread_;
318 
319   const int num_cpu_cores_;
320   const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
321   const std::unique_ptr<CallStats> call_stats_;
322   const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
323   Call::Config config_;
324 
325   NetworkState audio_network_state_;
326   NetworkState video_network_state_;
327   bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
328 
329   // Audio, Video, and FlexFEC receive streams are owned by the client that
330   // creates them.
331   std::set<AudioReceiveStream*> audio_receive_streams_
332       RTC_GUARDED_BY(worker_thread_);
333   std::set<VideoReceiveStream2*> video_receive_streams_
334       RTC_GUARDED_BY(worker_thread_);
335 
336   std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
337       RTC_GUARDED_BY(worker_thread_);
338 
339   // TODO(nisse): Should eventually be injected at creation,
340   // with a single object in the bundled case.
341   RtpStreamReceiverController audio_receiver_controller_;
342   RtpStreamReceiverController video_receiver_controller_;
343 
344   // This extra map is used for receive processing which is
345   // independent of media type.
346 
347   // TODO(nisse): In the RTP transport refactoring, we should have a
348   // single mapping from ssrc to a more abstract receive stream, with
349   // accessor methods for all configuration we need at this level.
350   struct ReceiveRtpConfig {
ReceiveRtpConfigwebrtc::internal::Call::ReceiveRtpConfig351     explicit ReceiveRtpConfig(const webrtc::AudioReceiveStream::Config& config)
352         : extensions(config.rtp.extensions),
353           use_send_side_bwe(UseSendSideBwe(config)) {}
ReceiveRtpConfigwebrtc::internal::Call::ReceiveRtpConfig354     explicit ReceiveRtpConfig(const webrtc::VideoReceiveStream::Config& config)
355         : extensions(config.rtp.extensions),
356           use_send_side_bwe(UseSendSideBwe(config)) {}
ReceiveRtpConfigwebrtc::internal::Call::ReceiveRtpConfig357     explicit ReceiveRtpConfig(const FlexfecReceiveStream::Config& config)
358         : extensions(config.rtp_header_extensions),
359           use_send_side_bwe(UseSendSideBwe(config)) {}
360 
361     // Registered RTP header extensions for each stream. Note that RTP header
362     // extensions are negotiated per track ("m= line") in the SDP, but we have
363     // no notion of tracks at the Call level. We therefore store the RTP header
364     // extensions per SSRC instead, which leads to some storage overhead.
365     const RtpHeaderExtensionMap extensions;
366     // Set if both RTP extension the RTCP feedback message needed for
367     // send side BWE are negotiated.
368     const bool use_send_side_bwe;
369   };
370   std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
371       RTC_GUARDED_BY(worker_thread_);
372 
373   // Audio and Video send streams are owned by the client that creates them.
374   std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_
375       RTC_GUARDED_BY(worker_thread_);
376   std::map<uint32_t, VideoSendStream*> video_send_ssrcs_
377       RTC_GUARDED_BY(worker_thread_);
378   std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_);
379 
380   // Each forwarder wraps an adaptation resource that was added to the call.
381   std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>>
382       adaptation_resource_forwarders_ RTC_GUARDED_BY(worker_thread_);
383 
384   using RtpStateMap = std::map<uint32_t, RtpState>;
385   RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
386   RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
387 
388   using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>;
389   RtpPayloadStateMap suspended_video_payload_states_
390       RTC_GUARDED_BY(worker_thread_);
391 
392   webrtc::RtcEventLog* event_log_;
393 
394   // The following members are only accessed (exclusively) from one thread and
395   // from the destructor, and therefore doesn't need any explicit
396   // synchronization.
397   RateCounter received_bytes_per_second_counter_;
398   RateCounter received_audio_bytes_per_second_counter_;
399   RateCounter received_video_bytes_per_second_counter_;
400   RateCounter received_rtcp_bytes_per_second_counter_;
401   absl::optional<int64_t> first_received_rtp_audio_ms_;
402   absl::optional<int64_t> last_received_rtp_audio_ms_;
403   absl::optional<int64_t> first_received_rtp_video_ms_;
404   absl::optional<int64_t> last_received_rtp_video_ms_;
405 
406   uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_);
407   // TODO(holmer): Remove this lock once BitrateController no longer calls
408   // OnNetworkChanged from multiple threads.
409   uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
410   uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
411   AvgCounter estimated_send_bitrate_kbps_counter_
412       RTC_GUARDED_BY(worker_thread_);
413   AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_);
414 
415   ReceiveSideCongestionController receive_side_cc_;
416 
417   const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;
418 
419   const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
420   const int64_t start_ms_;
421 
422   // Note that |task_safety_| needs to be at a greater scope than the task queue
423   // owned by |transport_send_| since calls might arrive on the network thread
424   // while Call is being deleted and the task queue is being torn down.
425   ScopedTaskSafety task_safety_;
426 
427   // Caches transport_send_.get(), to avoid racing with destructor.
428   // Note that this is declared before transport_send_ to ensure that it is not
429   // invalidated until no more tasks can be running on the transport_send_ task
430   // queue.
431   RtpTransportControllerSendInterface* const transport_send_ptr_;
432   // Declared last since it will issue callbacks from a task queue. Declaring it
433   // last ensures that it is destroyed first and any running tasks are finished.
434   std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
435 
436   bool is_target_rate_observer_registered_ RTC_GUARDED_BY(worker_thread_) =
437       false;
438 
439   RTC_DISALLOW_COPY_AND_ASSIGN(Call);
440 };
441 }  // namespace internal
442 
ToString(int64_t time_ms) const443 std::string Call::Stats::ToString(int64_t time_ms) const {
444   char buf[1024];
445   rtc::SimpleStringBuilder ss(buf);
446   ss << "Call stats: " << time_ms << ", {";
447   ss << "send_bw_bps: " << send_bandwidth_bps << ", ";
448   ss << "recv_bw_bps: " << recv_bandwidth_bps << ", ";
449   ss << "max_pad_bps: " << max_padding_bitrate_bps << ", ";
450   ss << "pacer_delay_ms: " << pacer_delay_ms << ", ";
451   ss << "rtt_ms: " << rtt_ms;
452   ss << '}';
453   return ss.str();
454 }
455 
Create(const Call::Config & config)456 Call* Call::Create(const Call::Config& config) {
457   rtc::scoped_refptr<SharedModuleThread> call_thread =
458       SharedModuleThread::Create(ProcessThread::Create("ModuleProcessThread"),
459                                  nullptr);
460   return Create(config, std::move(call_thread));
461 }
462 
Create(const Call::Config & config,rtc::scoped_refptr<SharedModuleThread> call_thread)463 Call* Call::Create(const Call::Config& config,
464                    rtc::scoped_refptr<SharedModuleThread> call_thread) {
465   return Create(config, Clock::GetRealTimeClock(), std::move(call_thread),
466                 ProcessThread::Create("PacerThread"));
467 }
468 
Create(const Call::Config & config,Clock * clock,rtc::scoped_refptr<SharedModuleThread> call_thread,std::unique_ptr<ProcessThread> pacer_thread)469 Call* Call::Create(const Call::Config& config,
470                    Clock* clock,
471                    rtc::scoped_refptr<SharedModuleThread> call_thread,
472                    std::unique_ptr<ProcessThread> pacer_thread) {
473   RTC_DCHECK(config.task_queue_factory);
474   return new internal::Call(
475       clock, config,
476       std::make_unique<RtpTransportControllerSend>(
477           clock, config.event_log, config.network_state_predictor_factory,
478           config.network_controller_factory, config.bitrate_config,
479           std::move(pacer_thread), config.task_queue_factory, config.trials),
480       std::move(call_thread), config.task_queue_factory);
481 }
482 
483 class SharedModuleThread::Impl {
484  public:
Impl(std::unique_ptr<ProcessThread> process_thread,std::function<void ()> on_one_ref_remaining)485   Impl(std::unique_ptr<ProcessThread> process_thread,
486        std::function<void()> on_one_ref_remaining)
487       : module_thread_(std::move(process_thread)),
488         on_one_ref_remaining_(std::move(on_one_ref_remaining)) {}
489 
EnsureStarted()490   void EnsureStarted() {
491     RTC_DCHECK_RUN_ON(&sequence_checker_);
492     if (started_)
493       return;
494     started_ = true;
495     module_thread_->Start();
496   }
497 
process_thread()498   ProcessThread* process_thread() {
499     RTC_DCHECK_RUN_ON(&sequence_checker_);
500     return module_thread_.get();
501   }
502 
AddRef() const503   void AddRef() const {
504     RTC_DCHECK_RUN_ON(&sequence_checker_);
505     ++ref_count_;
506   }
507 
Release() const508   rtc::RefCountReleaseStatus Release() const {
509     RTC_DCHECK_RUN_ON(&sequence_checker_);
510     --ref_count_;
511 
512     if (ref_count_ == 0) {
513       module_thread_->Stop();
514       return rtc::RefCountReleaseStatus::kDroppedLastRef;
515     }
516 
517     if (ref_count_ == 1 && on_one_ref_remaining_) {
518       auto moved_fn = std::move(on_one_ref_remaining_);
519       // NOTE: after this function returns, chances are that |this| has been
520       // deleted - do not touch any member variables.
521       // If the owner of the last reference implements a lambda that releases
522       // that last reference inside of the callback (which is legal according
523       // to this implementation), we will recursively enter Release() above,
524       // call Stop() and release the last reference.
525       moved_fn();
526     }
527 
528     return rtc::RefCountReleaseStatus::kOtherRefsRemained;
529   }
530 
531  private:
532   SequenceChecker sequence_checker_;
533   mutable int ref_count_ RTC_GUARDED_BY(sequence_checker_) = 0;
534   std::unique_ptr<ProcessThread> const module_thread_;
535   std::function<void()> const on_one_ref_remaining_;
536   bool started_ = false;
537 };
538 
SharedModuleThread(std::unique_ptr<ProcessThread> process_thread,std::function<void ()> on_one_ref_remaining)539 SharedModuleThread::SharedModuleThread(
540     std::unique_ptr<ProcessThread> process_thread,
541     std::function<void()> on_one_ref_remaining)
542     : impl_(std::make_unique<Impl>(std::move(process_thread),
543                                    std::move(on_one_ref_remaining))) {}
544 
545 SharedModuleThread::~SharedModuleThread() = default;
546 
547 // static
548 
Create(std::unique_ptr<ProcessThread> process_thread,std::function<void ()> on_one_ref_remaining)549 rtc::scoped_refptr<SharedModuleThread> SharedModuleThread::Create(
550     std::unique_ptr<ProcessThread> process_thread,
551     std::function<void()> on_one_ref_remaining) {
552   return new SharedModuleThread(std::move(process_thread),
553                                 std::move(on_one_ref_remaining));
554 }
555 
EnsureStarted()556 void SharedModuleThread::EnsureStarted() {
557   impl_->EnsureStarted();
558 }
559 
process_thread()560 ProcessThread* SharedModuleThread::process_thread() {
561   return impl_->process_thread();
562 }
563 
AddRef() const564 void SharedModuleThread::AddRef() const {
565   impl_->AddRef();
566 }
567 
Release() const568 rtc::RefCountReleaseStatus SharedModuleThread::Release() const {
569   auto ret = impl_->Release();
570   if (ret == rtc::RefCountReleaseStatus::kDroppedLastRef)
571     delete this;
572   return ret;
573 }
574 
575 // This method here to avoid subclasses has to implement this method.
576 // Call perf test will use Internal::Call::CreateVideoSendStream() to inject
577 // FecController.
CreateVideoSendStream(VideoSendStream::Config config,VideoEncoderConfig encoder_config,std::unique_ptr<FecController> fec_controller)578 VideoSendStream* Call::CreateVideoSendStream(
579     VideoSendStream::Config config,
580     VideoEncoderConfig encoder_config,
581     std::unique_ptr<FecController> fec_controller) {
582   return nullptr;
583 }
584 
585 namespace internal {
586 
Call(Clock * clock,const Call::Config & config,std::unique_ptr<RtpTransportControllerSendInterface> transport_send,rtc::scoped_refptr<SharedModuleThread> module_process_thread,TaskQueueFactory * task_queue_factory)587 Call::Call(Clock* clock,
588            const Call::Config& config,
589            std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
590            rtc::scoped_refptr<SharedModuleThread> module_process_thread,
591            TaskQueueFactory* task_queue_factory)
592     : clock_(clock),
593       task_queue_factory_(task_queue_factory),
594       worker_thread_(GetCurrentTaskQueueOrThread()),
595       num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
596       module_process_thread_(std::move(module_process_thread)),
597       call_stats_(new CallStats(clock_, worker_thread_)),
598       bitrate_allocator_(new BitrateAllocator(this)),
599       config_(config),
600       audio_network_state_(kNetworkDown),
601       video_network_state_(kNetworkDown),
602       aggregate_network_up_(false),
603       event_log_(config.event_log),
604       received_bytes_per_second_counter_(clock_, nullptr, true),
605       received_audio_bytes_per_second_counter_(clock_, nullptr, true),
606       received_video_bytes_per_second_counter_(clock_, nullptr, true),
607       received_rtcp_bytes_per_second_counter_(clock_, nullptr, true),
608       last_bandwidth_bps_(0),
609       min_allocated_send_bitrate_bps_(0),
610       configured_max_padding_bitrate_bps_(0),
611       estimated_send_bitrate_kbps_counter_(clock_, nullptr, true),
612       pacer_bitrate_kbps_counter_(clock_, nullptr, true),
613       receive_side_cc_(clock_, transport_send->packet_router()),
614       receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()),
615       video_send_delay_stats_(new SendDelayStats(clock_)),
616       start_ms_(clock_->TimeInMilliseconds()),
617       transport_send_ptr_(transport_send.get()),
618       transport_send_(std::move(transport_send)) {
619   RTC_DCHECK(config.event_log != nullptr);
620   RTC_DCHECK(config.trials != nullptr);
621   RTC_DCHECK(worker_thread_->IsCurrent());
622 
623   call_stats_->RegisterStatsObserver(&receive_side_cc_);
624 
625   module_process_thread_->process_thread()->RegisterModule(
626       receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
627   module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_,
628                                                            RTC_FROM_HERE);
629 }
630 
~Call()631 Call::~Call() {
632   RTC_DCHECK_RUN_ON(worker_thread_);
633 
634   RTC_CHECK(audio_send_ssrcs_.empty());
635   RTC_CHECK(video_send_ssrcs_.empty());
636   RTC_CHECK(video_send_streams_.empty());
637   RTC_CHECK(audio_receive_streams_.empty());
638   RTC_CHECK(video_receive_streams_.empty());
639 
640   module_process_thread_->process_thread()->DeRegisterModule(
641       receive_side_cc_.GetRemoteBitrateEstimator(true));
642   module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_);
643   call_stats_->DeregisterStatsObserver(&receive_side_cc_);
644 
645   absl::optional<Timestamp> first_sent_packet_ms =
646       transport_send_->GetFirstPacketTime();
647 
648   // Only update histograms after process threads have been shut down, so that
649   // they won't try to concurrently update stats.
650   if (first_sent_packet_ms) {
651     UpdateSendHistograms(*first_sent_packet_ms);
652   }
653 
654   UpdateReceiveHistograms();
655   UpdateHistograms();
656 }
657 
RegisterRateObserver()658 void Call::RegisterRateObserver() {
659   RTC_DCHECK_RUN_ON(worker_thread_);
660 
661   if (is_target_rate_observer_registered_)
662     return;
663 
664   is_target_rate_observer_registered_ = true;
665 
666   // This call seems to kick off a number of things, so probably better left
667   // off being kicked off on request rather than in the ctor.
668   transport_send_ptr_->RegisterTargetTransferRateObserver(this);
669 
670   module_process_thread_->EnsureStarted();
671 }
672 
SetClientBitratePreferences(const BitrateSettings & preferences)673 void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
674   RTC_DCHECK_RUN_ON(worker_thread_);
675   GetTransportControllerSend()->SetClientBitratePreferences(preferences);
676 }
677 
UpdateHistograms()678 void Call::UpdateHistograms() {
679   RTC_HISTOGRAM_COUNTS_100000(
680       "WebRTC.Call.LifetimeInSeconds",
681       (clock_->TimeInMilliseconds() - start_ms_) / 1000);
682 }
683 
684 // Called from the dtor.
UpdateSendHistograms(Timestamp first_sent_packet)685 void Call::UpdateSendHistograms(Timestamp first_sent_packet) {
686   int64_t elapsed_sec =
687       (clock_->TimeInMilliseconds() - first_sent_packet.ms()) / 1000;
688   if (elapsed_sec < metrics::kMinRunTimeInSeconds)
689     return;
690   const int kMinRequiredPeriodicSamples = 5;
691   AggregatedStats send_bitrate_stats =
692       estimated_send_bitrate_kbps_counter_.ProcessAndGetStats();
693   if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
694     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps",
695                                 send_bitrate_stats.average);
696     RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, "
697                      << send_bitrate_stats.ToString();
698   }
699   AggregatedStats pacer_bitrate_stats =
700       pacer_bitrate_kbps_counter_.ProcessAndGetStats();
701   if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
702     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps",
703                                 pacer_bitrate_stats.average);
704     RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, "
705                      << pacer_bitrate_stats.ToString();
706   }
707 }
708 
UpdateReceiveHistograms()709 void Call::UpdateReceiveHistograms() {
710   if (first_received_rtp_audio_ms_) {
711     RTC_HISTOGRAM_COUNTS_100000(
712         "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds",
713         (*last_received_rtp_audio_ms_ - *first_received_rtp_audio_ms_) / 1000);
714   }
715   if (first_received_rtp_video_ms_) {
716     RTC_HISTOGRAM_COUNTS_100000(
717         "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds",
718         (*last_received_rtp_video_ms_ - *first_received_rtp_video_ms_) / 1000);
719   }
720   const int kMinRequiredPeriodicSamples = 5;
721   AggregatedStats video_bytes_per_sec =
722       received_video_bytes_per_second_counter_.GetStats();
723   if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
724     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps",
725                                 video_bytes_per_sec.average * 8 / 1000);
726     RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, "
727                      << video_bytes_per_sec.ToStringWithMultiplier(8);
728   }
729   AggregatedStats audio_bytes_per_sec =
730       received_audio_bytes_per_second_counter_.GetStats();
731   if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
732     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps",
733                                 audio_bytes_per_sec.average * 8 / 1000);
734     RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, "
735                      << audio_bytes_per_sec.ToStringWithMultiplier(8);
736   }
737   AggregatedStats rtcp_bytes_per_sec =
738       received_rtcp_bytes_per_second_counter_.GetStats();
739   if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
740     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps",
741                                 rtcp_bytes_per_sec.average * 8);
742     RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, "
743                      << rtcp_bytes_per_sec.ToStringWithMultiplier(8);
744   }
745   AggregatedStats recv_bytes_per_sec =
746       received_bytes_per_second_counter_.GetStats();
747   if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
748     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps",
749                                 recv_bytes_per_sec.average * 8 / 1000);
750     RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, "
751                      << recv_bytes_per_sec.ToStringWithMultiplier(8);
752   }
753 }
754 
Receiver()755 PacketReceiver* Call::Receiver() {
756   RTC_DCHECK_RUN_ON(worker_thread_);
757   return this;
758 }
759 
CreateAudioSendStream(const webrtc::AudioSendStream::Config & config)760 webrtc::AudioSendStream* Call::CreateAudioSendStream(
761     const webrtc::AudioSendStream::Config& config) {
762   TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream");
763   RTC_DCHECK_RUN_ON(worker_thread_);
764 
765   RegisterRateObserver();
766 
767   // Stream config is logged in AudioSendStream::ConfigureStream, as it may
768   // change during the stream's lifetime.
769   absl::optional<RtpState> suspended_rtp_state;
770   {
771     const auto& iter = suspended_audio_send_ssrcs_.find(config.rtp.ssrc);
772     if (iter != suspended_audio_send_ssrcs_.end()) {
773       suspended_rtp_state.emplace(iter->second);
774     }
775   }
776 
777   AudioSendStream* send_stream = new AudioSendStream(
778       clock_, config, config_.audio_state, task_queue_factory_,
779       module_process_thread_->process_thread(), transport_send_ptr_,
780       bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
781       suspended_rtp_state);
782   RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
783              audio_send_ssrcs_.end());
784   audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
785 
786   for (AudioReceiveStream* stream : audio_receive_streams_) {
787     if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
788       stream->AssociateSendStream(send_stream);
789     }
790   }
791 
792   UpdateAggregateNetworkState();
793   return send_stream;
794 }
795 
DestroyAudioSendStream(webrtc::AudioSendStream * send_stream)796 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
797   TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
798   RTC_DCHECK_RUN_ON(worker_thread_);
799   RTC_DCHECK(send_stream != nullptr);
800 
801   send_stream->Stop();
802 
803   const uint32_t ssrc = send_stream->GetConfig().rtp.ssrc;
804   webrtc::internal::AudioSendStream* audio_send_stream =
805       static_cast<webrtc::internal::AudioSendStream*>(send_stream);
806   suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState();
807 
808   size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
809   RTC_DCHECK_EQ(1, num_deleted);
810 
811   for (AudioReceiveStream* stream : audio_receive_streams_) {
812     if (stream->config().rtp.local_ssrc == ssrc) {
813       stream->AssociateSendStream(nullptr);
814     }
815   }
816 
817   UpdateAggregateNetworkState();
818   delete send_stream;
819 }
820 
CreateAudioReceiveStream(const webrtc::AudioReceiveStream::Config & config)821 webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
822     const webrtc::AudioReceiveStream::Config& config) {
823   TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
824   RTC_DCHECK_RUN_ON(worker_thread_);
825   RegisterRateObserver();
826   event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
827       CreateRtcLogStreamConfig(config)));
828   AudioReceiveStream* receive_stream = new AudioReceiveStream(
829       clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
830       module_process_thread_->process_thread(), config_.neteq_factory, config,
831       config_.audio_state, event_log_);
832 
833   receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
834   audio_receive_streams_.insert(receive_stream);
835 
836   ConfigureSync(config.sync_group);
837 
838   auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
839   if (it != audio_send_ssrcs_.end()) {
840     receive_stream->AssociateSendStream(it->second);
841   }
842 
843   UpdateAggregateNetworkState();
844   return receive_stream;
845 }
846 
DestroyAudioReceiveStream(webrtc::AudioReceiveStream * receive_stream)847 void Call::DestroyAudioReceiveStream(
848     webrtc::AudioReceiveStream* receive_stream) {
849   TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
850   RTC_DCHECK_RUN_ON(worker_thread_);
851   RTC_DCHECK(receive_stream != nullptr);
852   webrtc::internal::AudioReceiveStream* audio_receive_stream =
853       static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
854 
855   const AudioReceiveStream::Config& config = audio_receive_stream->config();
856   uint32_t ssrc = config.rtp.remote_ssrc;
857   receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
858       ->RemoveStream(ssrc);
859   audio_receive_streams_.erase(audio_receive_stream);
860   const std::string& sync_group = audio_receive_stream->config().sync_group;
861   const auto it = sync_stream_mapping_.find(sync_group);
862   if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
863     sync_stream_mapping_.erase(it);
864     ConfigureSync(sync_group);
865   }
866   receive_rtp_config_.erase(ssrc);
867 
868   UpdateAggregateNetworkState();
869   delete audio_receive_stream;
870 }
871 
872 // This method can be used for Call tests with external fec controller factory.
CreateVideoSendStream(webrtc::VideoSendStream::Config config,VideoEncoderConfig encoder_config,std::unique_ptr<FecController> fec_controller)873 webrtc::VideoSendStream* Call::CreateVideoSendStream(
874     webrtc::VideoSendStream::Config config,
875     VideoEncoderConfig encoder_config,
876     std::unique_ptr<FecController> fec_controller) {
877   TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream");
878   RTC_DCHECK_RUN_ON(worker_thread_);
879 
880   RegisterRateObserver();
881 
882   video_send_delay_stats_->AddSsrcs(config);
883   for (size_t ssrc_index = 0; ssrc_index < config.rtp.ssrcs.size();
884        ++ssrc_index) {
885     event_log_->Log(std::make_unique<RtcEventVideoSendStreamConfig>(
886         CreateRtcLogStreamConfig(config, ssrc_index)));
887   }
888 
889   // TODO(mflodman): Base the start bitrate on a current bandwidth estimate, if
890   // the call has already started.
891   // Copy ssrcs from |config| since |config| is moved.
892   std::vector<uint32_t> ssrcs = config.rtp.ssrcs;
893 
894   VideoSendStream* send_stream = new VideoSendStream(
895       clock_, num_cpu_cores_, module_process_thread_->process_thread(),
896       task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_,
897       bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_,
898       std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
899       suspended_video_payload_states_, std::move(fec_controller));
900 
901   for (uint32_t ssrc : ssrcs) {
902     RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
903     video_send_ssrcs_[ssrc] = send_stream;
904   }
905   video_send_streams_.insert(send_stream);
906   // Forward resources that were previously added to the call to the new stream.
907   for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
908     resource_forwarder->OnCreateVideoSendStream(send_stream);
909   }
910 
911   UpdateAggregateNetworkState();
912 
913   return send_stream;
914 }
915 
CreateVideoSendStream(webrtc::VideoSendStream::Config config,VideoEncoderConfig encoder_config)916 webrtc::VideoSendStream* Call::CreateVideoSendStream(
917     webrtc::VideoSendStream::Config config,
918     VideoEncoderConfig encoder_config) {
919   if (config_.fec_controller_factory) {
920     RTC_LOG(LS_INFO) << "External FEC Controller will be used.";
921   }
922   std::unique_ptr<FecController> fec_controller =
923       config_.fec_controller_factory
924           ? config_.fec_controller_factory->CreateFecController()
925           : std::make_unique<FecControllerDefault>(clock_);
926   return CreateVideoSendStream(std::move(config), std::move(encoder_config),
927                                std::move(fec_controller));
928 }
929 
DestroyVideoSendStream(webrtc::VideoSendStream * send_stream)930 void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
931   TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream");
932   RTC_DCHECK(send_stream != nullptr);
933   RTC_DCHECK_RUN_ON(worker_thread_);
934 
935   send_stream->Stop();
936 
937   VideoSendStream* send_stream_impl = nullptr;
938 
939   auto it = video_send_ssrcs_.begin();
940   while (it != video_send_ssrcs_.end()) {
941     if (it->second == static_cast<VideoSendStream*>(send_stream)) {
942       send_stream_impl = it->second;
943       video_send_ssrcs_.erase(it++);
944     } else {
945       ++it;
946     }
947   }
948   // Stop forwarding resources to the stream being destroyed.
949   for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
950     resource_forwarder->OnDestroyVideoSendStream(send_stream_impl);
951   }
952   video_send_streams_.erase(send_stream_impl);
953 
954   RTC_CHECK(send_stream_impl != nullptr);
955 
956   VideoSendStream::RtpStateMap rtp_states;
957   VideoSendStream::RtpPayloadStateMap rtp_payload_states;
958   send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states,
959                                                    &rtp_payload_states);
960   for (const auto& kv : rtp_states) {
961     suspended_video_send_ssrcs_[kv.first] = kv.second;
962   }
963   for (const auto& kv : rtp_payload_states) {
964     suspended_video_payload_states_[kv.first] = kv.second;
965   }
966 
967   UpdateAggregateNetworkState();
968   delete send_stream_impl;
969 }
970 
CreateVideoReceiveStream(webrtc::VideoReceiveStream::Config configuration)971 webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
972     webrtc::VideoReceiveStream::Config configuration) {
973   TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
974   RTC_DCHECK_RUN_ON(worker_thread_);
975 
976   receive_side_cc_.SetSendPeriodicFeedback(
977       SendPeriodicFeedback(configuration.rtp.extensions));
978 
979   RegisterRateObserver();
980 
981   TaskQueueBase* current = GetCurrentTaskQueueOrThread();
982   RTC_CHECK(current);
983   VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
984       task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
985       transport_send_ptr_->packet_router(), std::move(configuration),
986       module_process_thread_->process_thread(), call_stats_.get(), clock_,
987       new VCMTiming(clock_));
988 
989   const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
990   if (config.rtp.rtx_ssrc) {
991     // We record identical config for the rtx stream as for the main
992     // stream. Since the transport_send_cc negotiation is per payload
993     // type, we may get an incorrect value for the rtx stream, but
994     // that is unlikely to matter in practice.
995     receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config));
996   }
997   receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
998   video_receive_streams_.insert(receive_stream);
999   ConfigureSync(config.sync_group);
1000 
1001   receive_stream->SignalNetworkState(video_network_state_);
1002   UpdateAggregateNetworkState();
1003   event_log_->Log(std::make_unique<RtcEventVideoReceiveStreamConfig>(
1004       CreateRtcLogStreamConfig(config)));
1005   return receive_stream;
1006 }
1007 
DestroyVideoReceiveStream(webrtc::VideoReceiveStream * receive_stream)1008 void Call::DestroyVideoReceiveStream(
1009     webrtc::VideoReceiveStream* receive_stream) {
1010   TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
1011   RTC_DCHECK_RUN_ON(worker_thread_);
1012   RTC_DCHECK(receive_stream != nullptr);
1013   VideoReceiveStream2* receive_stream_impl =
1014       static_cast<VideoReceiveStream2*>(receive_stream);
1015   const VideoReceiveStream::Config& config = receive_stream_impl->config();
1016 
1017   // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
1018   // separate SSRC there can be either one or two.
1019   receive_rtp_config_.erase(config.rtp.remote_ssrc);
1020   if (config.rtp.rtx_ssrc) {
1021     receive_rtp_config_.erase(config.rtp.rtx_ssrc);
1022   }
1023   video_receive_streams_.erase(receive_stream_impl);
1024   ConfigureSync(config.sync_group);
1025 
1026   receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
1027       ->RemoveStream(config.rtp.remote_ssrc);
1028 
1029   UpdateAggregateNetworkState();
1030   delete receive_stream_impl;
1031 }
1032 
CreateFlexfecReceiveStream(const FlexfecReceiveStream::Config & config)1033 FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
1034     const FlexfecReceiveStream::Config& config) {
1035   TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
1036   RTC_DCHECK_RUN_ON(worker_thread_);
1037 
1038   RecoveredPacketReceiver* recovered_packet_receiver = this;
1039 
1040   FlexfecReceiveStreamImpl* receive_stream;
1041 
1042   // Unlike the video and audio receive streams, FlexfecReceiveStream implements
1043   // RtpPacketSinkInterface itself, and hence its constructor passes its |this|
1044   // pointer to video_receiver_controller_->CreateStream(). Calling the
1045   // constructor while on the worker thread ensures that we don't call
1046   // OnRtpPacket until the constructor is finished and the object is
1047   // in a valid state, since OnRtpPacket runs on the same thread.
1048   receive_stream = new FlexfecReceiveStreamImpl(
1049       clock_, &video_receiver_controller_, config, recovered_packet_receiver,
1050       call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
1051 
1052   RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
1053              receive_rtp_config_.end());
1054   receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));
1055 
1056   // TODO(brandtr): Store config in RtcEventLog here.
1057 
1058   return receive_stream;
1059 }
1060 
DestroyFlexfecReceiveStream(FlexfecReceiveStream * receive_stream)1061 void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
1062   TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
1063   RTC_DCHECK_RUN_ON(worker_thread_);
1064 
1065   RTC_DCHECK(receive_stream != nullptr);
1066   const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
1067   uint32_t ssrc = config.remote_ssrc;
1068   receive_rtp_config_.erase(ssrc);
1069 
1070   // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
1071   // destroyed.
1072   receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
1073       ->RemoveStream(ssrc);
1074 
1075   delete receive_stream;
1076 }
1077 
AddAdaptationResource(rtc::scoped_refptr<Resource> resource)1078 void Call::AddAdaptationResource(rtc::scoped_refptr<Resource> resource) {
1079   RTC_DCHECK_RUN_ON(worker_thread_);
1080   adaptation_resource_forwarders_.push_back(
1081       std::make_unique<ResourceVideoSendStreamForwarder>(resource));
1082   const auto& resource_forwarder = adaptation_resource_forwarders_.back();
1083   for (VideoSendStream* send_stream : video_send_streams_) {
1084     resource_forwarder->OnCreateVideoSendStream(send_stream);
1085   }
1086 }
1087 
GetTransportControllerSend()1088 RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
1089   return transport_send_ptr_;
1090 }
1091 
GetStats() const1092 Call::Stats Call::GetStats() const {
1093   RTC_DCHECK_RUN_ON(worker_thread_);
1094 
1095   Stats stats;
1096   // TODO(srte): It is unclear if we only want to report queues if network is
1097   // available.
1098   stats.pacer_delay_ms =
1099       aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0;
1100 
1101   stats.rtt_ms = call_stats_->LastProcessedRtt();
1102 
1103   // Fetch available send/receive bitrates.
1104   std::vector<unsigned int> ssrcs;
1105   uint32_t recv_bandwidth = 0;
1106   receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
1107       &ssrcs, &recv_bandwidth);
1108   stats.recv_bandwidth_bps = recv_bandwidth;
1109   stats.send_bandwidth_bps = last_bandwidth_bps_;
1110   stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
1111 
1112   return stats;
1113 }
1114 
SignalChannelNetworkState(MediaType media,NetworkState state)1115 void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
1116   RTC_DCHECK_RUN_ON(worker_thread_);
1117   switch (media) {
1118     case MediaType::AUDIO:
1119       audio_network_state_ = state;
1120       break;
1121     case MediaType::VIDEO:
1122       video_network_state_ = state;
1123       break;
1124     case MediaType::ANY:
1125     case MediaType::DATA:
1126       RTC_NOTREACHED();
1127       break;
1128   }
1129 
1130   UpdateAggregateNetworkState();
1131   for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
1132     video_receive_stream->SignalNetworkState(video_network_state_);
1133   }
1134 }
1135 
OnAudioTransportOverheadChanged(int transport_overhead_per_packet)1136 void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
1137   RTC_DCHECK_RUN_ON(worker_thread_);
1138   for (auto& kv : audio_send_ssrcs_) {
1139     kv.second->SetTransportOverhead(transport_overhead_per_packet);
1140   }
1141 }
1142 
UpdateAggregateNetworkState()1143 void Call::UpdateAggregateNetworkState() {
1144   RTC_DCHECK_RUN_ON(worker_thread_);
1145 
1146   bool have_audio =
1147       !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty();
1148   bool have_video =
1149       !video_send_ssrcs_.empty() || !video_receive_streams_.empty();
1150 
1151   bool aggregate_network_up =
1152       ((have_video && video_network_state_ == kNetworkUp) ||
1153        (have_audio && audio_network_state_ == kNetworkUp));
1154 
1155   if (aggregate_network_up != aggregate_network_up_) {
1156     RTC_LOG(LS_INFO)
1157         << "UpdateAggregateNetworkState: aggregate_state change to "
1158         << (aggregate_network_up ? "up" : "down");
1159   } else {
1160     RTC_LOG(LS_VERBOSE)
1161         << "UpdateAggregateNetworkState: aggregate_state remains at "
1162         << (aggregate_network_up ? "up" : "down");
1163   }
1164   aggregate_network_up_ = aggregate_network_up;
1165 
1166   transport_send_ptr_->OnNetworkAvailability(aggregate_network_up);
1167 }
1168 
OnSentPacket(const rtc::SentPacket & sent_packet)1169 void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
1170   video_send_delay_stats_->OnSentPacket(sent_packet.packet_id,
1171                                         clock_->TimeInMilliseconds());
1172   transport_send_ptr_->OnSentPacket(sent_packet);
1173 }
1174 
OnStartRateUpdate(DataRate start_rate)1175 void Call::OnStartRateUpdate(DataRate start_rate) {
1176   RTC_DCHECK_RUN_ON(send_transport_queue());
1177   bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>());
1178 }
1179 
OnTargetTransferRate(TargetTransferRate msg)1180 void Call::OnTargetTransferRate(TargetTransferRate msg) {
1181   RTC_DCHECK_RUN_ON(send_transport_queue());
1182 
1183   uint32_t target_bitrate_bps = msg.target_rate.bps();
1184   // For controlling the rate of feedback messages.
1185   receive_side_cc_.OnBitrateChanged(target_bitrate_bps);
1186   bitrate_allocator_->OnNetworkEstimateChanged(msg);
1187 
1188   worker_thread_->PostTask(
1189       ToQueuedTask(task_safety_, [this, target_bitrate_bps]() {
1190         RTC_DCHECK_RUN_ON(worker_thread_);
1191         last_bandwidth_bps_ = target_bitrate_bps;
1192 
1193         // Ignore updates if bitrate is zero (the aggregate network state is
1194         // down) or if we're not sending video.
1195         if (target_bitrate_bps == 0 || video_send_streams_.empty()) {
1196           estimated_send_bitrate_kbps_counter_.ProcessAndPause();
1197           pacer_bitrate_kbps_counter_.ProcessAndPause();
1198           return;
1199         }
1200 
1201         estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
1202         // Pacer bitrate may be higher than bitrate estimate if enforcing min
1203         // bitrate.
1204         uint32_t pacer_bitrate_bps =
1205             std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
1206         pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
1207       }));
1208 }
1209 
OnAllocationLimitsChanged(BitrateAllocationLimits limits)1210 void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
1211   RTC_DCHECK_RUN_ON(send_transport_queue());
1212 
1213   transport_send_ptr_->SetAllocatedSendBitrateLimits(limits);
1214 
1215   worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() {
1216     RTC_DCHECK_RUN_ON(worker_thread_);
1217     min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
1218     configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
1219   }));
1220 }
1221 
ConfigureSync(const std::string & sync_group)1222 void Call::ConfigureSync(const std::string& sync_group) {
1223   // Set sync only if there was no previous one.
1224   if (sync_group.empty())
1225     return;
1226 
1227   AudioReceiveStream* sync_audio_stream = nullptr;
1228   // Find existing audio stream.
1229   const auto it = sync_stream_mapping_.find(sync_group);
1230   if (it != sync_stream_mapping_.end()) {
1231     sync_audio_stream = it->second;
1232   } else {
1233     // No configured audio stream, see if we can find one.
1234     for (AudioReceiveStream* stream : audio_receive_streams_) {
1235       if (stream->config().sync_group == sync_group) {
1236         if (sync_audio_stream != nullptr) {
1237           RTC_LOG(LS_WARNING)
1238               << "Attempting to sync more than one audio stream "
1239                  "within the same sync group. This is not "
1240                  "supported in the current implementation.";
1241           break;
1242         }
1243         sync_audio_stream = stream;
1244       }
1245     }
1246   }
1247   if (sync_audio_stream)
1248     sync_stream_mapping_[sync_group] = sync_audio_stream;
1249   size_t num_synced_streams = 0;
1250   for (VideoReceiveStream2* video_stream : video_receive_streams_) {
1251     if (video_stream->config().sync_group != sync_group)
1252       continue;
1253     ++num_synced_streams;
1254     if (num_synced_streams > 1) {
1255       // TODO(pbos): Support synchronizing more than one A/V pair.
1256       // https://code.google.com/p/webrtc/issues/detail?id=4762
1257       RTC_LOG(LS_WARNING)
1258           << "Attempting to sync more than one audio/video pair "
1259              "within the same sync group. This is not supported in "
1260              "the current implementation.";
1261     }
1262     // Only sync the first A/V pair within this sync group.
1263     if (num_synced_streams == 1) {
1264       // sync_audio_stream may be null and that's ok.
1265       video_stream->SetSync(sync_audio_stream);
1266     } else {
1267       video_stream->SetSync(nullptr);
1268     }
1269   }
1270 }
1271 
DeliverRtcp(MediaType media_type,const uint8_t * packet,size_t length)1272 PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type,
1273                                                  const uint8_t* packet,
1274                                                  size_t length) {
1275   TRACE_EVENT0("webrtc", "Call::DeliverRtcp");
1276   // TODO(pbos): Make sure it's a valid packet.
1277   //             Return DELIVERY_UNKNOWN_SSRC if it can be determined that
1278   //             there's no receiver of the packet.
1279   if (received_bytes_per_second_counter_.HasSample()) {
1280     // First RTP packet has been received.
1281     received_bytes_per_second_counter_.Add(static_cast<int>(length));
1282     received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(length));
1283   }
1284   bool rtcp_delivered = false;
1285   if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1286     for (VideoReceiveStream2* stream : video_receive_streams_) {
1287       if (stream->DeliverRtcp(packet, length))
1288         rtcp_delivered = true;
1289     }
1290   }
1291   if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
1292     for (AudioReceiveStream* stream : audio_receive_streams_) {
1293       stream->DeliverRtcp(packet, length);
1294       rtcp_delivered = true;
1295     }
1296   }
1297   if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1298     for (VideoSendStream* stream : video_send_streams_) {
1299       stream->DeliverRtcp(packet, length);
1300       rtcp_delivered = true;
1301     }
1302   }
1303   if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
1304     for (auto& kv : audio_send_ssrcs_) {
1305       kv.second->DeliverRtcp(packet, length);
1306       rtcp_delivered = true;
1307     }
1308   }
1309 
1310   if (rtcp_delivered) {
1311     event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
1312         rtc::MakeArrayView(packet, length)));
1313   }
1314 
1315   return rtcp_delivered ? DELIVERY_OK : DELIVERY_PACKET_ERROR;
1316 }
1317 
DeliverRtp(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)1318 PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
1319                                                 rtc::CopyOnWriteBuffer packet,
1320                                                 int64_t packet_time_us) {
1321   TRACE_EVENT0("webrtc", "Call::DeliverRtp");
1322 
1323   RtpPacketReceived parsed_packet;
1324   if (!parsed_packet.Parse(std::move(packet)))
1325     return DELIVERY_PACKET_ERROR;
1326 
1327   if (packet_time_us != -1) {
1328     if (receive_time_calculator_) {
1329       // Repair packet_time_us for clock resets by comparing a new read of
1330       // the same clock (TimeUTCMicros) to a monotonic clock reading.
1331       packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
1332           packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
1333     }
1334     parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000);
1335   } else {
1336     parsed_packet.set_arrival_time_ms(clock_->TimeInMilliseconds());
1337   }
1338 
1339   // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
1340   // These are empty (zero length payload) RTP packets with an unsignaled
1341   // payload type.
1342   const bool is_keep_alive_packet = parsed_packet.payload_size() == 0;
1343 
1344   RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
1345              is_keep_alive_packet);
1346 
1347   auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
1348   if (it == receive_rtp_config_.end()) {
1349     RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
1350                       << parsed_packet.Ssrc();
1351     // Destruction of the receive stream, including deregistering from the
1352     // RtpDemuxer, is not protected by the |worker_thread_|.
1353     // But deregistering in the |receive_rtp_config_| map is. So by not passing
1354     // the packet on to demuxing in this case, we prevent incoming packets to be
1355     // passed on via the demuxer to a receive stream which is being torned down.
1356     return DELIVERY_UNKNOWN_SSRC;
1357   }
1358 
1359   parsed_packet.IdentifyExtensions(it->second.extensions);
1360 
1361   NotifyBweOfReceivedPacket(parsed_packet, media_type);
1362 
1363   // RateCounters expect input parameter as int, save it as int,
1364   // instead of converting each time it is passed to RateCounter::Add below.
1365   int length = static_cast<int>(parsed_packet.size());
1366   if (media_type == MediaType::AUDIO) {
1367     if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) {
1368       received_bytes_per_second_counter_.Add(length);
1369       received_audio_bytes_per_second_counter_.Add(length);
1370       event_log_->Log(
1371           std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
1372       const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
1373       if (!first_received_rtp_audio_ms_) {
1374         first_received_rtp_audio_ms_.emplace(arrival_time_ms);
1375       }
1376       last_received_rtp_audio_ms_.emplace(arrival_time_ms);
1377       return DELIVERY_OK;
1378     }
1379   } else if (media_type == MediaType::VIDEO) {
1380     parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
1381     if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
1382       received_bytes_per_second_counter_.Add(length);
1383       received_video_bytes_per_second_counter_.Add(length);
1384       event_log_->Log(
1385           std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
1386       const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
1387       if (!first_received_rtp_video_ms_) {
1388         first_received_rtp_video_ms_.emplace(arrival_time_ms);
1389       }
1390       last_received_rtp_video_ms_.emplace(arrival_time_ms);
1391       return DELIVERY_OK;
1392     }
1393   }
1394   return DELIVERY_UNKNOWN_SSRC;
1395 }
1396 
DeliverPacket(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)1397 PacketReceiver::DeliveryStatus Call::DeliverPacket(
1398     MediaType media_type,
1399     rtc::CopyOnWriteBuffer packet,
1400     int64_t packet_time_us) {
1401   RTC_DCHECK_RUN_ON(worker_thread_);
1402 
1403   if (IsRtcp(packet.cdata(), packet.size()))
1404     return DeliverRtcp(media_type, packet.cdata(), packet.size());
1405 
1406   return DeliverRtp(media_type, std::move(packet), packet_time_us);
1407 }
1408 
OnRecoveredPacket(const uint8_t * packet,size_t length)1409 void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
1410   RTC_DCHECK_RUN_ON(worker_thread_);
1411   RtpPacketReceived parsed_packet;
1412   if (!parsed_packet.Parse(packet, length))
1413     return;
1414 
1415   parsed_packet.set_recovered(true);
1416 
1417   auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
1418   if (it == receive_rtp_config_.end()) {
1419     RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
1420                       << parsed_packet.Ssrc();
1421     // Destruction of the receive stream, including deregistering from the
1422     // RtpDemuxer, is not protected by the |worker_thread_|.
1423     // But deregistering in the |receive_rtp_config_| map is.
1424     // So by not passing the packet on to demuxing in this case, we prevent
1425     // incoming packets to be passed on via the demuxer to a receive stream
1426     // which is being torn down.
1427     return;
1428   }
1429   parsed_packet.IdentifyExtensions(it->second.extensions);
1430 
1431   // TODO(brandtr): Update here when we support protecting audio packets too.
1432   parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
1433   video_receiver_controller_.OnRtpPacket(parsed_packet);
1434 }
1435 
NotifyBweOfReceivedPacket(const RtpPacketReceived & packet,MediaType media_type)1436 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
1437                                      MediaType media_type) {
1438   auto it = receive_rtp_config_.find(packet.Ssrc());
1439   bool use_send_side_bwe =
1440       (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe;
1441 
1442   RTPHeader header;
1443   packet.GetHeader(&header);
1444 
1445   ReceivedPacket packet_msg;
1446   packet_msg.size = DataSize::Bytes(packet.payload_size());
1447   packet_msg.receive_time = Timestamp::Millis(packet.arrival_time_ms());
1448   if (header.extension.hasAbsoluteSendTime) {
1449     packet_msg.send_time = header.extension.GetAbsoluteSendTimestamp();
1450   }
1451   transport_send_ptr_->OnReceivedPacket(packet_msg);
1452 
1453   if (!use_send_side_bwe && header.extension.hasTransportSequenceNumber) {
1454     // Inconsistent configuration of send side BWE. Do nothing.
1455     // TODO(nisse): Without this check, we may produce RTCP feedback
1456     // packets even when not negotiated. But it would be cleaner to
1457     // move the check down to RTCPSender::SendFeedbackPacket, which
1458     // would also help the PacketRouter to select an appropriate rtp
1459     // module in the case that some, but not all, have RTCP feedback
1460     // enabled.
1461     return;
1462   }
1463   // For audio, we only support send side BWE.
1464   if (media_type == MediaType::VIDEO ||
1465       (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) {
1466     receive_side_cc_.OnReceivedPacket(
1467         packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(),
1468         header);
1469   }
1470 }
1471 
1472 }  // namespace internal
1473 
1474 }  // namespace webrtc
1475