1 /*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
12
13 #include <utility>
14
15 #include "absl/algorithm/container.h"
16 #include "absl/memory/memory.h"
17 #include "api/call/transport.h"
18 #include "api/video/video_bitrate_allocation.h"
19 #include "modules/rtp_rtcp/include/receive_statistics.h"
20 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
21 #include "modules/rtp_rtcp/source/rtcp_packet.h"
22 #include "modules/rtp_rtcp/source/rtcp_packet/bye.h"
23 #include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
24 #include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
25 #include "modules/rtp_rtcp/source/rtcp_packet/fir.h"
26 #include "modules/rtp_rtcp/source/rtcp_packet/nack.h"
27 #include "modules/rtp_rtcp/source/rtcp_packet/pli.h"
28 #include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
29 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
30 #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
31 #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
32 #include "modules/rtp_rtcp/source/time_util.h"
33 #include "rtc_base/checks.h"
34 #include "rtc_base/logging.h"
35 #include "rtc_base/task_utils/repeating_task.h"
36 #include "rtc_base/task_utils/to_queued_task.h"
37 #include "rtc_base/time_utils.h"
38
39 namespace webrtc {
40 namespace {
41
42 struct SenderReportTimes {
43 int64_t local_received_time_us;
44 NtpTime remote_sent_time;
45 };
46
47 } // namespace
48
49 struct RtcpTransceiverImpl::RemoteSenderState {
50 uint8_t fir_sequence_number = 0;
51 absl::optional<SenderReportTimes> last_received_sender_report;
52 std::vector<MediaReceiverRtcpObserver*> observers;
53 };
54
55 // Helper to put several RTCP packets into lower layer datagram composing
56 // Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2.
57 // TODO(danilchap): When in compound mode and packets are so many that several
58 // compound RTCP packets need to be generated, ensure each packet is compound.
59 class RtcpTransceiverImpl::PacketSender {
60 public:
PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,size_t max_packet_size)61 PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,
62 size_t max_packet_size)
63 : callback_(callback), max_packet_size_(max_packet_size) {
64 RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE);
65 }
~PacketSender()66 ~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; }
67
68 // Appends a packet to pending compound packet.
69 // Sends rtcp compound packet if buffer was already full and resets buffer.
AppendPacket(const rtcp::RtcpPacket & packet)70 void AppendPacket(const rtcp::RtcpPacket& packet) {
71 packet.Create(buffer_, &index_, max_packet_size_, callback_);
72 }
73
74 // Sends pending rtcp compound packet.
Send()75 void Send() {
76 if (index_ > 0) {
77 callback_(rtc::ArrayView<const uint8_t>(buffer_, index_));
78 index_ = 0;
79 }
80 }
81
IsEmpty() const82 bool IsEmpty() const { return index_ == 0; }
83
84 private:
85 const rtcp::RtcpPacket::PacketReadyCallback callback_;
86 const size_t max_packet_size_;
87 size_t index_ = 0;
88 uint8_t buffer_[IP_PACKET_SIZE];
89 };
90
RtcpTransceiverImpl(const RtcpTransceiverConfig & config)91 RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
92 : config_(config), ready_to_send_(config.initial_ready_to_send) {
93 RTC_CHECK(config_.Validate());
94 if (ready_to_send_ && config_.schedule_periodic_compound_packets) {
95 config_.task_queue->PostTask(ToQueuedTask([this] {
96 SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
97 }));
98 }
99 }
100
101 RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
102
AddMediaReceiverRtcpObserver(uint32_t remote_ssrc,MediaReceiverRtcpObserver * observer)103 void RtcpTransceiverImpl::AddMediaReceiverRtcpObserver(
104 uint32_t remote_ssrc,
105 MediaReceiverRtcpObserver* observer) {
106 auto& stored = remote_senders_[remote_ssrc].observers;
107 RTC_DCHECK(!absl::c_linear_search(stored, observer));
108 stored.push_back(observer);
109 }
110
RemoveMediaReceiverRtcpObserver(uint32_t remote_ssrc,MediaReceiverRtcpObserver * observer)111 void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver(
112 uint32_t remote_ssrc,
113 MediaReceiverRtcpObserver* observer) {
114 auto remote_sender_it = remote_senders_.find(remote_ssrc);
115 if (remote_sender_it == remote_senders_.end())
116 return;
117 auto& stored = remote_sender_it->second.observers;
118 auto it = absl::c_find(stored, observer);
119 if (it == stored.end())
120 return;
121 stored.erase(it);
122 }
123
SetReadyToSend(bool ready)124 void RtcpTransceiverImpl::SetReadyToSend(bool ready) {
125 if (config_.schedule_periodic_compound_packets) {
126 if (ready_to_send_ && !ready)
127 periodic_task_handle_.Stop();
128
129 if (!ready_to_send_ && ready) // Restart periodic sending.
130 SchedulePeriodicCompoundPackets(config_.report_period_ms / 2);
131 }
132 ready_to_send_ = ready;
133 }
134
ReceivePacket(rtc::ArrayView<const uint8_t> packet,int64_t now_us)135 void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet,
136 int64_t now_us) {
137 while (!packet.empty()) {
138 rtcp::CommonHeader rtcp_block;
139 if (!rtcp_block.Parse(packet.data(), packet.size()))
140 return;
141
142 HandleReceivedPacket(rtcp_block, now_us);
143
144 // TODO(danilchap): Use packet.remove_prefix() when that function exists.
145 packet = packet.subview(rtcp_block.packet_size());
146 }
147 }
148
SendCompoundPacket()149 void RtcpTransceiverImpl::SendCompoundPacket() {
150 if (!ready_to_send_)
151 return;
152 SendPeriodicCompoundPacket();
153 ReschedulePeriodicCompoundPackets();
154 }
155
SetRemb(int64_t bitrate_bps,std::vector<uint32_t> ssrcs)156 void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps,
157 std::vector<uint32_t> ssrcs) {
158 RTC_DCHECK_GE(bitrate_bps, 0);
159
160 bool send_now = config_.send_remb_on_change &&
161 (!remb_.has_value() || bitrate_bps != remb_->bitrate_bps());
162 remb_.emplace();
163 remb_->SetSsrcs(std::move(ssrcs));
164 remb_->SetBitrateBps(bitrate_bps);
165 remb_->SetSenderSsrc(config_.feedback_ssrc);
166 // TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb
167 // immideately on large bitrate change when there is one RtcpTransceiver per
168 // rtp transport.
169 if (send_now) {
170 absl::optional<rtcp::Remb> remb;
171 remb.swap(remb_);
172 SendImmediateFeedback(*remb);
173 remb.swap(remb_);
174 }
175 }
176
UnsetRemb()177 void RtcpTransceiverImpl::UnsetRemb() {
178 remb_.reset();
179 }
180
SendRawPacket(rtc::ArrayView<const uint8_t> packet)181 void RtcpTransceiverImpl::SendRawPacket(rtc::ArrayView<const uint8_t> packet) {
182 if (!ready_to_send_)
183 return;
184 // Unlike other senders, this functions just tries to send packet away and
185 // disregard rtcp_mode, max_packet_size or anything else.
186 // TODO(bugs.webrtc.org/8239): respect config_ by creating the
187 // TransportFeedback inside this class when there is one per rtp transport.
188 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
189 }
190
SendNack(uint32_t ssrc,std::vector<uint16_t> sequence_numbers)191 void RtcpTransceiverImpl::SendNack(uint32_t ssrc,
192 std::vector<uint16_t> sequence_numbers) {
193 RTC_DCHECK(!sequence_numbers.empty());
194 if (!ready_to_send_)
195 return;
196 rtcp::Nack nack;
197 nack.SetSenderSsrc(config_.feedback_ssrc);
198 nack.SetMediaSsrc(ssrc);
199 nack.SetPacketIds(std::move(sequence_numbers));
200 SendImmediateFeedback(nack);
201 }
202
SendPictureLossIndication(uint32_t ssrc)203 void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) {
204 if (!ready_to_send_)
205 return;
206 rtcp::Pli pli;
207 pli.SetSenderSsrc(config_.feedback_ssrc);
208 pli.SetMediaSsrc(ssrc);
209 SendImmediateFeedback(pli);
210 }
211
SendFullIntraRequest(rtc::ArrayView<const uint32_t> ssrcs,bool new_request)212 void RtcpTransceiverImpl::SendFullIntraRequest(
213 rtc::ArrayView<const uint32_t> ssrcs,
214 bool new_request) {
215 RTC_DCHECK(!ssrcs.empty());
216 if (!ready_to_send_)
217 return;
218 rtcp::Fir fir;
219 fir.SetSenderSsrc(config_.feedback_ssrc);
220 for (uint32_t media_ssrc : ssrcs) {
221 uint8_t& command_seq_num = remote_senders_[media_ssrc].fir_sequence_number;
222 if (new_request)
223 command_seq_num += 1;
224 fir.AddRequestTo(media_ssrc, command_seq_num);
225 }
226 SendImmediateFeedback(fir);
227 }
228
HandleReceivedPacket(const rtcp::CommonHeader & rtcp_packet_header,int64_t now_us)229 void RtcpTransceiverImpl::HandleReceivedPacket(
230 const rtcp::CommonHeader& rtcp_packet_header,
231 int64_t now_us) {
232 switch (rtcp_packet_header.type()) {
233 case rtcp::Bye::kPacketType:
234 HandleBye(rtcp_packet_header);
235 break;
236 case rtcp::SenderReport::kPacketType:
237 HandleSenderReport(rtcp_packet_header, now_us);
238 break;
239 case rtcp::ExtendedReports::kPacketType:
240 HandleExtendedReports(rtcp_packet_header, now_us);
241 break;
242 }
243 }
244
HandleBye(const rtcp::CommonHeader & rtcp_packet_header)245 void RtcpTransceiverImpl::HandleBye(
246 const rtcp::CommonHeader& rtcp_packet_header) {
247 rtcp::Bye bye;
248 if (!bye.Parse(rtcp_packet_header))
249 return;
250 auto remote_sender_it = remote_senders_.find(bye.sender_ssrc());
251 if (remote_sender_it == remote_senders_.end())
252 return;
253 for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
254 observer->OnBye(bye.sender_ssrc());
255 }
256
HandleSenderReport(const rtcp::CommonHeader & rtcp_packet_header,int64_t now_us)257 void RtcpTransceiverImpl::HandleSenderReport(
258 const rtcp::CommonHeader& rtcp_packet_header,
259 int64_t now_us) {
260 rtcp::SenderReport sender_report;
261 if (!sender_report.Parse(rtcp_packet_header))
262 return;
263 RemoteSenderState& remote_sender =
264 remote_senders_[sender_report.sender_ssrc()];
265 absl::optional<SenderReportTimes>& last =
266 remote_sender.last_received_sender_report;
267 last.emplace();
268 last->local_received_time_us = now_us;
269 last->remote_sent_time = sender_report.ntp();
270
271 for (MediaReceiverRtcpObserver* observer : remote_sender.observers)
272 observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(),
273 sender_report.rtp_timestamp());
274 }
275
HandleExtendedReports(const rtcp::CommonHeader & rtcp_packet_header,int64_t now_us)276 void RtcpTransceiverImpl::HandleExtendedReports(
277 const rtcp::CommonHeader& rtcp_packet_header,
278 int64_t now_us) {
279 rtcp::ExtendedReports extended_reports;
280 if (!extended_reports.Parse(rtcp_packet_header))
281 return;
282
283 if (extended_reports.dlrr())
284 HandleDlrr(extended_reports.dlrr(), now_us);
285
286 if (extended_reports.target_bitrate())
287 HandleTargetBitrate(*extended_reports.target_bitrate(),
288 extended_reports.sender_ssrc());
289 }
290
HandleDlrr(const rtcp::Dlrr & dlrr,int64_t now_us)291 void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, int64_t now_us) {
292 if (!config_.non_sender_rtt_measurement || config_.rtt_observer == nullptr)
293 return;
294
295 // Delay and last_rr are transferred using 32bit compact ntp resolution.
296 // Convert packet arrival time to same format through 64bit ntp format.
297 uint32_t receive_time_ntp = CompactNtp(TimeMicrosToNtp(now_us));
298 for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) {
299 if (rti.ssrc != config_.feedback_ssrc)
300 continue;
301 uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr;
302 int64_t rtt_ms = CompactNtpRttToMs(rtt_ntp);
303 config_.rtt_observer->OnRttUpdate(rtt_ms);
304 }
305 }
306
HandleTargetBitrate(const rtcp::TargetBitrate & target_bitrate,uint32_t remote_ssrc)307 void RtcpTransceiverImpl::HandleTargetBitrate(
308 const rtcp::TargetBitrate& target_bitrate,
309 uint32_t remote_ssrc) {
310 auto remote_sender_it = remote_senders_.find(remote_ssrc);
311 if (remote_sender_it == remote_senders_.end() ||
312 remote_sender_it->second.observers.empty())
313 return;
314
315 // Convert rtcp::TargetBitrate to VideoBitrateAllocation.
316 VideoBitrateAllocation bitrate_allocation;
317 for (const rtcp::TargetBitrate::BitrateItem& item :
318 target_bitrate.GetTargetBitrates()) {
319 if (item.spatial_layer >= kMaxSpatialLayers ||
320 item.temporal_layer >= kMaxTemporalStreams) {
321 RTC_DLOG(LS_WARNING)
322 << config_.debug_id
323 << "Invalid incoming TargetBitrate with spatial layer "
324 << item.spatial_layer << ", temporal layer " << item.temporal_layer;
325 continue;
326 }
327 bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer,
328 item.target_bitrate_kbps * 1000);
329 }
330
331 for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
332 observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation);
333 }
334
ReschedulePeriodicCompoundPackets()335 void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
336 if (!config_.schedule_periodic_compound_packets)
337 return;
338 periodic_task_handle_.Stop();
339 RTC_DCHECK(ready_to_send_);
340 SchedulePeriodicCompoundPackets(config_.report_period_ms);
341 }
342
SchedulePeriodicCompoundPackets(int64_t delay_ms)343 void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
344 periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
345 config_.task_queue, TimeDelta::Millis(delay_ms), [this] {
346 RTC_DCHECK(config_.schedule_periodic_compound_packets);
347 RTC_DCHECK(ready_to_send_);
348 SendPeriodicCompoundPacket();
349 return TimeDelta::Millis(config_.report_period_ms);
350 });
351 }
352
CreateCompoundPacket(PacketSender * sender)353 void RtcpTransceiverImpl::CreateCompoundPacket(PacketSender* sender) {
354 RTC_DCHECK(sender->IsEmpty());
355 const uint32_t sender_ssrc = config_.feedback_ssrc;
356 int64_t now_us = rtc::TimeMicros();
357 rtcp::ReceiverReport receiver_report;
358 receiver_report.SetSenderSsrc(sender_ssrc);
359 receiver_report.SetReportBlocks(CreateReportBlocks(now_us));
360 sender->AppendPacket(receiver_report);
361
362 if (!config_.cname.empty()) {
363 rtcp::Sdes sdes;
364 bool added = sdes.AddCName(config_.feedback_ssrc, config_.cname);
365 RTC_DCHECK(added) << "Failed to add cname " << config_.cname
366 << " to rtcp sdes packet.";
367 sender->AppendPacket(sdes);
368 }
369 if (remb_) {
370 remb_->SetSenderSsrc(sender_ssrc);
371 sender->AppendPacket(*remb_);
372 }
373 // TODO(bugs.webrtc.org/8239): Do not send rrtr if this packet starts with
374 // SenderReport instead of ReceiverReport
375 // when RtcpTransceiver supports rtp senders.
376 if (config_.non_sender_rtt_measurement) {
377 rtcp::ExtendedReports xr;
378
379 rtcp::Rrtr rrtr;
380 rrtr.SetNtp(TimeMicrosToNtp(now_us));
381 xr.SetRrtr(rrtr);
382
383 xr.SetSenderSsrc(sender_ssrc);
384 sender->AppendPacket(xr);
385 }
386 }
387
SendPeriodicCompoundPacket()388 void RtcpTransceiverImpl::SendPeriodicCompoundPacket() {
389 auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
390 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
391 };
392 PacketSender sender(send_packet, config_.max_packet_size);
393 CreateCompoundPacket(&sender);
394 sender.Send();
395 }
396
SendCombinedRtcpPacket(std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets)397 void RtcpTransceiverImpl::SendCombinedRtcpPacket(
398 std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets) {
399 auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
400 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
401 };
402 PacketSender sender(send_packet, config_.max_packet_size);
403
404 for (auto& rtcp_packet : rtcp_packets) {
405 rtcp_packet->SetSenderSsrc(config_.feedback_ssrc);
406 sender.AppendPacket(*rtcp_packet);
407 }
408 sender.Send();
409 }
410
SendImmediateFeedback(const rtcp::RtcpPacket & rtcp_packet)411 void RtcpTransceiverImpl::SendImmediateFeedback(
412 const rtcp::RtcpPacket& rtcp_packet) {
413 auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
414 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
415 };
416 PacketSender sender(send_packet, config_.max_packet_size);
417 // Compound mode requires every sent rtcp packet to be compound, i.e. start
418 // with a sender or receiver report.
419 if (config_.rtcp_mode == RtcpMode::kCompound)
420 CreateCompoundPacket(&sender);
421
422 sender.AppendPacket(rtcp_packet);
423 sender.Send();
424
425 // If compound packet was sent, delay (reschedule) the periodic one.
426 if (config_.rtcp_mode == RtcpMode::kCompound)
427 ReschedulePeriodicCompoundPackets();
428 }
429
CreateReportBlocks(int64_t now_us)430 std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks(
431 int64_t now_us) {
432 if (!config_.receive_statistics)
433 return {};
434 // TODO(danilchap): Support sending more than
435 // |ReceiverReport::kMaxNumberOfReportBlocks| per compound rtcp packet.
436 std::vector<rtcp::ReportBlock> report_blocks =
437 config_.receive_statistics->RtcpReportBlocks(
438 rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
439 uint32_t last_sr = 0;
440 uint32_t last_delay = 0;
441 for (rtcp::ReportBlock& report_block : report_blocks) {
442 auto it = remote_senders_.find(report_block.source_ssrc());
443 if (it == remote_senders_.end() ||
444 !it->second.last_received_sender_report) {
445 continue;
446 }
447 const SenderReportTimes& last_sender_report =
448 *it->second.last_received_sender_report;
449 last_sr = CompactNtp(last_sender_report.remote_sent_time);
450 last_delay = SaturatedUsToCompactNtp(
451 now_us - last_sender_report.local_received_time_us);
452 report_block.SetLastSr(last_sr);
453 report_block.SetDelayLastSr(last_delay);
454 }
455 return report_blocks;
456 }
457
458 } // namespace webrtc
459