1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "call/call.h"
12
13 #include <string.h>
14
15 #include <algorithm>
16 #include <map>
17 #include <memory>
18 #include <set>
19 #include <utility>
20 #include <vector>
21
22 #include "absl/types/optional.h"
23 #include "api/rtc_event_log/rtc_event_log.h"
24 #include "api/transport/network_control.h"
25 #include "audio/audio_receive_stream.h"
26 #include "audio/audio_send_stream.h"
27 #include "audio/audio_state.h"
28 #include "call/adaptation/broadcast_resource_listener.h"
29 #include "call/bitrate_allocator.h"
30 #include "call/flexfec_receive_stream_impl.h"
31 #include "call/receive_time_calculator.h"
32 #include "call/rtp_stream_receiver_controller.h"
33 #include "call/rtp_transport_controller_send.h"
34 #include "logging/rtc_event_log/events/rtc_event_audio_receive_stream_config.h"
35 #include "logging/rtc_event_log/events/rtc_event_rtcp_packet_incoming.h"
36 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_incoming.h"
37 #include "logging/rtc_event_log/events/rtc_event_video_receive_stream_config.h"
38 #include "logging/rtc_event_log/events/rtc_event_video_send_stream_config.h"
39 #include "logging/rtc_event_log/rtc_stream_config.h"
40 #include "modules/congestion_controller/include/receive_side_congestion_controller.h"
41 #include "modules/rtp_rtcp/include/flexfec_receiver.h"
42 #include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
43 #include "modules/rtp_rtcp/source/byte_io.h"
44 #include "modules/rtp_rtcp/source/rtp_packet_received.h"
45 #include "modules/rtp_rtcp/source/rtp_utility.h"
46 #include "modules/utility/include/process_thread.h"
47 #include "modules/video_coding/fec_controller_default.h"
48 #include "rtc_base/checks.h"
49 #include "rtc_base/constructor_magic.h"
50 #include "rtc_base/location.h"
51 #include "rtc_base/logging.h"
52 #include "rtc_base/strings/string_builder.h"
53 #include "rtc_base/synchronization/sequence_checker.h"
54 #include "rtc_base/task_utils/pending_task_safety_flag.h"
55 #include "rtc_base/thread_annotations.h"
56 #include "rtc_base/time_utils.h"
57 #include "rtc_base/trace_event.h"
58 #include "system_wrappers/include/clock.h"
59 #include "system_wrappers/include/cpu_info.h"
60 #include "system_wrappers/include/field_trial.h"
61 #include "system_wrappers/include/metrics.h"
62 #include "video/call_stats2.h"
63 #include "video/send_delay_stats.h"
64 #include "video/stats_counter.h"
65 #include "video/video_receive_stream2.h"
66 #include "video/video_send_stream.h"
67
68 namespace webrtc {
69
70 namespace {
SendPeriodicFeedback(const std::vector<RtpExtension> & extensions)71 bool SendPeriodicFeedback(const std::vector<RtpExtension>& extensions) {
72 for (const auto& extension : extensions) {
73 if (extension.uri == RtpExtension::kTransportSequenceNumberV2Uri)
74 return false;
75 }
76 return true;
77 }
78
79 // TODO(nisse): This really begs for a shared context struct.
UseSendSideBwe(const std::vector<RtpExtension> & extensions,bool transport_cc)80 bool UseSendSideBwe(const std::vector<RtpExtension>& extensions,
81 bool transport_cc) {
82 if (!transport_cc)
83 return false;
84 for (const auto& extension : extensions) {
85 if (extension.uri == RtpExtension::kTransportSequenceNumberUri ||
86 extension.uri == RtpExtension::kTransportSequenceNumberV2Uri)
87 return true;
88 }
89 return false;
90 }
91
// Overload for video receive streams: forwards the stream's RTP header
// extensions and transport_cc flag to the (extensions, transport_cc) overload.
bool UseSendSideBwe(const VideoReceiveStream::Config& config) {
  return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc);
}
95
// Overload for audio receive streams: forwards the stream's RTP header
// extensions and transport_cc flag to the (extensions, transport_cc) overload.
bool UseSendSideBwe(const AudioReceiveStream::Config& config) {
  return UseSendSideBwe(config.rtp.extensions, config.rtp.transport_cc);
}
99
// Overload for FlexFEC receive streams. Note that this config keeps its
// extensions in |rtp_header_extensions| and |transport_cc| at the top level,
// unlike the audio/video configs which nest them under |rtp|.
bool UseSendSideBwe(const FlexfecReceiveStream::Config& config) {
  return UseSendSideBwe(config.rtp_header_extensions, config.transport_cc);
}
103
// Reverse lookup: returns a pointer to the first key (in map iteration order)
// whose mapped value equals |v|, or nullptr when no entry has that value.
// The returned pointer refers into |m| and is only valid while |m| is alive
// and unmodified.
const int* FindKeyByValue(const std::map<int, int>& m, int v) {
  const auto match = std::find_if(
      m.begin(), m.end(),
      [v](const std::pair<const int, int>& entry) { return entry.second == v; });
  return match != m.end() ? &match->first : nullptr;
}
111
CreateRtcLogStreamConfig(const VideoReceiveStream::Config & config)112 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
113 const VideoReceiveStream::Config& config) {
114 auto rtclog_config = std::make_unique<rtclog::StreamConfig>();
115 rtclog_config->remote_ssrc = config.rtp.remote_ssrc;
116 rtclog_config->local_ssrc = config.rtp.local_ssrc;
117 rtclog_config->rtx_ssrc = config.rtp.rtx_ssrc;
118 rtclog_config->rtcp_mode = config.rtp.rtcp_mode;
119 rtclog_config->rtp_extensions = config.rtp.extensions;
120
121 for (const auto& d : config.decoders) {
122 const int* search =
123 FindKeyByValue(config.rtp.rtx_associated_payload_types, d.payload_type);
124 rtclog_config->codecs.emplace_back(d.video_format.name, d.payload_type,
125 search ? *search : 0);
126 }
127 return rtclog_config;
128 }
129
CreateRtcLogStreamConfig(const VideoSendStream::Config & config,size_t ssrc_index)130 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
131 const VideoSendStream::Config& config,
132 size_t ssrc_index) {
133 auto rtclog_config = std::make_unique<rtclog::StreamConfig>();
134 rtclog_config->local_ssrc = config.rtp.ssrcs[ssrc_index];
135 if (ssrc_index < config.rtp.rtx.ssrcs.size()) {
136 rtclog_config->rtx_ssrc = config.rtp.rtx.ssrcs[ssrc_index];
137 }
138 rtclog_config->rtcp_mode = config.rtp.rtcp_mode;
139 rtclog_config->rtp_extensions = config.rtp.extensions;
140
141 rtclog_config->codecs.emplace_back(config.rtp.payload_name,
142 config.rtp.payload_type,
143 config.rtp.rtx.payload_type);
144 return rtclog_config;
145 }
146
CreateRtcLogStreamConfig(const AudioReceiveStream::Config & config)147 std::unique_ptr<rtclog::StreamConfig> CreateRtcLogStreamConfig(
148 const AudioReceiveStream::Config& config) {
149 auto rtclog_config = std::make_unique<rtclog::StreamConfig>();
150 rtclog_config->remote_ssrc = config.rtp.remote_ssrc;
151 rtclog_config->local_ssrc = config.rtp.local_ssrc;
152 rtclog_config->rtp_extensions = config.rtp.extensions;
153 return rtclog_config;
154 }
155
IsRtcp(const uint8_t * packet,size_t length)156 bool IsRtcp(const uint8_t* packet, size_t length) {
157 RtpUtility::RtpHeaderParser rtp_parser(packet, length);
158 return rtp_parser.RTCP();
159 }
160
GetCurrentTaskQueueOrThread()161 TaskQueueBase* GetCurrentTaskQueueOrThread() {
162 TaskQueueBase* current = TaskQueueBase::Current();
163 if (!current)
164 current = rtc::ThreadManager::Instance()->CurrentThread();
165 return current;
166 }
167
168 } // namespace
169
170 namespace internal {
171
172 // Wraps an injected resource in a BroadcastResourceListener and handles adding
173 // and removing adapter resources to individual VideoSendStreams.
174 class ResourceVideoSendStreamForwarder {
175 public:
ResourceVideoSendStreamForwarder(rtc::scoped_refptr<webrtc::Resource> resource)176 ResourceVideoSendStreamForwarder(
177 rtc::scoped_refptr<webrtc::Resource> resource)
178 : broadcast_resource_listener_(resource) {
179 broadcast_resource_listener_.StartListening();
180 }
~ResourceVideoSendStreamForwarder()181 ~ResourceVideoSendStreamForwarder() {
182 RTC_DCHECK(adapter_resources_.empty());
183 broadcast_resource_listener_.StopListening();
184 }
185
Resource() const186 rtc::scoped_refptr<webrtc::Resource> Resource() const {
187 return broadcast_resource_listener_.SourceResource();
188 }
189
OnCreateVideoSendStream(VideoSendStream * video_send_stream)190 void OnCreateVideoSendStream(VideoSendStream* video_send_stream) {
191 RTC_DCHECK(adapter_resources_.find(video_send_stream) ==
192 adapter_resources_.end());
193 auto adapter_resource =
194 broadcast_resource_listener_.CreateAdapterResource();
195 video_send_stream->AddAdaptationResource(adapter_resource);
196 adapter_resources_.insert(
197 std::make_pair(video_send_stream, adapter_resource));
198 }
199
OnDestroyVideoSendStream(VideoSendStream * video_send_stream)200 void OnDestroyVideoSendStream(VideoSendStream* video_send_stream) {
201 auto it = adapter_resources_.find(video_send_stream);
202 RTC_DCHECK(it != adapter_resources_.end());
203 broadcast_resource_listener_.RemoveAdapterResource(it->second);
204 adapter_resources_.erase(it);
205 }
206
207 private:
208 BroadcastResourceListener broadcast_resource_listener_;
209 std::map<VideoSendStream*, rtc::scoped_refptr<webrtc::Resource>>
210 adapter_resources_;
211 };
212
// Concrete implementation of webrtc::Call. Besides the public Call interface
// it also implements PacketReceiver, RecoveredPacketReceiver,
// TargetTransferRateObserver and BitrateAllocator::LimitObserver.
//
// NOTE: the order of the data members below is significant; see the comments
// on |task_safety_|, |transport_send_ptr_| and |transport_send_|.
class Call final : public webrtc::Call,
                   public PacketReceiver,
                   public RecoveredPacketReceiver,
                   public TargetTransferRateObserver,
                   public BitrateAllocator::LimitObserver {
 public:
  Call(Clock* clock,
       const Call::Config& config,
       std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
       rtc::scoped_refptr<SharedModuleThread> module_process_thread,
       TaskQueueFactory* task_queue_factory);
  ~Call() override;

  // Implements webrtc::Call.
  PacketReceiver* Receiver() override;

  webrtc::AudioSendStream* CreateAudioSendStream(
      const webrtc::AudioSendStream::Config& config) override;
  void DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) override;

  webrtc::AudioReceiveStream* CreateAudioReceiveStream(
      const webrtc::AudioReceiveStream::Config& config) override;
  void DestroyAudioReceiveStream(
      webrtc::AudioReceiveStream* receive_stream) override;

  webrtc::VideoSendStream* CreateVideoSendStream(
      webrtc::VideoSendStream::Config config,
      VideoEncoderConfig encoder_config) override;
  webrtc::VideoSendStream* CreateVideoSendStream(
      webrtc::VideoSendStream::Config config,
      VideoEncoderConfig encoder_config,
      std::unique_ptr<FecController> fec_controller) override;
  void DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) override;

  webrtc::VideoReceiveStream* CreateVideoReceiveStream(
      webrtc::VideoReceiveStream::Config configuration) override;
  void DestroyVideoReceiveStream(
      webrtc::VideoReceiveStream* receive_stream) override;

  FlexfecReceiveStream* CreateFlexfecReceiveStream(
      const FlexfecReceiveStream::Config& config) override;
  void DestroyFlexfecReceiveStream(
      FlexfecReceiveStream* receive_stream) override;

  void AddAdaptationResource(rtc::scoped_refptr<Resource> resource) override;

  RtpTransportControllerSendInterface* GetTransportControllerSend() override;

  Stats GetStats() const override;

  // Implements PacketReceiver.
  DeliveryStatus DeliverPacket(MediaType media_type,
                               rtc::CopyOnWriteBuffer packet,
                               int64_t packet_time_us) override;

  // Implements RecoveredPacketReceiver.
  void OnRecoveredPacket(const uint8_t* packet, size_t length) override;

  void SignalChannelNetworkState(MediaType media, NetworkState state) override;

  void OnAudioTransportOverheadChanged(
      int transport_overhead_per_packet) override;

  void OnSentPacket(const rtc::SentPacket& sent_packet) override;

  // Implements TargetTransferRateObserver,
  void OnTargetTransferRate(TargetTransferRate msg) override;
  void OnStartRateUpdate(DataRate start_rate) override;

  // Implements BitrateAllocator::LimitObserver.
  void OnAllocationLimitsChanged(BitrateAllocationLimits limits) override;

  void SetClientBitratePreferences(const BitrateSettings& preferences) override;

 private:
  // The helpers below are annotated as requiring |worker_thread_|.
  DeliveryStatus DeliverRtcp(MediaType media_type,
                             const uint8_t* packet,
                             size_t length)
      RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
  DeliveryStatus DeliverRtp(MediaType media_type,
                            rtc::CopyOnWriteBuffer packet,
                            int64_t packet_time_us)
      RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
  void ConfigureSync(const std::string& sync_group)
      RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);

  void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
                                 MediaType media_type)
      RTC_SHARED_LOCKS_REQUIRED(worker_thread_);

  void UpdateSendHistograms(Timestamp first_sent_packet)
      RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
  void UpdateReceiveHistograms();
  void UpdateHistograms();
  void UpdateAggregateNetworkState();

  // Registers this object as target transfer rate observer on first use; see
  // the definition for details.
  void RegisterRateObserver();

  // Returns the worker queue of the send transport controller. Goes through
  // the cached raw pointer rather than |transport_send_|.
  rtc::TaskQueue* send_transport_queue() const {
    return transport_send_ptr_->GetWorkerQueue();
  }

  Clock* const clock_;
  TaskQueueFactory* const task_queue_factory_;
  TaskQueueBase* const worker_thread_;

  const int num_cpu_cores_;
  const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
  const std::unique_ptr<CallStats> call_stats_;
  const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
  Call::Config config_;

  NetworkState audio_network_state_;
  NetworkState video_network_state_;
  bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);

  // Audio, Video, and FlexFEC receive streams are owned by the client that
  // creates them.
  std::set<AudioReceiveStream*> audio_receive_streams_
      RTC_GUARDED_BY(worker_thread_);
  std::set<VideoReceiveStream2*> video_receive_streams_
      RTC_GUARDED_BY(worker_thread_);

  std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
      RTC_GUARDED_BY(worker_thread_);

  // TODO(nisse): Should eventually be injected at creation,
  // with a single object in the bundled case.
  RtpStreamReceiverController audio_receiver_controller_;
  RtpStreamReceiverController video_receiver_controller_;

  // This extra map is used for receive processing which is
  // independent of media type.

  // TODO(nisse): In the RTP transport refactoring, we should have a
  // single mapping from ssrc to a more abstract receive stream, with
  // accessor methods for all configuration we need at this level.
  struct ReceiveRtpConfig {
    explicit ReceiveRtpConfig(const webrtc::AudioReceiveStream::Config& config)
        : extensions(config.rtp.extensions),
          use_send_side_bwe(UseSendSideBwe(config)) {}
    explicit ReceiveRtpConfig(const webrtc::VideoReceiveStream::Config& config)
        : extensions(config.rtp.extensions),
          use_send_side_bwe(UseSendSideBwe(config)) {}
    explicit ReceiveRtpConfig(const FlexfecReceiveStream::Config& config)
        : extensions(config.rtp_header_extensions),
          use_send_side_bwe(UseSendSideBwe(config)) {}

    // Registered RTP header extensions for each stream. Note that RTP header
    // extensions are negotiated per track ("m= line") in the SDP, but we have
    // no notion of tracks at the Call level. We therefore store the RTP header
    // extensions per SSRC instead, which leads to some storage overhead.
    const RtpHeaderExtensionMap extensions;
    // Set if both the RTP extension and the RTCP feedback message needed for
    // send side BWE are negotiated.
    const bool use_send_side_bwe;
  };
  // Keyed by a 32-bit id; presumably the receive SSRC (see the TODO above) —
  // confirm against the code that populates the map.
  std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
      RTC_GUARDED_BY(worker_thread_);

  // Audio and Video send streams are owned by the client that creates them.
  std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_
      RTC_GUARDED_BY(worker_thread_);
  std::map<uint32_t, VideoSendStream*> video_send_ssrcs_
      RTC_GUARDED_BY(worker_thread_);
  std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_);

  // Each forwarder wraps an adaptation resource that was added to the call.
  std::vector<std::unique_ptr<ResourceVideoSendStreamForwarder>>
      adaptation_resource_forwarders_ RTC_GUARDED_BY(worker_thread_);

  using RtpStateMap = std::map<uint32_t, RtpState>;
  RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
  RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);

  using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>;
  RtpPayloadStateMap suspended_video_payload_states_
      RTC_GUARDED_BY(worker_thread_);

  webrtc::RtcEventLog* event_log_;

  // The following members are only accessed (exclusively) from one thread and
  // from the destructor, and therefore don't need any explicit
  // synchronization.
  RateCounter received_bytes_per_second_counter_;
  RateCounter received_audio_bytes_per_second_counter_;
  RateCounter received_video_bytes_per_second_counter_;
  RateCounter received_rtcp_bytes_per_second_counter_;
  absl::optional<int64_t> first_received_rtp_audio_ms_;
  absl::optional<int64_t> last_received_rtp_audio_ms_;
  absl::optional<int64_t> first_received_rtp_video_ms_;
  absl::optional<int64_t> last_received_rtp_video_ms_;

  uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_);
  // TODO(holmer): Remove this lock once BitrateController no longer calls
  // OnNetworkChanged from multiple threads.
  uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
  uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
  AvgCounter estimated_send_bitrate_kbps_counter_
      RTC_GUARDED_BY(worker_thread_);
  AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_);

  ReceiveSideCongestionController receive_side_cc_;

  const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;

  const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
  // Value of clock_->TimeInMilliseconds() captured at construction.
  const int64_t start_ms_;

  // Note that |task_safety_| needs to be at a greater scope than the task queue
  // owned by |transport_send_| since calls might arrive on the network thread
  // while Call is being deleted and the task queue is being torn down.
  ScopedTaskSafety task_safety_;

  // Caches transport_send_.get(), to avoid racing with destructor.
  // Note that this is declared before transport_send_ to ensure that it is not
  // invalidated until no more tasks can be running on the transport_send_ task
  // queue.
  RtpTransportControllerSendInterface* const transport_send_ptr_;
  // Declared last since it will issue callbacks from a task queue. Declaring it
  // last ensures that it is destroyed first and any running tasks are finished.
  // NOTE(review): |is_target_rate_observer_registered_| is in fact declared
  // after this member; as a plain bool that looks harmless, but the "last"
  // wording above is not literally true.
  std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;

  bool is_target_rate_observer_registered_ RTC_GUARDED_BY(worker_thread_) =
      false;

  RTC_DISALLOW_COPY_AND_ASSIGN(Call);
};
441 } // namespace internal
442
ToString(int64_t time_ms) const443 std::string Call::Stats::ToString(int64_t time_ms) const {
444 char buf[1024];
445 rtc::SimpleStringBuilder ss(buf);
446 ss << "Call stats: " << time_ms << ", {";
447 ss << "send_bw_bps: " << send_bandwidth_bps << ", ";
448 ss << "recv_bw_bps: " << recv_bandwidth_bps << ", ";
449 ss << "max_pad_bps: " << max_padding_bitrate_bps << ", ";
450 ss << "pacer_delay_ms: " << pacer_delay_ms << ", ";
451 ss << "rtt_ms: " << rtt_ms;
452 ss << '}';
453 return ss.str();
454 }
455
// Creates a Call that runs its modules on a freshly created process thread
// named "ModuleProcessThread", wrapped in a SharedModuleThread with no
// on-one-ref-remaining callback.
Call* Call::Create(const Call::Config& config) {
  return Create(config,
                SharedModuleThread::Create(
                    ProcessThread::Create("ModuleProcessThread"), nullptr));
}
462
// Creates a Call on |call_thread| using the real-time clock and a freshly
// created process thread named "PacerThread" for the pacer.
Call* Call::Create(const Call::Config& config,
                   rtc::scoped_refptr<SharedModuleThread> call_thread) {
  Clock* const real_time_clock = Clock::GetRealTimeClock();
  std::unique_ptr<ProcessThread> pacer_thread =
      ProcessThread::Create("PacerThread");
  return Create(config, real_time_clock, std::move(call_thread),
                std::move(pacer_thread));
}
468
// Most general factory: builds an RtpTransportControllerSend from |config|
// and |pacer_thread| and hands it, together with |call_thread|, to a new
// internal::Call. |config.task_queue_factory| must be set (DCHECKed).
// The returned object is heap-allocated and returned as a raw pointer.
Call* Call::Create(const Call::Config& config,
                   Clock* clock,
                   rtc::scoped_refptr<SharedModuleThread> call_thread,
                   std::unique_ptr<ProcessThread> pacer_thread) {
  RTC_DCHECK(config.task_queue_factory);

  std::unique_ptr<RtpTransportControllerSendInterface> transport_send =
      std::make_unique<RtpTransportControllerSend>(
          clock, config.event_log, config.network_state_predictor_factory,
          config.network_controller_factory, config.bitrate_config,
          std::move(pacer_thread), config.task_queue_factory, config.trials);

  return new internal::Call(clock, config, std::move(transport_send),
                            std::move(call_thread),
                            config.task_queue_factory);
}
482
483 class SharedModuleThread::Impl {
484 public:
Impl(std::unique_ptr<ProcessThread> process_thread,std::function<void ()> on_one_ref_remaining)485 Impl(std::unique_ptr<ProcessThread> process_thread,
486 std::function<void()> on_one_ref_remaining)
487 : module_thread_(std::move(process_thread)),
488 on_one_ref_remaining_(std::move(on_one_ref_remaining)) {}
489
EnsureStarted()490 void EnsureStarted() {
491 RTC_DCHECK_RUN_ON(&sequence_checker_);
492 if (started_)
493 return;
494 started_ = true;
495 module_thread_->Start();
496 }
497
process_thread()498 ProcessThread* process_thread() {
499 RTC_DCHECK_RUN_ON(&sequence_checker_);
500 return module_thread_.get();
501 }
502
AddRef() const503 void AddRef() const {
504 RTC_DCHECK_RUN_ON(&sequence_checker_);
505 ++ref_count_;
506 }
507
Release() const508 rtc::RefCountReleaseStatus Release() const {
509 RTC_DCHECK_RUN_ON(&sequence_checker_);
510 --ref_count_;
511
512 if (ref_count_ == 0) {
513 module_thread_->Stop();
514 return rtc::RefCountReleaseStatus::kDroppedLastRef;
515 }
516
517 if (ref_count_ == 1 && on_one_ref_remaining_) {
518 auto moved_fn = std::move(on_one_ref_remaining_);
519 // NOTE: after this function returns, chances are that |this| has been
520 // deleted - do not touch any member variables.
521 // If the owner of the last reference implements a lambda that releases
522 // that last reference inside of the callback (which is legal according
523 // to this implementation), we will recursively enter Release() above,
524 // call Stop() and release the last reference.
525 moved_fn();
526 }
527
528 return rtc::RefCountReleaseStatus::kOtherRefsRemained;
529 }
530
531 private:
532 SequenceChecker sequence_checker_;
533 mutable int ref_count_ RTC_GUARDED_BY(sequence_checker_) = 0;
534 std::unique_ptr<ProcessThread> const module_thread_;
535 std::function<void()> const on_one_ref_remaining_;
536 bool started_ = false;
537 };
538
// Creates the private Impl, which takes ownership of |process_thread| and of
// the |on_one_ref_remaining| callback.
SharedModuleThread::SharedModuleThread(
    std::unique_ptr<ProcessThread> process_thread,
    std::function<void()> on_one_ref_remaining)
    : impl_(std::make_unique<Impl>(std::move(process_thread),
                                   std::move(on_one_ref_remaining))) {}
544
// Defaulted out of line, after the definition of Impl above.
SharedModuleThread::~SharedModuleThread() = default;
546
547 // static
548
// Factory: heap-allocates a SharedModuleThread that owns |process_thread| and
// returns it wrapped in a scoped_refptr. |on_one_ref_remaining| is forwarded
// unchanged to the constructor.
rtc::scoped_refptr<SharedModuleThread> SharedModuleThread::Create(
    std::unique_ptr<ProcessThread> process_thread,
    std::function<void()> on_one_ref_remaining) {
  return new SharedModuleThread(std::move(process_thread),
                                std::move(on_one_ref_remaining));
}
555
// Forwards to Impl::EnsureStarted(), which starts the process thread on the
// first call and does nothing on later calls.
void SharedModuleThread::EnsureStarted() {
  impl_->EnsureStarted();
}
559
// Returns the wrapped ProcessThread; ownership stays with this object.
ProcessThread* SharedModuleThread::process_thread() {
  return impl_->process_thread();
}
563
// Increments the reference count kept by Impl.
void SharedModuleThread::AddRef() const {
  impl_->AddRef();
}
567
Release() const568 rtc::RefCountReleaseStatus SharedModuleThread::Release() const {
569 auto ret = impl_->Release();
570 if (ret == rtc::RefCountReleaseStatus::kDroppedLastRef)
571 delete this;
572 return ret;
573 }
574
// Default implementation, provided so that subclasses are not forced to
// implement this overload. It ignores all arguments and returns nullptr.
// The Call perf test uses internal::Call::CreateVideoSendStream() to inject a
// FecController.
VideoSendStream* Call::CreateVideoSendStream(
    VideoSendStream::Config config,
    VideoEncoderConfig encoder_config,
    std::unique_ptr<FecController> fec_controller) {
  return nullptr;
}
584
585 namespace internal {
586
Call(Clock * clock,const Call::Config & config,std::unique_ptr<RtpTransportControllerSendInterface> transport_send,rtc::scoped_refptr<SharedModuleThread> module_process_thread,TaskQueueFactory * task_queue_factory)587 Call::Call(Clock* clock,
588 const Call::Config& config,
589 std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
590 rtc::scoped_refptr<SharedModuleThread> module_process_thread,
591 TaskQueueFactory* task_queue_factory)
592 : clock_(clock),
593 task_queue_factory_(task_queue_factory),
594 worker_thread_(GetCurrentTaskQueueOrThread()),
595 num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
596 module_process_thread_(std::move(module_process_thread)),
597 call_stats_(new CallStats(clock_, worker_thread_)),
598 bitrate_allocator_(new BitrateAllocator(this)),
599 config_(config),
600 audio_network_state_(kNetworkDown),
601 video_network_state_(kNetworkDown),
602 aggregate_network_up_(false),
603 event_log_(config.event_log),
604 received_bytes_per_second_counter_(clock_, nullptr, true),
605 received_audio_bytes_per_second_counter_(clock_, nullptr, true),
606 received_video_bytes_per_second_counter_(clock_, nullptr, true),
607 received_rtcp_bytes_per_second_counter_(clock_, nullptr, true),
608 last_bandwidth_bps_(0),
609 min_allocated_send_bitrate_bps_(0),
610 configured_max_padding_bitrate_bps_(0),
611 estimated_send_bitrate_kbps_counter_(clock_, nullptr, true),
612 pacer_bitrate_kbps_counter_(clock_, nullptr, true),
613 receive_side_cc_(clock_, transport_send->packet_router()),
614 receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()),
615 video_send_delay_stats_(new SendDelayStats(clock_)),
616 start_ms_(clock_->TimeInMilliseconds()),
617 transport_send_ptr_(transport_send.get()),
618 transport_send_(std::move(transport_send)) {
619 RTC_DCHECK(config.event_log != nullptr);
620 RTC_DCHECK(config.trials != nullptr);
621 RTC_DCHECK(worker_thread_->IsCurrent());
622
623 call_stats_->RegisterStatsObserver(&receive_side_cc_);
624
625 module_process_thread_->process_thread()->RegisterModule(
626 receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
627 module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_,
628 RTC_FROM_HERE);
629 }
630
~Call()631 Call::~Call() {
632 RTC_DCHECK_RUN_ON(worker_thread_);
633
634 RTC_CHECK(audio_send_ssrcs_.empty());
635 RTC_CHECK(video_send_ssrcs_.empty());
636 RTC_CHECK(video_send_streams_.empty());
637 RTC_CHECK(audio_receive_streams_.empty());
638 RTC_CHECK(video_receive_streams_.empty());
639
640 module_process_thread_->process_thread()->DeRegisterModule(
641 receive_side_cc_.GetRemoteBitrateEstimator(true));
642 module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_);
643 call_stats_->DeregisterStatsObserver(&receive_side_cc_);
644
645 absl::optional<Timestamp> first_sent_packet_ms =
646 transport_send_->GetFirstPacketTime();
647
648 // Only update histograms after process threads have been shut down, so that
649 // they won't try to concurrently update stats.
650 if (first_sent_packet_ms) {
651 UpdateSendHistograms(*first_sent_packet_ms);
652 }
653
654 UpdateReceiveHistograms();
655 UpdateHistograms();
656 }
657
RegisterRateObserver()658 void Call::RegisterRateObserver() {
659 RTC_DCHECK_RUN_ON(worker_thread_);
660
661 if (is_target_rate_observer_registered_)
662 return;
663
664 is_target_rate_observer_registered_ = true;
665
666 // This call seems to kick off a number of things, so probably better left
667 // off being kicked off on request rather than in the ctor.
668 transport_send_ptr_->RegisterTargetTransferRateObserver(this);
669
670 module_process_thread_->EnsureStarted();
671 }
672
// Forwards |preferences| unchanged to the send transport controller. Must be
// called on the worker thread.
void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
  RTC_DCHECK_RUN_ON(worker_thread_);
  GetTransportControllerSend()->SetClientBitratePreferences(preferences);
}
677
UpdateHistograms()678 void Call::UpdateHistograms() {
679 RTC_HISTOGRAM_COUNTS_100000(
680 "WebRTC.Call.LifetimeInSeconds",
681 (clock_->TimeInMilliseconds() - start_ms_) / 1000);
682 }
683
684 // Called from the dtor.
UpdateSendHistograms(Timestamp first_sent_packet)685 void Call::UpdateSendHistograms(Timestamp first_sent_packet) {
686 int64_t elapsed_sec =
687 (clock_->TimeInMilliseconds() - first_sent_packet.ms()) / 1000;
688 if (elapsed_sec < metrics::kMinRunTimeInSeconds)
689 return;
690 const int kMinRequiredPeriodicSamples = 5;
691 AggregatedStats send_bitrate_stats =
692 estimated_send_bitrate_kbps_counter_.ProcessAndGetStats();
693 if (send_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
694 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.EstimatedSendBitrateInKbps",
695 send_bitrate_stats.average);
696 RTC_LOG(LS_INFO) << "WebRTC.Call.EstimatedSendBitrateInKbps, "
697 << send_bitrate_stats.ToString();
698 }
699 AggregatedStats pacer_bitrate_stats =
700 pacer_bitrate_kbps_counter_.ProcessAndGetStats();
701 if (pacer_bitrate_stats.num_samples > kMinRequiredPeriodicSamples) {
702 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.PacerBitrateInKbps",
703 pacer_bitrate_stats.average);
704 RTC_LOG(LS_INFO) << "WebRTC.Call.PacerBitrateInKbps, "
705 << pacer_bitrate_stats.ToString();
706 }
707 }
708
UpdateReceiveHistograms()709 void Call::UpdateReceiveHistograms() {
710 if (first_received_rtp_audio_ms_) {
711 RTC_HISTOGRAM_COUNTS_100000(
712 "WebRTC.Call.TimeReceivingAudioRtpPacketsInSeconds",
713 (*last_received_rtp_audio_ms_ - *first_received_rtp_audio_ms_) / 1000);
714 }
715 if (first_received_rtp_video_ms_) {
716 RTC_HISTOGRAM_COUNTS_100000(
717 "WebRTC.Call.TimeReceivingVideoRtpPacketsInSeconds",
718 (*last_received_rtp_video_ms_ - *first_received_rtp_video_ms_) / 1000);
719 }
720 const int kMinRequiredPeriodicSamples = 5;
721 AggregatedStats video_bytes_per_sec =
722 received_video_bytes_per_second_counter_.GetStats();
723 if (video_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
724 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.VideoBitrateReceivedInKbps",
725 video_bytes_per_sec.average * 8 / 1000);
726 RTC_LOG(LS_INFO) << "WebRTC.Call.VideoBitrateReceivedInBps, "
727 << video_bytes_per_sec.ToStringWithMultiplier(8);
728 }
729 AggregatedStats audio_bytes_per_sec =
730 received_audio_bytes_per_second_counter_.GetStats();
731 if (audio_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
732 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.AudioBitrateReceivedInKbps",
733 audio_bytes_per_sec.average * 8 / 1000);
734 RTC_LOG(LS_INFO) << "WebRTC.Call.AudioBitrateReceivedInBps, "
735 << audio_bytes_per_sec.ToStringWithMultiplier(8);
736 }
737 AggregatedStats rtcp_bytes_per_sec =
738 received_rtcp_bytes_per_second_counter_.GetStats();
739 if (rtcp_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
740 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.RtcpBitrateReceivedInBps",
741 rtcp_bytes_per_sec.average * 8);
742 RTC_LOG(LS_INFO) << "WebRTC.Call.RtcpBitrateReceivedInBps, "
743 << rtcp_bytes_per_sec.ToStringWithMultiplier(8);
744 }
745 AggregatedStats recv_bytes_per_sec =
746 received_bytes_per_second_counter_.GetStats();
747 if (recv_bytes_per_sec.num_samples > kMinRequiredPeriodicSamples) {
748 RTC_HISTOGRAM_COUNTS_100000("WebRTC.Call.BitrateReceivedInKbps",
749 recv_bytes_per_sec.average * 8 / 1000);
750 RTC_LOG(LS_INFO) << "WebRTC.Call.BitrateReceivedInBps, "
751 << recv_bytes_per_sec.ToStringWithMultiplier(8);
752 }
753 }
754
Receiver()755 PacketReceiver* Call::Receiver() {
756 RTC_DCHECK_RUN_ON(worker_thread_);
757 return this;
758 }
759
// Creates an audio send stream for |config| and registers it under
// |config.rtp.ssrc|. If RTP state was saved earlier for the same SSRC (see
// DestroyAudioSendStream()), it is handed to the new stream. Every existing
// audio receive stream whose local SSRC equals the new stream's SSRC gets
// associated with it. The returned pointer is released by
// DestroyAudioSendStream(). Must be called on the worker thread.
webrtc::AudioSendStream* Call::CreateAudioSendStream(
    const webrtc::AudioSendStream::Config& config) {
  TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream");
  RTC_DCHECK_RUN_ON(worker_thread_);

  RegisterRateObserver();

  // Stream config is logged in AudioSendStream::ConfigureStream, as it may
  // change during the stream's lifetime.

  // Look up RTP state left behind by a previously destroyed stream.
  absl::optional<RtpState> suspended_rtp_state;
  const auto suspended_it = suspended_audio_send_ssrcs_.find(config.rtp.ssrc);
  if (suspended_it != suspended_audio_send_ssrcs_.end()) {
    suspended_rtp_state.emplace(suspended_it->second);
  }

  AudioSendStream* const new_stream = new AudioSendStream(
      clock_, config, config_.audio_state, task_queue_factory_,
      module_process_thread_->process_thread(), transport_send_ptr_,
      bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
      suspended_rtp_state);

  // An SSRC may only be used by one audio send stream at a time.
  RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
             audio_send_ssrcs_.end());
  audio_send_ssrcs_[config.rtp.ssrc] = new_stream;

  for (AudioReceiveStream* receive_stream : audio_receive_streams_) {
    if (receive_stream->config().rtp.local_ssrc != config.rtp.ssrc)
      continue;
    receive_stream->AssociateSendStream(new_stream);
  }

  UpdateAggregateNetworkState();
  return new_stream;
}
795
DestroyAudioSendStream(webrtc::AudioSendStream * send_stream)796 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
797 TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
798 RTC_DCHECK_RUN_ON(worker_thread_);
799 RTC_DCHECK(send_stream != nullptr);
800
801 send_stream->Stop();
802
803 const uint32_t ssrc = send_stream->GetConfig().rtp.ssrc;
804 webrtc::internal::AudioSendStream* audio_send_stream =
805 static_cast<webrtc::internal::AudioSendStream*>(send_stream);
806 suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState();
807
808 size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
809 RTC_DCHECK_EQ(1, num_deleted);
810
811 for (AudioReceiveStream* stream : audio_receive_streams_) {
812 if (stream->config().rtp.local_ssrc == ssrc) {
813 stream->AssociateSendStream(nullptr);
814 }
815 }
816
817 UpdateAggregateNetworkState();
818 delete send_stream;
819 }
820
// Creates an audio receive stream for |config|, logs its configuration to the
// event log, records the receive RTP config under |config.rtp.remote_ssrc|
// and re-evaluates A/V sync for |config.sync_group|. If an audio send stream
// is registered for |config.rtp.local_ssrc|, the new stream is associated
// with it. The returned pointer is released by DestroyAudioReceiveStream().
// Must be called on the worker thread.
webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
    const webrtc::AudioReceiveStream::Config& config) {
  TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
  RTC_DCHECK_RUN_ON(worker_thread_);
  RegisterRateObserver();
  event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
      CreateRtcLogStreamConfig(config)));

  AudioReceiveStream* const new_stream = new AudioReceiveStream(
      clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
      module_process_thread_->process_thread(), config_.neteq_factory, config,
      config_.audio_state, event_log_);

  receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
  audio_receive_streams_.insert(new_stream);

  ConfigureSync(config.sync_group);

  // Pair the stream with the send stream that uses its local SSRC, if any.
  const auto send_it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
  if (send_it != audio_send_ssrcs_.end()) {
    new_stream->AssociateSendStream(send_it->second);
  }

  UpdateAggregateNetworkState();
  return new_stream;
}
846
DestroyAudioReceiveStream(webrtc::AudioReceiveStream * receive_stream)847 void Call::DestroyAudioReceiveStream(
848 webrtc::AudioReceiveStream* receive_stream) {
849 TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
850 RTC_DCHECK_RUN_ON(worker_thread_);
851 RTC_DCHECK(receive_stream != nullptr);
852 webrtc::internal::AudioReceiveStream* audio_receive_stream =
853 static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
854
855 const AudioReceiveStream::Config& config = audio_receive_stream->config();
856 uint32_t ssrc = config.rtp.remote_ssrc;
857 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
858 ->RemoveStream(ssrc);
859 audio_receive_streams_.erase(audio_receive_stream);
860 const std::string& sync_group = audio_receive_stream->config().sync_group;
861 const auto it = sync_stream_mapping_.find(sync_group);
862 if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
863 sync_stream_mapping_.erase(it);
864 ConfigureSync(sync_group);
865 }
866 receive_rtp_config_.erase(ssrc);
867
868 UpdateAggregateNetworkState();
869 delete audio_receive_stream;
870 }
871
872 // This method can be used for Call tests with external fec controller factory.
CreateVideoSendStream(webrtc::VideoSendStream::Config config,VideoEncoderConfig encoder_config,std::unique_ptr<FecController> fec_controller)873 webrtc::VideoSendStream* Call::CreateVideoSendStream(
874 webrtc::VideoSendStream::Config config,
875 VideoEncoderConfig encoder_config,
876 std::unique_ptr<FecController> fec_controller) {
877 TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream");
878 RTC_DCHECK_RUN_ON(worker_thread_);
879
880 RegisterRateObserver();
881
882 video_send_delay_stats_->AddSsrcs(config);
883 for (size_t ssrc_index = 0; ssrc_index < config.rtp.ssrcs.size();
884 ++ssrc_index) {
885 event_log_->Log(std::make_unique<RtcEventVideoSendStreamConfig>(
886 CreateRtcLogStreamConfig(config, ssrc_index)));
887 }
888
889 // TODO(mflodman): Base the start bitrate on a current bandwidth estimate, if
890 // the call has already started.
891 // Copy ssrcs from |config| since |config| is moved.
892 std::vector<uint32_t> ssrcs = config.rtp.ssrcs;
893
894 VideoSendStream* send_stream = new VideoSendStream(
895 clock_, num_cpu_cores_, module_process_thread_->process_thread(),
896 task_queue_factory_, call_stats_->AsRtcpRttStats(), transport_send_ptr_,
897 bitrate_allocator_.get(), video_send_delay_stats_.get(), event_log_,
898 std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
899 suspended_video_payload_states_, std::move(fec_controller));
900
901 for (uint32_t ssrc : ssrcs) {
902 RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
903 video_send_ssrcs_[ssrc] = send_stream;
904 }
905 video_send_streams_.insert(send_stream);
906 // Forward resources that were previously added to the call to the new stream.
907 for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
908 resource_forwarder->OnCreateVideoSendStream(send_stream);
909 }
910
911 UpdateAggregateNetworkState();
912
913 return send_stream;
914 }
915
CreateVideoSendStream(webrtc::VideoSendStream::Config config,VideoEncoderConfig encoder_config)916 webrtc::VideoSendStream* Call::CreateVideoSendStream(
917 webrtc::VideoSendStream::Config config,
918 VideoEncoderConfig encoder_config) {
919 if (config_.fec_controller_factory) {
920 RTC_LOG(LS_INFO) << "External FEC Controller will be used.";
921 }
922 std::unique_ptr<FecController> fec_controller =
923 config_.fec_controller_factory
924 ? config_.fec_controller_factory->CreateFecController()
925 : std::make_unique<FecControllerDefault>(clock_);
926 return CreateVideoSendStream(std::move(config), std::move(encoder_config),
927 std::move(fec_controller));
928 }
929
DestroyVideoSendStream(webrtc::VideoSendStream * send_stream)930 void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
931 TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream");
932 RTC_DCHECK(send_stream != nullptr);
933 RTC_DCHECK_RUN_ON(worker_thread_);
934
935 send_stream->Stop();
936
937 VideoSendStream* send_stream_impl = nullptr;
938
939 auto it = video_send_ssrcs_.begin();
940 while (it != video_send_ssrcs_.end()) {
941 if (it->second == static_cast<VideoSendStream*>(send_stream)) {
942 send_stream_impl = it->second;
943 video_send_ssrcs_.erase(it++);
944 } else {
945 ++it;
946 }
947 }
948 // Stop forwarding resources to the stream being destroyed.
949 for (const auto& resource_forwarder : adaptation_resource_forwarders_) {
950 resource_forwarder->OnDestroyVideoSendStream(send_stream_impl);
951 }
952 video_send_streams_.erase(send_stream_impl);
953
954 RTC_CHECK(send_stream_impl != nullptr);
955
956 VideoSendStream::RtpStateMap rtp_states;
957 VideoSendStream::RtpPayloadStateMap rtp_payload_states;
958 send_stream_impl->StopPermanentlyAndGetRtpStates(&rtp_states,
959 &rtp_payload_states);
960 for (const auto& kv : rtp_states) {
961 suspended_video_send_ssrcs_[kv.first] = kv.second;
962 }
963 for (const auto& kv : rtp_payload_states) {
964 suspended_video_payload_states_[kv.first] = kv.second;
965 }
966
967 UpdateAggregateNetworkState();
968 delete send_stream_impl;
969 }
970
CreateVideoReceiveStream(webrtc::VideoReceiveStream::Config configuration)971 webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
972 webrtc::VideoReceiveStream::Config configuration) {
973 TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
974 RTC_DCHECK_RUN_ON(worker_thread_);
975
976 receive_side_cc_.SetSendPeriodicFeedback(
977 SendPeriodicFeedback(configuration.rtp.extensions));
978
979 RegisterRateObserver();
980
981 TaskQueueBase* current = GetCurrentTaskQueueOrThread();
982 RTC_CHECK(current);
983 VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
984 task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
985 transport_send_ptr_->packet_router(), std::move(configuration),
986 module_process_thread_->process_thread(), call_stats_.get(), clock_,
987 new VCMTiming(clock_));
988
989 const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
990 if (config.rtp.rtx_ssrc) {
991 // We record identical config for the rtx stream as for the main
992 // stream. Since the transport_send_cc negotiation is per payload
993 // type, we may get an incorrect value for the rtx stream, but
994 // that is unlikely to matter in practice.
995 receive_rtp_config_.emplace(config.rtp.rtx_ssrc, ReceiveRtpConfig(config));
996 }
997 receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
998 video_receive_streams_.insert(receive_stream);
999 ConfigureSync(config.sync_group);
1000
1001 receive_stream->SignalNetworkState(video_network_state_);
1002 UpdateAggregateNetworkState();
1003 event_log_->Log(std::make_unique<RtcEventVideoReceiveStreamConfig>(
1004 CreateRtcLogStreamConfig(config)));
1005 return receive_stream;
1006 }
1007
DestroyVideoReceiveStream(webrtc::VideoReceiveStream * receive_stream)1008 void Call::DestroyVideoReceiveStream(
1009 webrtc::VideoReceiveStream* receive_stream) {
1010 TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
1011 RTC_DCHECK_RUN_ON(worker_thread_);
1012 RTC_DCHECK(receive_stream != nullptr);
1013 VideoReceiveStream2* receive_stream_impl =
1014 static_cast<VideoReceiveStream2*>(receive_stream);
1015 const VideoReceiveStream::Config& config = receive_stream_impl->config();
1016
1017 // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
1018 // separate SSRC there can be either one or two.
1019 receive_rtp_config_.erase(config.rtp.remote_ssrc);
1020 if (config.rtp.rtx_ssrc) {
1021 receive_rtp_config_.erase(config.rtp.rtx_ssrc);
1022 }
1023 video_receive_streams_.erase(receive_stream_impl);
1024 ConfigureSync(config.sync_group);
1025
1026 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
1027 ->RemoveStream(config.rtp.remote_ssrc);
1028
1029 UpdateAggregateNetworkState();
1030 delete receive_stream_impl;
1031 }
1032
// Creates a FlexFEC receive stream for |config|, with this Call acting as the
// receiver of recovered packets, and records the receive RTP config under
// |config.remote_ssrc|. The returned pointer is released by
// DestroyFlexfecReceiveStream(). Must be called on the worker thread.
FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
    const FlexfecReceiveStream::Config& config) {
  TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
  RTC_DCHECK_RUN_ON(worker_thread_);

  RecoveredPacketReceiver* const recovered_packet_receiver = this;

  // Unlike the video and audio receive streams, FlexfecReceiveStream implements
  // RtpPacketSinkInterface itself, and hence its constructor passes its |this|
  // pointer to video_receiver_controller_->CreateStream(). Calling the
  // constructor while on the worker thread ensures that we don't call
  // OnRtpPacket until the constructor is finished and the object is
  // in a valid state, since OnRtpPacket runs on the same thread.
  FlexfecReceiveStreamImpl* const new_stream = new FlexfecReceiveStreamImpl(
      clock_, &video_receiver_controller_, config, recovered_packet_receiver,
      call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());

  // The remote SSRC must not already have a receive config.
  RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
             receive_rtp_config_.end());
  receive_rtp_config_.emplace(config.remote_ssrc, ReceiveRtpConfig(config));

  // TODO(brandtr): Store config in RtcEventLog here.

  return new_stream;
}
1060
DestroyFlexfecReceiveStream(FlexfecReceiveStream * receive_stream)1061 void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
1062 TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
1063 RTC_DCHECK_RUN_ON(worker_thread_);
1064
1065 RTC_DCHECK(receive_stream != nullptr);
1066 const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
1067 uint32_t ssrc = config.remote_ssrc;
1068 receive_rtp_config_.erase(ssrc);
1069
1070 // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
1071 // destroyed.
1072 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
1073 ->RemoveStream(ssrc);
1074
1075 delete receive_stream;
1076 }
1077
AddAdaptationResource(rtc::scoped_refptr<Resource> resource)1078 void Call::AddAdaptationResource(rtc::scoped_refptr<Resource> resource) {
1079 RTC_DCHECK_RUN_ON(worker_thread_);
1080 adaptation_resource_forwarders_.push_back(
1081 std::make_unique<ResourceVideoSendStreamForwarder>(resource));
1082 const auto& resource_forwarder = adaptation_resource_forwarders_.back();
1083 for (VideoSendStream* send_stream : video_send_streams_) {
1084 resource_forwarder->OnCreateVideoSendStream(send_stream);
1085 }
1086 }
1087
GetTransportControllerSend()1088 RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
1089 return transport_send_ptr_;
1090 }
1091
GetStats() const1092 Call::Stats Call::GetStats() const {
1093 RTC_DCHECK_RUN_ON(worker_thread_);
1094
1095 Stats stats;
1096 // TODO(srte): It is unclear if we only want to report queues if network is
1097 // available.
1098 stats.pacer_delay_ms =
1099 aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0;
1100
1101 stats.rtt_ms = call_stats_->LastProcessedRtt();
1102
1103 // Fetch available send/receive bitrates.
1104 std::vector<unsigned int> ssrcs;
1105 uint32_t recv_bandwidth = 0;
1106 receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
1107 &ssrcs, &recv_bandwidth);
1108 stats.recv_bandwidth_bps = recv_bandwidth;
1109 stats.send_bandwidth_bps = last_bandwidth_bps_;
1110 stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
1111
1112 return stats;
1113 }
1114
SignalChannelNetworkState(MediaType media,NetworkState state)1115 void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
1116 RTC_DCHECK_RUN_ON(worker_thread_);
1117 switch (media) {
1118 case MediaType::AUDIO:
1119 audio_network_state_ = state;
1120 break;
1121 case MediaType::VIDEO:
1122 video_network_state_ = state;
1123 break;
1124 case MediaType::ANY:
1125 case MediaType::DATA:
1126 RTC_NOTREACHED();
1127 break;
1128 }
1129
1130 UpdateAggregateNetworkState();
1131 for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
1132 video_receive_stream->SignalNetworkState(video_network_state_);
1133 }
1134 }
1135
OnAudioTransportOverheadChanged(int transport_overhead_per_packet)1136 void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
1137 RTC_DCHECK_RUN_ON(worker_thread_);
1138 for (auto& kv : audio_send_ssrcs_) {
1139 kv.second->SetTransportOverhead(transport_overhead_per_packet);
1140 }
1141 }
1142
UpdateAggregateNetworkState()1143 void Call::UpdateAggregateNetworkState() {
1144 RTC_DCHECK_RUN_ON(worker_thread_);
1145
1146 bool have_audio =
1147 !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty();
1148 bool have_video =
1149 !video_send_ssrcs_.empty() || !video_receive_streams_.empty();
1150
1151 bool aggregate_network_up =
1152 ((have_video && video_network_state_ == kNetworkUp) ||
1153 (have_audio && audio_network_state_ == kNetworkUp));
1154
1155 if (aggregate_network_up != aggregate_network_up_) {
1156 RTC_LOG(LS_INFO)
1157 << "UpdateAggregateNetworkState: aggregate_state change to "
1158 << (aggregate_network_up ? "up" : "down");
1159 } else {
1160 RTC_LOG(LS_VERBOSE)
1161 << "UpdateAggregateNetworkState: aggregate_state remains at "
1162 << (aggregate_network_up ? "up" : "down");
1163 }
1164 aggregate_network_up_ = aggregate_network_up;
1165
1166 transport_send_ptr_->OnNetworkAvailability(aggregate_network_up);
1167 }
1168
OnSentPacket(const rtc::SentPacket & sent_packet)1169 void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
1170 video_send_delay_stats_->OnSentPacket(sent_packet.packet_id,
1171 clock_->TimeInMilliseconds());
1172 transport_send_ptr_->OnSentPacket(sent_packet);
1173 }
1174
OnStartRateUpdate(DataRate start_rate)1175 void Call::OnStartRateUpdate(DataRate start_rate) {
1176 RTC_DCHECK_RUN_ON(send_transport_queue());
1177 bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>());
1178 }
1179
OnTargetTransferRate(TargetTransferRate msg)1180 void Call::OnTargetTransferRate(TargetTransferRate msg) {
1181 RTC_DCHECK_RUN_ON(send_transport_queue());
1182
1183 uint32_t target_bitrate_bps = msg.target_rate.bps();
1184 // For controlling the rate of feedback messages.
1185 receive_side_cc_.OnBitrateChanged(target_bitrate_bps);
1186 bitrate_allocator_->OnNetworkEstimateChanged(msg);
1187
1188 worker_thread_->PostTask(
1189 ToQueuedTask(task_safety_, [this, target_bitrate_bps]() {
1190 RTC_DCHECK_RUN_ON(worker_thread_);
1191 last_bandwidth_bps_ = target_bitrate_bps;
1192
1193 // Ignore updates if bitrate is zero (the aggregate network state is
1194 // down) or if we're not sending video.
1195 if (target_bitrate_bps == 0 || video_send_streams_.empty()) {
1196 estimated_send_bitrate_kbps_counter_.ProcessAndPause();
1197 pacer_bitrate_kbps_counter_.ProcessAndPause();
1198 return;
1199 }
1200
1201 estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
1202 // Pacer bitrate may be higher than bitrate estimate if enforcing min
1203 // bitrate.
1204 uint32_t pacer_bitrate_bps =
1205 std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
1206 pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
1207 }));
1208 }
1209
OnAllocationLimitsChanged(BitrateAllocationLimits limits)1210 void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
1211 RTC_DCHECK_RUN_ON(send_transport_queue());
1212
1213 transport_send_ptr_->SetAllocatedSendBitrateLimits(limits);
1214
1215 worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() {
1216 RTC_DCHECK_RUN_ON(worker_thread_);
1217 min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
1218 configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
1219 }));
1220 }
1221
ConfigureSync(const std::string & sync_group)1222 void Call::ConfigureSync(const std::string& sync_group) {
1223 // Set sync only if there was no previous one.
1224 if (sync_group.empty())
1225 return;
1226
1227 AudioReceiveStream* sync_audio_stream = nullptr;
1228 // Find existing audio stream.
1229 const auto it = sync_stream_mapping_.find(sync_group);
1230 if (it != sync_stream_mapping_.end()) {
1231 sync_audio_stream = it->second;
1232 } else {
1233 // No configured audio stream, see if we can find one.
1234 for (AudioReceiveStream* stream : audio_receive_streams_) {
1235 if (stream->config().sync_group == sync_group) {
1236 if (sync_audio_stream != nullptr) {
1237 RTC_LOG(LS_WARNING)
1238 << "Attempting to sync more than one audio stream "
1239 "within the same sync group. This is not "
1240 "supported in the current implementation.";
1241 break;
1242 }
1243 sync_audio_stream = stream;
1244 }
1245 }
1246 }
1247 if (sync_audio_stream)
1248 sync_stream_mapping_[sync_group] = sync_audio_stream;
1249 size_t num_synced_streams = 0;
1250 for (VideoReceiveStream2* video_stream : video_receive_streams_) {
1251 if (video_stream->config().sync_group != sync_group)
1252 continue;
1253 ++num_synced_streams;
1254 if (num_synced_streams > 1) {
1255 // TODO(pbos): Support synchronizing more than one A/V pair.
1256 // https://code.google.com/p/webrtc/issues/detail?id=4762
1257 RTC_LOG(LS_WARNING)
1258 << "Attempting to sync more than one audio/video pair "
1259 "within the same sync group. This is not supported in "
1260 "the current implementation.";
1261 }
1262 // Only sync the first A/V pair within this sync group.
1263 if (num_synced_streams == 1) {
1264 // sync_audio_stream may be null and that's ok.
1265 video_stream->SetSync(sync_audio_stream);
1266 } else {
1267 video_stream->SetSync(nullptr);
1268 }
1269 }
1270 }
1271
DeliverRtcp(MediaType media_type,const uint8_t * packet,size_t length)1272 PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type,
1273 const uint8_t* packet,
1274 size_t length) {
1275 TRACE_EVENT0("webrtc", "Call::DeliverRtcp");
1276 // TODO(pbos): Make sure it's a valid packet.
1277 // Return DELIVERY_UNKNOWN_SSRC if it can be determined that
1278 // there's no receiver of the packet.
1279 if (received_bytes_per_second_counter_.HasSample()) {
1280 // First RTP packet has been received.
1281 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1282 received_rtcp_bytes_per_second_counter_.Add(static_cast<int>(length));
1283 }
1284 bool rtcp_delivered = false;
1285 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1286 for (VideoReceiveStream2* stream : video_receive_streams_) {
1287 if (stream->DeliverRtcp(packet, length))
1288 rtcp_delivered = true;
1289 }
1290 }
1291 if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
1292 for (AudioReceiveStream* stream : audio_receive_streams_) {
1293 stream->DeliverRtcp(packet, length);
1294 rtcp_delivered = true;
1295 }
1296 }
1297 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1298 for (VideoSendStream* stream : video_send_streams_) {
1299 stream->DeliverRtcp(packet, length);
1300 rtcp_delivered = true;
1301 }
1302 }
1303 if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
1304 for (auto& kv : audio_send_ssrcs_) {
1305 kv.second->DeliverRtcp(packet, length);
1306 rtcp_delivered = true;
1307 }
1308 }
1309
1310 if (rtcp_delivered) {
1311 event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
1312 rtc::MakeArrayView(packet, length)));
1313 }
1314
1315 return rtcp_delivered ? DELIVERY_OK : DELIVERY_PACKET_ERROR;
1316 }
1317
DeliverRtp(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)1318 PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
1319 rtc::CopyOnWriteBuffer packet,
1320 int64_t packet_time_us) {
1321 TRACE_EVENT0("webrtc", "Call::DeliverRtp");
1322
1323 RtpPacketReceived parsed_packet;
1324 if (!parsed_packet.Parse(std::move(packet)))
1325 return DELIVERY_PACKET_ERROR;
1326
1327 if (packet_time_us != -1) {
1328 if (receive_time_calculator_) {
1329 // Repair packet_time_us for clock resets by comparing a new read of
1330 // the same clock (TimeUTCMicros) to a monotonic clock reading.
1331 packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
1332 packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
1333 }
1334 parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000);
1335 } else {
1336 parsed_packet.set_arrival_time_ms(clock_->TimeInMilliseconds());
1337 }
1338
1339 // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
1340 // These are empty (zero length payload) RTP packets with an unsignaled
1341 // payload type.
1342 const bool is_keep_alive_packet = parsed_packet.payload_size() == 0;
1343
1344 RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
1345 is_keep_alive_packet);
1346
1347 auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
1348 if (it == receive_rtp_config_.end()) {
1349 RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
1350 << parsed_packet.Ssrc();
1351 // Destruction of the receive stream, including deregistering from the
1352 // RtpDemuxer, is not protected by the |worker_thread_|.
1353 // But deregistering in the |receive_rtp_config_| map is. So by not passing
1354 // the packet on to demuxing in this case, we prevent incoming packets to be
1355 // passed on via the demuxer to a receive stream which is being torned down.
1356 return DELIVERY_UNKNOWN_SSRC;
1357 }
1358
1359 parsed_packet.IdentifyExtensions(it->second.extensions);
1360
1361 NotifyBweOfReceivedPacket(parsed_packet, media_type);
1362
1363 // RateCounters expect input parameter as int, save it as int,
1364 // instead of converting each time it is passed to RateCounter::Add below.
1365 int length = static_cast<int>(parsed_packet.size());
1366 if (media_type == MediaType::AUDIO) {
1367 if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) {
1368 received_bytes_per_second_counter_.Add(length);
1369 received_audio_bytes_per_second_counter_.Add(length);
1370 event_log_->Log(
1371 std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
1372 const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
1373 if (!first_received_rtp_audio_ms_) {
1374 first_received_rtp_audio_ms_.emplace(arrival_time_ms);
1375 }
1376 last_received_rtp_audio_ms_.emplace(arrival_time_ms);
1377 return DELIVERY_OK;
1378 }
1379 } else if (media_type == MediaType::VIDEO) {
1380 parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
1381 if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
1382 received_bytes_per_second_counter_.Add(length);
1383 received_video_bytes_per_second_counter_.Add(length);
1384 event_log_->Log(
1385 std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
1386 const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
1387 if (!first_received_rtp_video_ms_) {
1388 first_received_rtp_video_ms_.emplace(arrival_time_ms);
1389 }
1390 last_received_rtp_video_ms_.emplace(arrival_time_ms);
1391 return DELIVERY_OK;
1392 }
1393 }
1394 return DELIVERY_UNKNOWN_SSRC;
1395 }
1396
DeliverPacket(MediaType media_type,rtc::CopyOnWriteBuffer packet,int64_t packet_time_us)1397 PacketReceiver::DeliveryStatus Call::DeliverPacket(
1398 MediaType media_type,
1399 rtc::CopyOnWriteBuffer packet,
1400 int64_t packet_time_us) {
1401 RTC_DCHECK_RUN_ON(worker_thread_);
1402
1403 if (IsRtcp(packet.cdata(), packet.size()))
1404 return DeliverRtcp(media_type, packet.cdata(), packet.size());
1405
1406 return DeliverRtp(media_type, std::move(packet), packet_time_us);
1407 }
1408
OnRecoveredPacket(const uint8_t * packet,size_t length)1409 void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
1410 RTC_DCHECK_RUN_ON(worker_thread_);
1411 RtpPacketReceived parsed_packet;
1412 if (!parsed_packet.Parse(packet, length))
1413 return;
1414
1415 parsed_packet.set_recovered(true);
1416
1417 auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
1418 if (it == receive_rtp_config_.end()) {
1419 RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
1420 << parsed_packet.Ssrc();
1421 // Destruction of the receive stream, including deregistering from the
1422 // RtpDemuxer, is not protected by the |worker_thread_|.
1423 // But deregistering in the |receive_rtp_config_| map is.
1424 // So by not passing the packet on to demuxing in this case, we prevent
1425 // incoming packets to be passed on via the demuxer to a receive stream
1426 // which is being torn down.
1427 return;
1428 }
1429 parsed_packet.IdentifyExtensions(it->second.extensions);
1430
1431 // TODO(brandtr): Update here when we support protecting audio packets too.
1432 parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
1433 video_receiver_controller_.OnRtpPacket(parsed_packet);
1434 }
1435
NotifyBweOfReceivedPacket(const RtpPacketReceived & packet,MediaType media_type)1436 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
1437 MediaType media_type) {
1438 auto it = receive_rtp_config_.find(packet.Ssrc());
1439 bool use_send_side_bwe =
1440 (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe;
1441
1442 RTPHeader header;
1443 packet.GetHeader(&header);
1444
1445 ReceivedPacket packet_msg;
1446 packet_msg.size = DataSize::Bytes(packet.payload_size());
1447 packet_msg.receive_time = Timestamp::Millis(packet.arrival_time_ms());
1448 if (header.extension.hasAbsoluteSendTime) {
1449 packet_msg.send_time = header.extension.GetAbsoluteSendTimestamp();
1450 }
1451 transport_send_ptr_->OnReceivedPacket(packet_msg);
1452
1453 if (!use_send_side_bwe && header.extension.hasTransportSequenceNumber) {
1454 // Inconsistent configuration of send side BWE. Do nothing.
1455 // TODO(nisse): Without this check, we may produce RTCP feedback
1456 // packets even when not negotiated. But it would be cleaner to
1457 // move the check down to RTCPSender::SendFeedbackPacket, which
1458 // would also help the PacketRouter to select an appropriate rtp
1459 // module in the case that some, but not all, have RTCP feedback
1460 // enabled.
1461 return;
1462 }
1463 // For audio, we only support send side BWE.
1464 if (media_type == MediaType::VIDEO ||
1465 (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) {
1466 receive_side_cc_.OnReceivedPacket(
1467 packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(),
1468 header);
1469 }
1470 }
1471
1472 } // namespace internal
1473
1474 } // namespace webrtc
1475