1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/compound_rtcp_parser.h"
6 
7 #include <algorithm>
8 
9 #include "cast/streaming/packet_util.h"
10 #include "cast/streaming/rtcp_session.h"
11 #include "util/osp_logging.h"
12 #include "util/std_util.h"
13 
14 namespace openscreen {
15 namespace cast {
16 
17 namespace {
18 
19 // Use the Clock's minimum time value (an impossible value, waaaaay before epoch
20 // time) to represent unset time_point values.
21 constexpr auto kNullTimePoint = Clock::time_point::min();
22 
23 // Canonicalizes the just-parsed list of packet-specific NACKs so that the
24 // CompoundRtcpParser::Client can make several simplifying assumptions when
25 // processing the results.
CanonicalizePacketNackVector(std::vector<PacketNack> * packets)26 void CanonicalizePacketNackVector(std::vector<PacketNack>* packets) {
27   // First, sort all elements. The sort order is the normal lexicographical
28   // ordering, with one exception: The special kAllPacketsLost packet_id value
29   // should be treated as coming before all others. This special sort order
30   // allows the filtering algorithm below to be simpler, and only require one
31   // pass; and the final result will be the normal lexicographically-sorted
32   // output the CompoundRtcpParser::Client expects.
33   std::sort(packets->begin(), packets->end(),
34             [](const PacketNack& a, const PacketNack& b) {
35               // Since the comparator is a hot code path, use a simple modular
36               // arithmetic trick in lieu of extra branching: When comparing the
37               // tuples, map all packet_id values to packet_id + 1, mod 0x10000.
38               // This results in the desired sorting behavior since
39               // kAllPacketsLost (0xffff) wraps-around to 0x0000, and all other
40               // values become N + 1.
41               static_assert(static_cast<FramePacketId>(kAllPacketsLost + 1) <
42                                 FramePacketId{0x0000 + 1},
43                             "comparison requires integer wrap-around");
44               return PacketNack{a.frame_id,
45                                 static_cast<FramePacketId>(a.packet_id + 1)} <
46                      PacketNack{b.frame_id,
47                                 static_cast<FramePacketId>(b.packet_id + 1)};
48             });
49 
50   // De-duplicate elements. Two possible cases:
51   //
52   //   1. Identical elements (same FrameId+FramePacketId).
53   //   2. If there are any elements with kAllPacketsLost as the packet ID,
54   //      prune-out all other elements having the same frame ID, as they are
55   //      redundant.
56   //
57   // This is done by walking forwards over the sorted vector and deciding which
58   // elements to keep. Those that are kept are stacked-up at the front of the
59   // vector. After the "to-keep" pass, the vector is truncated to remove the
60   // left-over garbage at the end.
61   auto have_it = packets->begin();
62   if (have_it != packets->end()) {
63     auto kept_it = have_it;  // Always keep the first element.
64     for (++have_it; have_it != packets->end(); ++have_it) {
65       if (have_it->frame_id != kept_it->frame_id ||
66           (kept_it->packet_id != kAllPacketsLost &&
67            have_it->packet_id != kept_it->packet_id)) {  // Keep it.
68         ++kept_it;
69         *kept_it = *have_it;
70       }
71     }
72     packets->erase(++kept_it, packets->end());
73   }
74 }
75 
76 }  // namespace
77 
CompoundRtcpParser(RtcpSession * session,CompoundRtcpParser::Client * client)78 CompoundRtcpParser::CompoundRtcpParser(RtcpSession* session,
79                                        CompoundRtcpParser::Client* client)
80     : session_(session),
81       client_(client),
82       latest_receiver_timestamp_(kNullTimePoint) {
83   OSP_DCHECK(session_);
84   OSP_DCHECK(client_);
85 }
86 
87 CompoundRtcpParser::~CompoundRtcpParser() = default;
88 
Parse(absl::Span<const uint8_t> buffer,FrameId max_feedback_frame_id)89 bool CompoundRtcpParser::Parse(absl::Span<const uint8_t> buffer,
90                                FrameId max_feedback_frame_id) {
91   // These will contain the results from the various ParseXYZ() methods. None of
92   // the results will be dispatched to the Client until the entire parse
93   // succeeds.
94   Clock::time_point receiver_reference_time = kNullTimePoint;
95   absl::optional<RtcpReportBlock> receiver_report;
96   FrameId checkpoint_frame_id;
97   std::chrono::milliseconds target_playout_delay{};
98   std::vector<FrameId> received_frames;
99   std::vector<PacketNack> packet_nacks;
100   bool picture_loss_indicator = false;
101 
102   // The data contained in |buffer| can be a "compound packet," which means that
103   // it can be the concatenation of multiple RTCP packets. The loop here
104   // processes each one-by-one.
105   while (!buffer.empty()) {
106     const auto header = RtcpCommonHeader::Parse(buffer);
107     if (!header) {
108       return false;
109     }
110     buffer.remove_prefix(kRtcpCommonHeaderSize);
111     if (static_cast<int>(buffer.size()) < header->payload_size) {
112       return false;
113     }
114     const absl::Span<const uint8_t> payload =
115         buffer.subspan(0, header->payload_size);
116     buffer.remove_prefix(header->payload_size);
117 
118     switch (header->packet_type) {
119       case RtcpPacketType::kReceiverReport:
120         if (!ParseReceiverReport(payload, header->with.report_count,
121                                  &receiver_report)) {
122           return false;
123         }
124         break;
125 
126       case RtcpPacketType::kPayloadSpecific:
127         switch (header->with.subtype) {
128           case RtcpSubtype::kPictureLossIndicator:
129             if (!ParsePictureLossIndicator(payload, &picture_loss_indicator)) {
130               return false;
131             }
132             break;
133           case RtcpSubtype::kFeedback:
134             if (!ParseFeedback(payload, max_feedback_frame_id,
135                                &checkpoint_frame_id, &target_playout_delay,
136                                &received_frames, &packet_nacks)) {
137               return false;
138             }
139             break;
140           default:
141             // Ignore: Unimplemented or not part of the Cast Streaming spec.
142             break;
143         }
144         break;
145 
146       case RtcpPacketType::kExtendedReports:
147         if (!ParseExtendedReports(payload, &receiver_reference_time)) {
148           return false;
149         }
150         break;
151 
152       default:
153         // Ignored, unimplemented or not part of the Cast Streaming spec.
154         break;
155     }
156   }
157 
158   // A well-behaved Cast Streaming Receiver will always include a reference time
159   // report. This essentially "timestamps" the RTCP packets just parsed.
160   // However, the spec does not explicitly require this be included. When it is
161   // present, improve the stability of the system by ignoring stale/out-of-order
162   // RTCP packets.
163   if (receiver_reference_time != kNullTimePoint) {
164     // If the packet is out-of-order (e.g., it got delayed/shuffled when going
165     // through the network), just ignore it. Since RTCP packets always include
166     // all the necessary current state from the peer, dropping them does not
167     // mean important signals will be lost. In fact, it can actually be harmful
168     // to process compound RTCP packets out-of-order.
169     if (latest_receiver_timestamp_ != kNullTimePoint &&
170         receiver_reference_time < latest_receiver_timestamp_) {
171       return true;
172     }
173     latest_receiver_timestamp_ = receiver_reference_time;
174     client_->OnReceiverReferenceTimeAdvanced(latest_receiver_timestamp_);
175   }
176 
177   // At this point, the packet is known to be well-formed. Dispatch events of
178   // interest to the Client.
179   if (receiver_report) {
180     client_->OnReceiverReport(*receiver_report);
181   }
182   if (!checkpoint_frame_id.is_null()) {
183     client_->OnReceiverCheckpoint(checkpoint_frame_id, target_playout_delay);
184   }
185   if (!received_frames.empty()) {
186     OSP_DCHECK(AreElementsSortedAndUnique(received_frames));
187     client_->OnReceiverHasFrames(std::move(received_frames));
188   }
189   CanonicalizePacketNackVector(&packet_nacks);
190   if (!packet_nacks.empty()) {
191     client_->OnReceiverIsMissingPackets(std::move(packet_nacks));
192   }
193   if (picture_loss_indicator) {
194     client_->OnReceiverIndicatesPictureLoss();
195   }
196 
197   return true;
198 }
199 
ParseReceiverReport(absl::Span<const uint8_t> in,int num_report_blocks,absl::optional<RtcpReportBlock> * receiver_report)200 bool CompoundRtcpParser::ParseReceiverReport(
201     absl::Span<const uint8_t> in,
202     int num_report_blocks,
203     absl::optional<RtcpReportBlock>* receiver_report) {
204   if (in.size() < kRtcpReceiverReportSize) {
205     return false;
206   }
207   if (ConsumeField<uint32_t>(&in) == session_->receiver_ssrc()) {
208     *receiver_report = RtcpReportBlock::ParseOne(in, num_report_blocks,
209                                                  session_->sender_ssrc());
210   }
211   return true;
212 }
213 
ParseFeedback(absl::Span<const uint8_t> in,FrameId max_feedback_frame_id,FrameId * checkpoint_frame_id,std::chrono::milliseconds * target_playout_delay,std::vector<FrameId> * received_frames,std::vector<PacketNack> * packet_nacks)214 bool CompoundRtcpParser::ParseFeedback(
215     absl::Span<const uint8_t> in,
216     FrameId max_feedback_frame_id,
217     FrameId* checkpoint_frame_id,
218     std::chrono::milliseconds* target_playout_delay,
219     std::vector<FrameId>* received_frames,
220     std::vector<PacketNack>* packet_nacks) {
221   OSP_DCHECK(!max_feedback_frame_id.is_null());
222 
223   if (static_cast<int>(in.size()) < kRtcpFeedbackHeaderSize) {
224     return false;
225   }
226   if (ConsumeField<uint32_t>(&in) != session_->receiver_ssrc() ||
227       ConsumeField<uint32_t>(&in) != session_->sender_ssrc()) {
228     return true;  // Ignore report from mismatched SSRC(s).
229   }
230   if (ConsumeField<uint32_t>(&in) != kRtcpCastIdentifierWord) {
231     return false;
232   }
233 
234   const FrameId feedback_frame_id =
235       max_feedback_frame_id.ExpandLessThanOrEqual(ConsumeField<uint8_t>(&in));
236   const int loss_field_count = ConsumeField<uint8_t>(&in);
237   const auto playout_delay =
238       std::chrono::milliseconds(ConsumeField<uint16_t>(&in));
239   // Don't process feedback that would move the checkpoint backwards. The Client
240   // makes assumptions about what frame data and other tracking state can be
241   // discarded based on a monotonically non-decreasing checkpoint FrameId.
242   if (!checkpoint_frame_id->is_null() &&
243       *checkpoint_frame_id > feedback_frame_id) {
244     return true;
245   }
246   *checkpoint_frame_id = feedback_frame_id;
247   *target_playout_delay = playout_delay;
248   received_frames->clear();
249   packet_nacks->clear();
250   if (static_cast<int>(in.size()) <
251       (kRtcpFeedbackLossFieldSize * loss_field_count)) {
252     return false;
253   }
254 
255   // Parse the NACKs.
256   for (int i = 0; i < loss_field_count; ++i) {
257     const FrameId frame_id =
258         feedback_frame_id.ExpandGreaterThan(ConsumeField<uint8_t>(&in));
259     FramePacketId packet_id = ConsumeField<uint16_t>(&in);
260     uint8_t bits = ConsumeField<uint8_t>(&in);
261     packet_nacks->push_back(PacketNack{frame_id, packet_id});
262 
263     if (packet_id != kAllPacketsLost) {
264       // Translate each set bit in the bit vector into another missing
265       // FramePacketId.
266       while (bits) {
267         ++packet_id;
268         if (bits & 1) {
269           packet_nacks->push_back(PacketNack{frame_id, packet_id});
270         }
271         bits >>= 1;
272       }
273     }
274   }
275 
276   // Parse the optional CST2 feedback (frame-level ACKs).
277   if (static_cast<int>(in.size()) < kRtcpFeedbackAckHeaderSize ||
278       ConsumeField<uint32_t>(&in) != kRtcpCst2IdentifierWord) {
279     // Optional CST2 extended feedback is not present. For backwards-
280     // compatibility reasons, do not consider any extra "garbage" in the packet
281     // that doesn't match 'CST2' as corrupted input.
282     return true;
283   }
284   // Skip over the "Feedback Count" field. It's currently unused, though it
285   // might be useful for event tracing later...
286   in.remove_prefix(sizeof(uint8_t));
287   const int ack_bitvector_octet_count = ConsumeField<uint8_t>(&in);
288   if (static_cast<int>(in.size()) < ack_bitvector_octet_count) {
289     return false;
290   }
291   // Translate each set bit in the bit vector into a FrameId. See the
292   // explanation of this wire format in rtp_defines.h for where the "plus two"
293   // comes from.
294   FrameId starting_frame_id = feedback_frame_id + 2;
295   for (int i = 0; i < ack_bitvector_octet_count; ++i) {
296     uint8_t bits = ConsumeField<uint8_t>(&in);
297     FrameId frame_id = starting_frame_id;
298     while (bits) {
299       if (bits & 1) {
300         received_frames->push_back(frame_id);
301       }
302       ++frame_id;
303       bits >>= 1;
304     }
305     constexpr int kBitsPerOctet = 8;
306     starting_frame_id += kBitsPerOctet;
307   }
308 
309   return true;
310 }
311 
ParseExtendedReports(absl::Span<const uint8_t> in,Clock::time_point * receiver_reference_time)312 bool CompoundRtcpParser::ParseExtendedReports(
313     absl::Span<const uint8_t> in,
314     Clock::time_point* receiver_reference_time) {
315   if (static_cast<int>(in.size()) < kRtcpExtendedReportHeaderSize) {
316     return false;
317   }
318   if (ConsumeField<uint32_t>(&in) != session_->receiver_ssrc()) {
319     return true;  // Ignore report from unknown receiver.
320   }
321 
322   while (!in.empty()) {
323     // All extended report types have the same 4-byte subheader.
324     if (static_cast<int>(in.size()) < kRtcpExtendedReportBlockHeaderSize) {
325       return false;
326     }
327     const uint8_t block_type = ConsumeField<uint8_t>(&in);
328     in.remove_prefix(sizeof(uint8_t));  // Skip the "reserved" byte.
329     const int block_data_size =
330         static_cast<int>(ConsumeField<uint16_t>(&in)) * 4;
331     if (static_cast<int>(in.size()) < block_data_size) {
332       return false;
333     }
334     if (block_type == kRtcpReceiverReferenceTimeReportBlockType) {
335       if (block_data_size != sizeof(uint64_t)) {
336         return false;  // Length field must always be 2 words.
337       }
338       *receiver_reference_time = session_->ntp_converter().ToLocalTime(
339           ReadBigEndian<uint64_t>(in.data()));
340     } else {
341       // Ignore any other type of extended report.
342     }
343     in.remove_prefix(block_data_size);
344   }
345 
346   return true;
347 }
348 
ParsePictureLossIndicator(absl::Span<const uint8_t> in,bool * picture_loss_indicator)349 bool CompoundRtcpParser::ParsePictureLossIndicator(
350     absl::Span<const uint8_t> in,
351     bool* picture_loss_indicator) {
352   if (static_cast<int>(in.size()) < kRtcpPictureLossIndicatorHeaderSize) {
353     return false;
354   }
355   // Only set the flag if the PLI is from the Receiver and to this Sender.
356   if (ConsumeField<uint32_t>(&in) == session_->receiver_ssrc() &&
357       ConsumeField<uint32_t>(&in) == session_->sender_ssrc()) {
358     *picture_loss_indicator = true;
359   }
360   return true;
361 }
362 
363 CompoundRtcpParser::Client::Client() = default;
364 CompoundRtcpParser::Client::~Client() = default;
OnReceiverReferenceTimeAdvanced(Clock::time_point reference_time)365 void CompoundRtcpParser::Client::OnReceiverReferenceTimeAdvanced(
366     Clock::time_point reference_time) {}
OnReceiverReport(const RtcpReportBlock & receiver_report)367 void CompoundRtcpParser::Client::OnReceiverReport(
368     const RtcpReportBlock& receiver_report) {}
OnReceiverIndicatesPictureLoss()369 void CompoundRtcpParser::Client::OnReceiverIndicatesPictureLoss() {}
OnReceiverCheckpoint(FrameId frame_id,std::chrono::milliseconds playout_delay)370 void CompoundRtcpParser::Client::OnReceiverCheckpoint(
371     FrameId frame_id,
372     std::chrono::milliseconds playout_delay) {}
OnReceiverHasFrames(std::vector<FrameId> acks)373 void CompoundRtcpParser::Client::OnReceiverHasFrames(
374     std::vector<FrameId> acks) {}
OnReceiverIsMissingPackets(std::vector<PacketNack> nacks)375 void CompoundRtcpParser::Client::OnReceiverIsMissingPackets(
376     std::vector<PacketNack> nacks) {}
377 
378 }  // namespace cast
379 }  // namespace openscreen
380