1 /*
2  *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
12 
13 #include <utility>
14 
15 #include "absl/algorithm/container.h"
16 #include "absl/memory/memory.h"
17 #include "api/call/transport.h"
18 #include "api/video/video_bitrate_allocation.h"
19 #include "modules/rtp_rtcp/include/receive_statistics.h"
20 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
21 #include "modules/rtp_rtcp/source/rtcp_packet.h"
22 #include "modules/rtp_rtcp/source/rtcp_packet/bye.h"
23 #include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
24 #include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
25 #include "modules/rtp_rtcp/source/rtcp_packet/fir.h"
26 #include "modules/rtp_rtcp/source/rtcp_packet/nack.h"
27 #include "modules/rtp_rtcp/source/rtcp_packet/pli.h"
28 #include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
29 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
30 #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
31 #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
32 #include "modules/rtp_rtcp/source/time_util.h"
33 #include "rtc_base/checks.h"
34 #include "rtc_base/logging.h"
35 #include "rtc_base/task_utils/repeating_task.h"
36 #include "rtc_base/task_utils/to_queued_task.h"
37 #include "rtc_base/time_utils.h"
38 
39 namespace webrtc {
40 namespace {
41 
// Timing information captured from the most recently received RTCP
// SenderReport of a remote sender.
struct SenderReportTimes {
  // Local time, in microseconds, at which the sender report was received.
  int64_t local_received_time_us;
  // NTP timestamp carried in the received sender report.
  NtpTime remote_sent_time;
};
46 
47 }  // namespace
48 
// State tracked per remote sender; instances are stored in |remote_senders_|
// and looked up by the remote ssrc.
struct RtcpTransceiverImpl::RemoteSenderState {
  // Sequence number placed into outgoing FIR requests for this sender;
  // SendFullIntraRequest() advances it for new requests.
  uint8_t fir_sequence_number = 0;
  // Timing of the last SenderReport received from this sender, if any.
  absl::optional<SenderReportTimes> last_received_sender_report;
  // Observers notified about rtcp packets received from this sender.
  std::vector<MediaReceiverRtcpObserver*> observers;
};
54 
55 // Helper to put several RTCP packets into lower layer datagram composing
56 // Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2.
57 // TODO(danilchap): When in compound mode and packets are so many that several
58 // compound RTCP packets need to be generated, ensure each packet is compound.
59 class RtcpTransceiverImpl::PacketSender {
60  public:
PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,size_t max_packet_size)61   PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,
62                size_t max_packet_size)
63       : callback_(callback), max_packet_size_(max_packet_size) {
64     RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE);
65   }
~PacketSender()66   ~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; }
67 
68   // Appends a packet to pending compound packet.
69   // Sends rtcp compound packet if buffer was already full and resets buffer.
AppendPacket(const rtcp::RtcpPacket & packet)70   void AppendPacket(const rtcp::RtcpPacket& packet) {
71     packet.Create(buffer_, &index_, max_packet_size_, callback_);
72   }
73 
74   // Sends pending rtcp compound packet.
Send()75   void Send() {
76     if (index_ > 0) {
77       callback_(rtc::ArrayView<const uint8_t>(buffer_, index_));
78       index_ = 0;
79     }
80   }
81 
IsEmpty() const82   bool IsEmpty() const { return index_ == 0; }
83 
84  private:
85   const rtcp::RtcpPacket::PacketReadyCallback callback_;
86   const size_t max_packet_size_;
87   size_t index_ = 0;
88   uint8_t buffer_[IP_PACKET_SIZE];
89 };
90 
RtcpTransceiverImpl(const RtcpTransceiverConfig & config)91 RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
92     : config_(config), ready_to_send_(config.initial_ready_to_send) {
93   RTC_CHECK(config_.Validate());
94   if (ready_to_send_ && config_.schedule_periodic_compound_packets) {
95     config_.task_queue->PostTask(ToQueuedTask([this] {
96       SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
97     }));
98   }
99 }
100 
// Defaulted: no explicit cleanup is performed here; members are destroyed by
// their own destructors.
RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
102 
AddMediaReceiverRtcpObserver(uint32_t remote_ssrc,MediaReceiverRtcpObserver * observer)103 void RtcpTransceiverImpl::AddMediaReceiverRtcpObserver(
104     uint32_t remote_ssrc,
105     MediaReceiverRtcpObserver* observer) {
106   auto& stored = remote_senders_[remote_ssrc].observers;
107   RTC_DCHECK(!absl::c_linear_search(stored, observer));
108   stored.push_back(observer);
109 }
110 
RemoveMediaReceiverRtcpObserver(uint32_t remote_ssrc,MediaReceiverRtcpObserver * observer)111 void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver(
112     uint32_t remote_ssrc,
113     MediaReceiverRtcpObserver* observer) {
114   auto remote_sender_it = remote_senders_.find(remote_ssrc);
115   if (remote_sender_it == remote_senders_.end())
116     return;
117   auto& stored = remote_sender_it->second.observers;
118   auto it = absl::c_find(stored, observer);
119   if (it == stored.end())
120     return;
121   stored.erase(it);
122 }
123 
SetReadyToSend(bool ready)124 void RtcpTransceiverImpl::SetReadyToSend(bool ready) {
125   if (config_.schedule_periodic_compound_packets) {
126     if (ready_to_send_ && !ready)
127       periodic_task_handle_.Stop();
128 
129     if (!ready_to_send_ && ready)  // Restart periodic sending.
130       SchedulePeriodicCompoundPackets(config_.report_period_ms / 2);
131   }
132   ready_to_send_ = ready;
133 }
134 
ReceivePacket(rtc::ArrayView<const uint8_t> packet,int64_t now_us)135 void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet,
136                                         int64_t now_us) {
137   while (!packet.empty()) {
138     rtcp::CommonHeader rtcp_block;
139     if (!rtcp_block.Parse(packet.data(), packet.size()))
140       return;
141 
142     HandleReceivedPacket(rtcp_block, now_us);
143 
144     // TODO(danilchap): Use packet.remove_prefix() when that function exists.
145     packet = packet.subview(rtcp_block.packet_size());
146   }
147 }
148 
SendCompoundPacket()149 void RtcpTransceiverImpl::SendCompoundPacket() {
150   if (!ready_to_send_)
151     return;
152   SendPeriodicCompoundPacket();
153   ReschedulePeriodicCompoundPackets();
154 }
155 
SetRemb(int64_t bitrate_bps,std::vector<uint32_t> ssrcs)156 void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps,
157                                   std::vector<uint32_t> ssrcs) {
158   RTC_DCHECK_GE(bitrate_bps, 0);
159 
160   bool send_now = config_.send_remb_on_change &&
161                   (!remb_.has_value() || bitrate_bps != remb_->bitrate_bps());
162   remb_.emplace();
163   remb_->SetSsrcs(std::move(ssrcs));
164   remb_->SetBitrateBps(bitrate_bps);
165   remb_->SetSenderSsrc(config_.feedback_ssrc);
166   // TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb
167   // immideately on large bitrate change when there is one RtcpTransceiver per
168   // rtp transport.
169   if (send_now) {
170     absl::optional<rtcp::Remb> remb;
171     remb.swap(remb_);
172     SendImmediateFeedback(*remb);
173     remb.swap(remb_);
174   }
175 }
176 
UnsetRemb()177 void RtcpTransceiverImpl::UnsetRemb() {
178   remb_.reset();
179 }
180 
SendRawPacket(rtc::ArrayView<const uint8_t> packet)181 void RtcpTransceiverImpl::SendRawPacket(rtc::ArrayView<const uint8_t> packet) {
182   if (!ready_to_send_)
183     return;
184   // Unlike other senders, this functions just tries to send packet away and
185   // disregard rtcp_mode, max_packet_size or anything else.
186   // TODO(bugs.webrtc.org/8239): respect config_ by creating the
187   // TransportFeedback inside this class when there is one per rtp transport.
188   config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
189 }
190 
SendNack(uint32_t ssrc,std::vector<uint16_t> sequence_numbers)191 void RtcpTransceiverImpl::SendNack(uint32_t ssrc,
192                                    std::vector<uint16_t> sequence_numbers) {
193   RTC_DCHECK(!sequence_numbers.empty());
194   if (!ready_to_send_)
195     return;
196   rtcp::Nack nack;
197   nack.SetSenderSsrc(config_.feedback_ssrc);
198   nack.SetMediaSsrc(ssrc);
199   nack.SetPacketIds(std::move(sequence_numbers));
200   SendImmediateFeedback(nack);
201 }
202 
SendPictureLossIndication(uint32_t ssrc)203 void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) {
204   if (!ready_to_send_)
205     return;
206   rtcp::Pli pli;
207   pli.SetSenderSsrc(config_.feedback_ssrc);
208   pli.SetMediaSsrc(ssrc);
209   SendImmediateFeedback(pli);
210 }
211 
SendFullIntraRequest(rtc::ArrayView<const uint32_t> ssrcs,bool new_request)212 void RtcpTransceiverImpl::SendFullIntraRequest(
213     rtc::ArrayView<const uint32_t> ssrcs,
214     bool new_request) {
215   RTC_DCHECK(!ssrcs.empty());
216   if (!ready_to_send_)
217     return;
218   rtcp::Fir fir;
219   fir.SetSenderSsrc(config_.feedback_ssrc);
220   for (uint32_t media_ssrc : ssrcs) {
221     uint8_t& command_seq_num = remote_senders_[media_ssrc].fir_sequence_number;
222     if (new_request)
223       command_seq_num += 1;
224     fir.AddRequestTo(media_ssrc, command_seq_num);
225   }
226   SendImmediateFeedback(fir);
227 }
228 
HandleReceivedPacket(const rtcp::CommonHeader & rtcp_packet_header,int64_t now_us)229 void RtcpTransceiverImpl::HandleReceivedPacket(
230     const rtcp::CommonHeader& rtcp_packet_header,
231     int64_t now_us) {
232   switch (rtcp_packet_header.type()) {
233     case rtcp::Bye::kPacketType:
234       HandleBye(rtcp_packet_header);
235       break;
236     case rtcp::SenderReport::kPacketType:
237       HandleSenderReport(rtcp_packet_header, now_us);
238       break;
239     case rtcp::ExtendedReports::kPacketType:
240       HandleExtendedReports(rtcp_packet_header, now_us);
241       break;
242   }
243 }
244 
HandleBye(const rtcp::CommonHeader & rtcp_packet_header)245 void RtcpTransceiverImpl::HandleBye(
246     const rtcp::CommonHeader& rtcp_packet_header) {
247   rtcp::Bye bye;
248   if (!bye.Parse(rtcp_packet_header))
249     return;
250   auto remote_sender_it = remote_senders_.find(bye.sender_ssrc());
251   if (remote_sender_it == remote_senders_.end())
252     return;
253   for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
254     observer->OnBye(bye.sender_ssrc());
255 }
256 
HandleSenderReport(const rtcp::CommonHeader & rtcp_packet_header,int64_t now_us)257 void RtcpTransceiverImpl::HandleSenderReport(
258     const rtcp::CommonHeader& rtcp_packet_header,
259     int64_t now_us) {
260   rtcp::SenderReport sender_report;
261   if (!sender_report.Parse(rtcp_packet_header))
262     return;
263   RemoteSenderState& remote_sender =
264       remote_senders_[sender_report.sender_ssrc()];
265   absl::optional<SenderReportTimes>& last =
266       remote_sender.last_received_sender_report;
267   last.emplace();
268   last->local_received_time_us = now_us;
269   last->remote_sent_time = sender_report.ntp();
270 
271   for (MediaReceiverRtcpObserver* observer : remote_sender.observers)
272     observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(),
273                              sender_report.rtp_timestamp());
274 }
275 
HandleExtendedReports(const rtcp::CommonHeader & rtcp_packet_header,int64_t now_us)276 void RtcpTransceiverImpl::HandleExtendedReports(
277     const rtcp::CommonHeader& rtcp_packet_header,
278     int64_t now_us) {
279   rtcp::ExtendedReports extended_reports;
280   if (!extended_reports.Parse(rtcp_packet_header))
281     return;
282 
283   if (extended_reports.dlrr())
284     HandleDlrr(extended_reports.dlrr(), now_us);
285 
286   if (extended_reports.target_bitrate())
287     HandleTargetBitrate(*extended_reports.target_bitrate(),
288                         extended_reports.sender_ssrc());
289 }
290 
HandleDlrr(const rtcp::Dlrr & dlrr,int64_t now_us)291 void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, int64_t now_us) {
292   if (!config_.non_sender_rtt_measurement || config_.rtt_observer == nullptr)
293     return;
294 
295   // Delay and last_rr are transferred using 32bit compact ntp resolution.
296   // Convert packet arrival time to same format through 64bit ntp format.
297   uint32_t receive_time_ntp = CompactNtp(TimeMicrosToNtp(now_us));
298   for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) {
299     if (rti.ssrc != config_.feedback_ssrc)
300       continue;
301     uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr;
302     int64_t rtt_ms = CompactNtpRttToMs(rtt_ntp);
303     config_.rtt_observer->OnRttUpdate(rtt_ms);
304   }
305 }
306 
HandleTargetBitrate(const rtcp::TargetBitrate & target_bitrate,uint32_t remote_ssrc)307 void RtcpTransceiverImpl::HandleTargetBitrate(
308     const rtcp::TargetBitrate& target_bitrate,
309     uint32_t remote_ssrc) {
310   auto remote_sender_it = remote_senders_.find(remote_ssrc);
311   if (remote_sender_it == remote_senders_.end() ||
312       remote_sender_it->second.observers.empty())
313     return;
314 
315   // Convert rtcp::TargetBitrate to VideoBitrateAllocation.
316   VideoBitrateAllocation bitrate_allocation;
317   for (const rtcp::TargetBitrate::BitrateItem& item :
318        target_bitrate.GetTargetBitrates()) {
319     if (item.spatial_layer >= kMaxSpatialLayers ||
320         item.temporal_layer >= kMaxTemporalStreams) {
321       RTC_DLOG(LS_WARNING)
322           << config_.debug_id
323           << "Invalid incoming TargetBitrate with spatial layer "
324           << item.spatial_layer << ", temporal layer " << item.temporal_layer;
325       continue;
326     }
327     bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer,
328                                   item.target_bitrate_kbps * 1000);
329   }
330 
331   for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
332     observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation);
333 }
334 
ReschedulePeriodicCompoundPackets()335 void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
336   if (!config_.schedule_periodic_compound_packets)
337     return;
338   periodic_task_handle_.Stop();
339   RTC_DCHECK(ready_to_send_);
340   SchedulePeriodicCompoundPackets(config_.report_period_ms);
341 }
342 
SchedulePeriodicCompoundPackets(int64_t delay_ms)343 void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
344   periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
345       config_.task_queue, TimeDelta::Millis(delay_ms), [this] {
346         RTC_DCHECK(config_.schedule_periodic_compound_packets);
347         RTC_DCHECK(ready_to_send_);
348         SendPeriodicCompoundPacket();
349         return TimeDelta::Millis(config_.report_period_ms);
350       });
351 }
352 
CreateCompoundPacket(PacketSender * sender)353 void RtcpTransceiverImpl::CreateCompoundPacket(PacketSender* sender) {
354   RTC_DCHECK(sender->IsEmpty());
355   const uint32_t sender_ssrc = config_.feedback_ssrc;
356   int64_t now_us = rtc::TimeMicros();
357   rtcp::ReceiverReport receiver_report;
358   receiver_report.SetSenderSsrc(sender_ssrc);
359   receiver_report.SetReportBlocks(CreateReportBlocks(now_us));
360   sender->AppendPacket(receiver_report);
361 
362   if (!config_.cname.empty()) {
363     rtcp::Sdes sdes;
364     bool added = sdes.AddCName(config_.feedback_ssrc, config_.cname);
365     RTC_DCHECK(added) << "Failed to add cname " << config_.cname
366                       << " to rtcp sdes packet.";
367     sender->AppendPacket(sdes);
368   }
369   if (remb_) {
370     remb_->SetSenderSsrc(sender_ssrc);
371     sender->AppendPacket(*remb_);
372   }
373   // TODO(bugs.webrtc.org/8239): Do not send rrtr if this packet starts with
374   // SenderReport instead of ReceiverReport
375   // when RtcpTransceiver supports rtp senders.
376   if (config_.non_sender_rtt_measurement) {
377     rtcp::ExtendedReports xr;
378 
379     rtcp::Rrtr rrtr;
380     rrtr.SetNtp(TimeMicrosToNtp(now_us));
381     xr.SetRrtr(rrtr);
382 
383     xr.SetSenderSsrc(sender_ssrc);
384     sender->AppendPacket(xr);
385   }
386 }
387 
SendPeriodicCompoundPacket()388 void RtcpTransceiverImpl::SendPeriodicCompoundPacket() {
389   auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
390     config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
391   };
392   PacketSender sender(send_packet, config_.max_packet_size);
393   CreateCompoundPacket(&sender);
394   sender.Send();
395 }
396 
SendCombinedRtcpPacket(std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets)397 void RtcpTransceiverImpl::SendCombinedRtcpPacket(
398     std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets) {
399   auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
400     config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
401   };
402   PacketSender sender(send_packet, config_.max_packet_size);
403 
404   for (auto& rtcp_packet : rtcp_packets) {
405     rtcp_packet->SetSenderSsrc(config_.feedback_ssrc);
406     sender.AppendPacket(*rtcp_packet);
407   }
408   sender.Send();
409 }
410 
SendImmediateFeedback(const rtcp::RtcpPacket & rtcp_packet)411 void RtcpTransceiverImpl::SendImmediateFeedback(
412     const rtcp::RtcpPacket& rtcp_packet) {
413   auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
414     config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
415   };
416   PacketSender sender(send_packet, config_.max_packet_size);
417   // Compound mode requires every sent rtcp packet to be compound, i.e. start
418   // with a sender or receiver report.
419   if (config_.rtcp_mode == RtcpMode::kCompound)
420     CreateCompoundPacket(&sender);
421 
422   sender.AppendPacket(rtcp_packet);
423   sender.Send();
424 
425   // If compound packet was sent, delay (reschedule) the periodic one.
426   if (config_.rtcp_mode == RtcpMode::kCompound)
427     ReschedulePeriodicCompoundPackets();
428 }
429 
CreateReportBlocks(int64_t now_us)430 std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks(
431     int64_t now_us) {
432   if (!config_.receive_statistics)
433     return {};
434   // TODO(danilchap): Support sending more than
435   // |ReceiverReport::kMaxNumberOfReportBlocks| per compound rtcp packet.
436   std::vector<rtcp::ReportBlock> report_blocks =
437       config_.receive_statistics->RtcpReportBlocks(
438           rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
439   uint32_t last_sr = 0;
440   uint32_t last_delay = 0;
441   for (rtcp::ReportBlock& report_block : report_blocks) {
442     auto it = remote_senders_.find(report_block.source_ssrc());
443     if (it == remote_senders_.end() ||
444         !it->second.last_received_sender_report) {
445       continue;
446     }
447     const SenderReportTimes& last_sender_report =
448         *it->second.last_received_sender_report;
449     last_sr = CompactNtp(last_sender_report.remote_sent_time);
450     last_delay = SaturatedUsToCompactNtp(
451         now_us - last_sender_report.local_received_time_us);
452     report_block.SetLastSr(last_sr);
453     report_block.SetDelayLastSr(last_delay);
454   }
455   return report_blocks;
456 }
457 
458 }  // namespace webrtc
459