1 /*
2  *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/packet_router.h"
12 
13 #include <algorithm>
14 #include <cstdint>
15 #include <limits>
16 #include <memory>
17 #include <utility>
18 
19 #include "absl/types/optional.h"
20 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
21 #include "modules/rtp_rtcp/source/rtcp_packet.h"
22 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
23 #include "modules/rtp_rtcp/source/rtp_rtcp_interface.h"
24 #include "rtc_base/checks.h"
25 #include "rtc_base/logging.h"
26 #include "rtc_base/time_utils.h"
27 #include "rtc_base/trace_event.h"
28 
29 namespace webrtc {
namespace {

// Minimum spacing, in milliseconds, between REMB updates forwarded by
// PacketRouter::OnReceiveBitrateChanged(). The same window is consulted by
// PacketRouter::SetMaxDesiredReceiveBitrate() to decide whether a fresh REMB
// is needed after the cap changes.
constexpr int kRembSendIntervalMs{200};

}  // namespace
35 
PacketRouter()36 PacketRouter::PacketRouter() : PacketRouter(0) {}
37 
PacketRouter(uint16_t start_transport_seq)38 PacketRouter::PacketRouter(uint16_t start_transport_seq)
39     : last_send_module_(nullptr),
40       last_remb_time_ms_(rtc::TimeMillis()),
41       last_send_bitrate_bps_(0),
42       bitrate_bps_(0),
43       max_bitrate_bps_(std::numeric_limits<decltype(max_bitrate_bps_)>::max()),
44       active_remb_module_(nullptr),
45       transport_seq_(start_transport_seq) {}
46 
~PacketRouter()47 PacketRouter::~PacketRouter() {
48   RTC_DCHECK(send_modules_map_.empty());
49   RTC_DCHECK(send_modules_list_.empty());
50   RTC_DCHECK(rtcp_feedback_senders_.empty());
51   RTC_DCHECK(sender_remb_candidates_.empty());
52   RTC_DCHECK(receiver_remb_candidates_.empty());
53   RTC_DCHECK(active_remb_module_ == nullptr);
54 }
55 
AddSendRtpModule(RtpRtcpInterface * rtp_module,bool remb_candidate)56 void PacketRouter::AddSendRtpModule(RtpRtcpInterface* rtp_module,
57                                     bool remb_candidate) {
58   MutexLock lock(&modules_mutex_);
59 
60   AddSendRtpModuleToMap(rtp_module, rtp_module->SSRC());
61   if (absl::optional<uint32_t> rtx_ssrc = rtp_module->RtxSsrc()) {
62     AddSendRtpModuleToMap(rtp_module, *rtx_ssrc);
63   }
64   if (absl::optional<uint32_t> flexfec_ssrc = rtp_module->FlexfecSsrc()) {
65     AddSendRtpModuleToMap(rtp_module, *flexfec_ssrc);
66   }
67 
68   if (rtp_module->SupportsRtxPayloadPadding()) {
69     last_send_module_ = rtp_module;
70   }
71 
72   if (remb_candidate) {
73     AddRembModuleCandidate(rtp_module, /* media_sender = */ true);
74   }
75 }
76 
AddSendRtpModuleToMap(RtpRtcpInterface * rtp_module,uint32_t ssrc)77 void PacketRouter::AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module,
78                                          uint32_t ssrc) {
79   RTC_DCHECK(send_modules_map_.find(ssrc) == send_modules_map_.end());
80   // Always keep the audio modules at the back of the list, so that when we
81   // iterate over the modules in order to find one that can send padding we
82   // will prioritize video. This is important to make sure they are counted
83   // into the bandwidth estimate properly.
84   if (rtp_module->IsAudioConfigured()) {
85     send_modules_list_.push_back(rtp_module);
86   } else {
87     send_modules_list_.push_front(rtp_module);
88   }
89   send_modules_map_[ssrc] = rtp_module;
90 }
91 
RemoveSendRtpModuleFromMap(uint32_t ssrc)92 void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) {
93   auto kv = send_modules_map_.find(ssrc);
94   RTC_DCHECK(kv != send_modules_map_.end());
95   send_modules_list_.remove(kv->second);
96   send_modules_map_.erase(kv);
97 }
98 
RemoveSendRtpModule(RtpRtcpInterface * rtp_module)99 void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) {
100   MutexLock lock(&modules_mutex_);
101   MaybeRemoveRembModuleCandidate(rtp_module, /* media_sender = */ true);
102 
103   RemoveSendRtpModuleFromMap(rtp_module->SSRC());
104   if (absl::optional<uint32_t> rtx_ssrc = rtp_module->RtxSsrc()) {
105     RemoveSendRtpModuleFromMap(*rtx_ssrc);
106   }
107   if (absl::optional<uint32_t> flexfec_ssrc = rtp_module->FlexfecSsrc()) {
108     RemoveSendRtpModuleFromMap(*flexfec_ssrc);
109   }
110 
111   if (last_send_module_ == rtp_module) {
112     last_send_module_ = nullptr;
113   }
114 }
115 
AddReceiveRtpModule(RtcpFeedbackSenderInterface * rtcp_sender,bool remb_candidate)116 void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
117                                        bool remb_candidate) {
118   MutexLock lock(&modules_mutex_);
119   RTC_DCHECK(std::find(rtcp_feedback_senders_.begin(),
120                        rtcp_feedback_senders_.end(),
121                        rtcp_sender) == rtcp_feedback_senders_.end());
122 
123   rtcp_feedback_senders_.push_back(rtcp_sender);
124 
125   if (remb_candidate) {
126     AddRembModuleCandidate(rtcp_sender, /* media_sender = */ false);
127   }
128 }
129 
RemoveReceiveRtpModule(RtcpFeedbackSenderInterface * rtcp_sender)130 void PacketRouter::RemoveReceiveRtpModule(
131     RtcpFeedbackSenderInterface* rtcp_sender) {
132   MutexLock lock(&modules_mutex_);
133   MaybeRemoveRembModuleCandidate(rtcp_sender, /* media_sender = */ false);
134   auto it = std::find(rtcp_feedback_senders_.begin(),
135                       rtcp_feedback_senders_.end(), rtcp_sender);
136   RTC_DCHECK(it != rtcp_feedback_senders_.end());
137   rtcp_feedback_senders_.erase(it);
138 }
139 
SendPacket(std::unique_ptr<RtpPacketToSend> packet,const PacedPacketInfo & cluster_info)140 void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
141                               const PacedPacketInfo& cluster_info) {
142   TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"), "PacketRouter::SendPacket",
143                "sequence_number", packet->SequenceNumber(), "rtp_timestamp",
144                packet->Timestamp());
145 
146   MutexLock lock(&modules_mutex_);
147   // With the new pacer code path, transport sequence numbers are only set here,
148   // on the pacer thread. Therefore we don't need atomics/synchronization.
149   if (packet->HasExtension<TransportSequenceNumber>()) {
150     packet->SetExtension<TransportSequenceNumber>((++transport_seq_) & 0xFFFF);
151   }
152 
153   uint32_t ssrc = packet->Ssrc();
154   auto kv = send_modules_map_.find(ssrc);
155   if (kv == send_modules_map_.end()) {
156     RTC_LOG(LS_WARNING)
157         << "Failed to send packet, matching RTP module not found "
158            "or transport error. SSRC = "
159         << packet->Ssrc() << ", sequence number " << packet->SequenceNumber();
160     return;
161   }
162 
163   RtpRtcpInterface* rtp_module = kv->second;
164   if (!rtp_module->TrySendPacket(packet.get(), cluster_info)) {
165     RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
166     return;
167   }
168 
169   if (rtp_module->SupportsRtxPayloadPadding()) {
170     // This is now the last module to send media, and has the desired
171     // properties needed for payload based padding. Cache it for later use.
172     last_send_module_ = rtp_module;
173   }
174 
175   for (auto& packet : rtp_module->FetchFecPackets()) {
176     pending_fec_packets_.push_back(std::move(packet));
177   }
178 }
179 
FetchFec()180 std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::FetchFec() {
181   MutexLock lock(&modules_mutex_);
182   std::vector<std::unique_ptr<RtpPacketToSend>> fec_packets =
183       std::move(pending_fec_packets_);
184   pending_fec_packets_.clear();
185   return fec_packets;
186 }
187 
GeneratePadding(DataSize size)188 std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
189     DataSize size) {
190   TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("webrtc"),
191                "PacketRouter::GeneratePadding", "bytes", size.bytes());
192 
193   MutexLock lock(&modules_mutex_);
194   // First try on the last rtp module to have sent media. This increases the
195   // the chance that any payload based padding will be useful as it will be
196   // somewhat distributed over modules according the packet rate, even if it
197   // will be more skewed towards the highest bitrate stream. At the very least
198   // this prevents sending payload padding on a disabled stream where it's
199   // guaranteed not to be useful.
200   std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
201   if (last_send_module_ != nullptr &&
202       last_send_module_->SupportsRtxPayloadPadding()) {
203     padding_packets = last_send_module_->GeneratePadding(size.bytes());
204   }
205 
206   if (padding_packets.empty()) {
207     // Iterate over all modules send module. Video modules will be at the front
208     // and so will be prioritized. This is important since audio packets may not
209     // be taken into account by the bandwidth estimator, e.g. in FF.
210     for (RtpRtcpInterface* rtp_module : send_modules_list_) {
211       if (rtp_module->SupportsPadding()) {
212         padding_packets = rtp_module->GeneratePadding(size.bytes());
213         if (!padding_packets.empty()) {
214           last_send_module_ = rtp_module;
215           break;
216         }
217       }
218     }
219   }
220 
221 #if RTC_TRACE_EVENTS_ENABLED
222   for (auto& packet : padding_packets) {
223     TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
224                  "PacketRouter::GeneratePadding::Loop", "sequence_number",
225                  packet->SequenceNumber(), "rtp_timestamp",
226                  packet->Timestamp());
227   }
228 #endif
229 
230   return padding_packets;
231 }
232 
CurrentTransportSequenceNumber() const233 uint16_t PacketRouter::CurrentTransportSequenceNumber() const {
234   MutexLock lock(&modules_mutex_);
235   return transport_seq_ & 0xFFFF;
236 }
237 
OnReceiveBitrateChanged(const std::vector<uint32_t> & ssrcs,uint32_t bitrate_bps)238 void PacketRouter::OnReceiveBitrateChanged(const std::vector<uint32_t>& ssrcs,
239                                            uint32_t bitrate_bps) {
240   // % threshold for if we should send a new REMB asap.
241   const int64_t kSendThresholdPercent = 97;
242   // TODO(danilchap): Remove receive_bitrate_bps variable and the cast
243   // when OnReceiveBitrateChanged takes bitrate as int64_t.
244   int64_t receive_bitrate_bps = static_cast<int64_t>(bitrate_bps);
245 
246   int64_t now_ms = rtc::TimeMillis();
247   {
248     MutexLock lock(&remb_mutex_);
249 
250     // If we already have an estimate, check if the new total estimate is below
251     // kSendThresholdPercent of the previous estimate.
252     if (last_send_bitrate_bps_ > 0) {
253       int64_t new_remb_bitrate_bps =
254           last_send_bitrate_bps_ - bitrate_bps_ + receive_bitrate_bps;
255 
256       if (new_remb_bitrate_bps <
257           kSendThresholdPercent * last_send_bitrate_bps_ / 100) {
258         // The new bitrate estimate is less than kSendThresholdPercent % of the
259         // last report. Send a REMB asap.
260         last_remb_time_ms_ = now_ms - kRembSendIntervalMs;
261       }
262     }
263     bitrate_bps_ = receive_bitrate_bps;
264 
265     if (now_ms - last_remb_time_ms_ < kRembSendIntervalMs) {
266       return;
267     }
268     // NOTE: Updated if we intend to send the data; we might not have
269     // a module to actually send it.
270     last_remb_time_ms_ = now_ms;
271     last_send_bitrate_bps_ = receive_bitrate_bps;
272     // Cap the value to send in remb with configured value.
273     receive_bitrate_bps = std::min(receive_bitrate_bps, max_bitrate_bps_);
274   }
275   SendRemb(receive_bitrate_bps, ssrcs);
276 }
277 
SetMaxDesiredReceiveBitrate(int64_t bitrate_bps)278 void PacketRouter::SetMaxDesiredReceiveBitrate(int64_t bitrate_bps) {
279   RTC_DCHECK_GE(bitrate_bps, 0);
280   {
281     MutexLock lock(&remb_mutex_);
282     max_bitrate_bps_ = bitrate_bps;
283     if (rtc::TimeMillis() - last_remb_time_ms_ < kRembSendIntervalMs &&
284         last_send_bitrate_bps_ > 0 &&
285         last_send_bitrate_bps_ <= max_bitrate_bps_) {
286       // Recent measured bitrate is already below the cap.
287       return;
288     }
289   }
290   SendRemb(bitrate_bps, /*ssrcs=*/{});
291 }
292 
SendRemb(int64_t bitrate_bps,const std::vector<uint32_t> & ssrcs)293 bool PacketRouter::SendRemb(int64_t bitrate_bps,
294                             const std::vector<uint32_t>& ssrcs) {
295   MutexLock lock(&modules_mutex_);
296 
297   if (!active_remb_module_) {
298     return false;
299   }
300 
301   // The Add* and Remove* methods above ensure that REMB is disabled on all
302   // other modules, because otherwise, they will send REMB with stale info.
303   active_remb_module_->SetRemb(bitrate_bps, ssrcs);
304 
305   return true;
306 }
307 
SendCombinedRtcpPacket(std::vector<std::unique_ptr<rtcp::RtcpPacket>> packets)308 bool PacketRouter::SendCombinedRtcpPacket(
309     std::vector<std::unique_ptr<rtcp::RtcpPacket>> packets) {
310   MutexLock lock(&modules_mutex_);
311 
312   // Prefer send modules.
313   for (RtpRtcpInterface* rtp_module : send_modules_list_) {
314     if (rtp_module->RTCP() == RtcpMode::kOff) {
315       continue;
316     }
317     rtp_module->SendCombinedRtcpPacket(std::move(packets));
318     return true;
319   }
320 
321   if (rtcp_feedback_senders_.empty()) {
322     return false;
323   }
324   auto* rtcp_sender = rtcp_feedback_senders_[0];
325   rtcp_sender->SendCombinedRtcpPacket(std::move(packets));
326   return true;
327 }
328 
AddRembModuleCandidate(RtcpFeedbackSenderInterface * candidate_module,bool media_sender)329 void PacketRouter::AddRembModuleCandidate(
330     RtcpFeedbackSenderInterface* candidate_module,
331     bool media_sender) {
332   RTC_DCHECK(candidate_module);
333   std::vector<RtcpFeedbackSenderInterface*>& candidates =
334       media_sender ? sender_remb_candidates_ : receiver_remb_candidates_;
335   RTC_DCHECK(std::find(candidates.cbegin(), candidates.cend(),
336                        candidate_module) == candidates.cend());
337   candidates.push_back(candidate_module);
338   DetermineActiveRembModule();
339 }
340 
MaybeRemoveRembModuleCandidate(RtcpFeedbackSenderInterface * candidate_module,bool media_sender)341 void PacketRouter::MaybeRemoveRembModuleCandidate(
342     RtcpFeedbackSenderInterface* candidate_module,
343     bool media_sender) {
344   RTC_DCHECK(candidate_module);
345   std::vector<RtcpFeedbackSenderInterface*>& candidates =
346       media_sender ? sender_remb_candidates_ : receiver_remb_candidates_;
347   auto it = std::find(candidates.begin(), candidates.end(), candidate_module);
348 
349   if (it == candidates.end()) {
350     return;  // Function called due to removal of non-REMB-candidate module.
351   }
352 
353   if (*it == active_remb_module_) {
354     UnsetActiveRembModule();
355   }
356   candidates.erase(it);
357   DetermineActiveRembModule();
358 }
359 
UnsetActiveRembModule()360 void PacketRouter::UnsetActiveRembModule() {
361   RTC_CHECK(active_remb_module_);
362   active_remb_module_->UnsetRemb();
363   active_remb_module_ = nullptr;
364 }
365 
DetermineActiveRembModule()366 void PacketRouter::DetermineActiveRembModule() {
367   // Sender modules take precedence over receiver modules, because SRs (sender
368   // reports) are sent more frequently than RR (receiver reports).
369   // When adding the first sender module, we should change the active REMB
370   // module to be that. Otherwise, we remain with the current active module.
371 
372   RtcpFeedbackSenderInterface* new_active_remb_module;
373 
374   if (!sender_remb_candidates_.empty()) {
375     new_active_remb_module = sender_remb_candidates_.front();
376   } else if (!receiver_remb_candidates_.empty()) {
377     new_active_remb_module = receiver_remb_candidates_.front();
378   } else {
379     new_active_remb_module = nullptr;
380   }
381 
382   if (new_active_remb_module != active_remb_module_ && active_remb_module_) {
383     UnsetActiveRembModule();
384   }
385 
386   active_remb_module_ = new_active_remb_module;
387 }
388 
389 }  // namespace webrtc
390