1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/receiver.h"
6 
7 #include <algorithm>
8 #include <utility>
9 
10 #include "absl/types/span.h"
11 #include "cast/streaming/constants.h"
12 #include "cast/streaming/receiver_packet_router.h"
13 #include "cast/streaming/session_config.h"
14 #include "util/chrono_helpers.h"
15 #include "util/osp_logging.h"
16 #include "util/std_util.h"
17 
18 namespace openscreen {
19 namespace cast {
20 
// Logging conveniences that prefix every message with "[SSRC:<ssrc()>] ", to
// help distinguish one Receiver out of multiple instances in a Cast Streaming
// session.
//
// RECEIVER_LOG(level) token-pastes |level| onto OSP_LOG_ (e.g.,
// RECEIVER_LOG(WARN) streams into OSP_LOG_WARN), while RECEIVER_VLOG streams
// into OSP_VLOG. Both expand to a call to ssrc(), so they can only be used
// where ssrc() is in scope (i.e., inside Receiver member functions).
//
// TODO(miu): Replace RECEIVER_VLOG's with trace event logging once the tracing
// infrastructure is ready.
#define RECEIVER_LOG(level) OSP_LOG_##level << "[SSRC:" << ssrc() << "] "
#define RECEIVER_VLOG OSP_VLOG << "[SSRC:" << ssrc() << "] "
29 
Receiver(Environment * environment,ReceiverPacketRouter * packet_router,SessionConfig config)30 Receiver::Receiver(Environment* environment,
31                    ReceiverPacketRouter* packet_router,
32                    SessionConfig config)
33     : now_(environment->now_function()),
34       packet_router_(packet_router),
35       config_(config),
36       rtcp_session_(config.sender_ssrc, config.receiver_ssrc, now_()),
37       rtcp_parser_(&rtcp_session_),
38       rtcp_builder_(&rtcp_session_),
39       stats_tracker_(config.rtp_timebase),
40       rtp_parser_(config.sender_ssrc),
41       rtp_timebase_(config.rtp_timebase),
42       crypto_(config.aes_secret_key, config.aes_iv_mask),
43       is_pli_enabled_(config.is_pli_enabled),
44       rtcp_buffer_capacity_(environment->GetMaxPacketSize()),
45       rtcp_buffer_(new uint8_t[rtcp_buffer_capacity_]),
46       rtcp_alarm_(environment->now_function(), environment->task_runner()),
47       smoothed_clock_offset_(ClockDriftSmoother::kDefaultTimeConstant),
48       consumption_alarm_(environment->now_function(),
49                          environment->task_runner()) {
50   OSP_DCHECK(packet_router_);
51   OSP_DCHECK_EQ(checkpoint_frame(), FrameId::leader());
52   OSP_CHECK_GT(rtcp_buffer_capacity_, 0);
53   OSP_CHECK(rtcp_buffer_);
54 
55   rtcp_builder_.SetPlayoutDelay(config.target_playout_delay);
56   playout_delay_changes_.emplace_back(FrameId::leader(),
57                                       config.target_playout_delay);
58 
59   packet_router_->OnReceiverCreated(rtcp_session_.sender_ssrc(), this);
60 }
61 
~Receiver()62 Receiver::~Receiver() {
63   packet_router_->OnReceiverDestroyed(rtcp_session_.sender_ssrc());
64 }
65 
SetConsumer(Consumer * consumer)66 void Receiver::SetConsumer(Consumer* consumer) {
67   consumer_ = consumer;
68   ScheduleFrameReadyCheck();
69 }
70 
SetPlayerProcessingTime(Clock::duration needed_time)71 void Receiver::SetPlayerProcessingTime(Clock::duration needed_time) {
72   player_processing_time_ = std::max(Clock::duration::zero(), needed_time);
73 }
74 
RequestKeyFrame()75 void Receiver::RequestKeyFrame() {
76   // If we don't have picture loss indication enabled, we should not request
77   // any key frames.
78   OSP_DCHECK(is_pli_enabled_) << "PLI is not enabled.";
79   if (is_pli_enabled_ && !last_key_frame_received_.is_null() &&
80       last_frame_consumed_ >= last_key_frame_received_ &&
81       !rtcp_builder_.is_picture_loss_indicator_set()) {
82     rtcp_builder_.SetPictureLossIndicator(true);
83     SendRtcp();
84   }
85 }
86 
AdvanceToNextFrame()87 int Receiver::AdvanceToNextFrame() {
88   const FrameId immediate_next_frame = last_frame_consumed_ + 1;
89 
90   // Scan the queue for the next frame that should be consumed. Typically, this
91   // is the very next frame; but if it is incomplete and already late for
92   // playout, consider skipping-ahead.
93   for (FrameId f = immediate_next_frame; f <= latest_frame_expected_; ++f) {
94     PendingFrame& entry = GetQueueEntry(f);
95     if (entry.collector.is_complete()) {
96       const EncryptedFrame& encrypted_frame =
97           entry.collector.PeekAtAssembledFrame();
98       if (f == immediate_next_frame) {  // Typical case.
99         RECEIVER_VLOG << "AdvanceToNextFrame: Next in sequence (" << f << ')';
100         return FrameCrypto::GetPlaintextSize(encrypted_frame);
101       }
102       if (encrypted_frame.dependency != EncodedFrame::DEPENDS_ON_ANOTHER) {
103         // Found a frame after skipping past some frames. Drop the ones being
104         // skipped, advancing |last_frame_consumed_| before returning.
105         RECEIVER_VLOG << "AdvanceToNextFrame: Skipping-ahead → " << f;
106         DropAllFramesBefore(f);
107         return FrameCrypto::GetPlaintextSize(encrypted_frame);
108       }
109       // Conclusion: The frame in the current queue entry is complete, but
110       // depends on a prior incomplete frame. Continue scanning...
111     }
112 
113     // Do not consider skipping past this frame if its estimated capture time is
114     // unknown. The implication here is that, if |estimated_capture_time| is
115     // set, the Receiver also knows whether any target playout delay changes
116     // were communicated from the Sender in the frame's first RTP packet.
117     if (!entry.estimated_capture_time) {
118       break;
119     }
120 
121     // If this incomplete frame is not yet late for playout, simply wait for the
122     // rest of its packets to come in. However, do schedule a check to
123     // re-examine things at the time it would become a late frame, to possibly
124     // skip-over it.
125     const auto playout_time =
126         *entry.estimated_capture_time + ResolveTargetPlayoutDelay(f);
127     if (playout_time > (now_() + player_processing_time_)) {
128       ScheduleFrameReadyCheck(playout_time);
129       break;
130     }
131   }
132 
133   RECEIVER_VLOG << "AdvanceToNextFrame: No frames ready. Last consumed was "
134                 << last_frame_consumed_ << '.';
135   return kNoFramesReady;
136 }
137 
ConsumeNextFrame(absl::Span<uint8_t> buffer)138 EncodedFrame Receiver::ConsumeNextFrame(absl::Span<uint8_t> buffer) {
139   // Assumption: The required call to AdvanceToNextFrame() ensures that
140   // |last_frame_consumed_| is set to one before the frame to be consumed here.
141   const FrameId frame_id = last_frame_consumed_ + 1;
142   OSP_CHECK_LE(frame_id, checkpoint_frame());
143 
144   // Decrypt the frame, populating the given output |frame|.
145   PendingFrame& entry = GetQueueEntry(frame_id);
146   OSP_DCHECK(entry.collector.is_complete());
147   EncodedFrame frame;
148   frame.data = buffer;
149   crypto_.Decrypt(entry.collector.PeekAtAssembledFrame(), &frame);
150   OSP_DCHECK(entry.estimated_capture_time);
151   frame.reference_time =
152       *entry.estimated_capture_time + ResolveTargetPlayoutDelay(frame_id);
153 
154   RECEIVER_VLOG << "ConsumeNextFrame → " << frame.frame_id << ": "
155                 << frame.data.size() << " payload bytes, RTP Timestamp "
156                 << frame.rtp_timestamp
157                        .ToTimeSinceOrigin<microseconds>(rtp_timebase_)
158                        .count()
159                 << " µs, to play-out "
160                 << to_microseconds(frame.reference_time - now_()).count()
161                 << " µs from now.";
162 
163   entry.Reset();
164   last_frame_consumed_ = frame_id;
165 
166   // Ensure the Consumer is notified if there are already more frames ready for
167   // consumption, and it hasn't explicitly called AdvanceToNextFrame() to check
168   // for itself.
169   ScheduleFrameReadyCheck();
170 
171   return frame;
172 }
173 
OnReceivedRtpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)174 void Receiver::OnReceivedRtpPacket(Clock::time_point arrival_time,
175                                    std::vector<uint8_t> packet) {
176   const absl::optional<RtpPacketParser::ParseResult> part =
177       rtp_parser_.Parse(packet);
178   if (!part) {
179     RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
180                        << " bytes as an RTP packet failed.";
181     return;
182   }
183   stats_tracker_.OnReceivedValidRtpPacket(part->sequence_number,
184                                           part->rtp_timestamp, arrival_time);
185 
186   // Ignore packets for frames the Receiver is no longer interested in.
187   if (part->frame_id <= checkpoint_frame()) {
188     return;
189   }
190 
191   // Extend the range of frames known to this Receiver, within the capacity of
192   // this Receiver's queue. Prepare the FrameCollectors to receive any
193   // newly-discovered frames.
194   if (part->frame_id > latest_frame_expected_) {
195     const FrameId max_allowed_frame_id =
196         last_frame_consumed_ + kMaxUnackedFrames;
197     if (part->frame_id > max_allowed_frame_id) {
198       RECEIVER_VLOG << "Dropping RTP packet for " << part->frame_id
199                     << ": Too many frames are already in-flight.";
200       return;
201     }
202     do {
203       ++latest_frame_expected_;
204       GetQueueEntry(latest_frame_expected_)
205           .collector.set_frame_id(latest_frame_expected_);
206     } while (latest_frame_expected_ < part->frame_id);
207     RECEIVER_VLOG << "Advanced latest frame expected to "
208                   << latest_frame_expected_;
209   }
210 
211   // Start-up edge case: Blatantly drop the first packet of all frames until the
212   // Receiver has processed at least one Sender Report containing the necessary
213   // clock-drift and lip-sync information (see OnReceivedRtcpPacket()). This is
214   // an inescapable data dependency. Note that this special case should almost
215   // never trigger, since a well-behaving Sender will send the first Sender
216   // Report RTCP packet before any of the RTP packets.
217   if (!last_sender_report_ && part->packet_id == FramePacketId{0}) {
218     RECEIVER_LOG(WARN) << "Dropping packet 0 of frame " << part->frame_id
219                        << " because it arrived before the first Sender Report.";
220     // Note: The Sender will have to re-transmit this dropped packet after the
221     // Sender Report to allow the Receiver to move forward.
222     return;
223   }
224 
225   PendingFrame& pending_frame = GetQueueEntry(part->frame_id);
226   FrameCollector& collector = pending_frame.collector;
227   if (collector.is_complete()) {
228     // An extra, redundant |packet| was received. Do nothing since the frame was
229     // already complete.
230     return;
231   }
232 
233   if (!collector.CollectRtpPacket(*part, &packet)) {
234     return;  // Bad data in the parsed packet. Ignore it.
235   }
236 
237   // The first packet in a frame contains timing information critical for
238   // computing this frame's (and all future frames') playout time. Process that,
239   // but only once.
240   if (part->packet_id == FramePacketId{0} &&
241       !pending_frame.estimated_capture_time) {
242     // Estimate the original capture time of this frame (at the Sender), in
243     // terms of the Receiver's clock: First, start with a reference time point
244     // from the Sender's clock (the one from the last Sender Report). Then,
245     // translate it into the equivalent reference time point in terms of the
246     // Receiver's clock by applying the measured offset between the two clocks.
247     // Finally, apply the RTP timestamp difference between the Sender Report and
248     // this frame to determine what the original capture time of this frame was.
249     pending_frame.estimated_capture_time =
250         last_sender_report_->reference_time + smoothed_clock_offset_.Current() +
251         (part->rtp_timestamp - last_sender_report_->rtp_timestamp)
252             .ToDuration<Clock::duration>(rtp_timebase_);
253 
254     // If a target playout delay change was included in this packet, record it.
255     if (part->new_playout_delay > milliseconds::zero()) {
256       RECEIVER_VLOG << "Target playout delay changes to "
257                     << part->new_playout_delay.count() << " ms, as of "
258                     << part->frame_id;
259       RecordNewTargetPlayoutDelay(part->frame_id, part->new_playout_delay);
260     }
261 
262     // Now that the estimated capture time is known, other frames may have just
263     // become ready, per the frame-skipping logic in AdvanceToNextFrame().
264     ScheduleFrameReadyCheck();
265   }
266 
267   if (!collector.is_complete()) {
268     return;  // Wait for the rest of the packets to come in.
269   }
270   const EncryptedFrame& encrypted_frame = collector.PeekAtAssembledFrame();
271 
272   // Whenever a key frame has been received, the decoder has what it needs to
273   // recover. In this case, clear the PLI condition.
274   if (encrypted_frame.dependency == EncryptedFrame::KEY_FRAME) {
275     rtcp_builder_.SetPictureLossIndicator(false);
276     last_key_frame_received_ = part->frame_id;
277   }
278 
279   // If this just-completed frame is the one right after the checkpoint frame,
280   // advance the checkpoint forward.
281   if (part->frame_id == (checkpoint_frame() + 1)) {
282     AdvanceCheckpoint(part->frame_id);
283   }
284 
285   // Since a frame has become complete, schedule a check to see whether this or
286   // any other frames have become ready for consumption.
287   ScheduleFrameReadyCheck();
288 }
289 
OnReceivedRtcpPacket(Clock::time_point arrival_time,std::vector<uint8_t> packet)290 void Receiver::OnReceivedRtcpPacket(Clock::time_point arrival_time,
291                                     std::vector<uint8_t> packet) {
292   absl::optional<SenderReportParser::SenderReportWithId> parsed_report =
293       rtcp_parser_.Parse(packet);
294   if (!parsed_report) {
295     RECEIVER_LOG(WARN) << "Parsing of " << packet.size()
296                        << " bytes as an RTCP packet failed.";
297     return;
298   }
299   last_sender_report_ = std::move(parsed_report);
300   last_sender_report_arrival_time_ = arrival_time;
301 
302   // Measure the offset between the Sender's clock and the Receiver's Clock.
303   // This will be used to translate reference timestamps from the Sender into
304   // timestamps that represent the exact same moment in time at the Receiver.
305   //
306   // Note: Due to design limitations in the Cast Streaming spec, the Receiver
307   // has no way to compute how long it took the Sender Report to travel over the
308   // network. The calculation here just ignores that, and so the
309   // |measured_offset| below will be larger than the true value by that amount.
310   // This will have the effect of a later-than-configured playout delay.
311   const Clock::duration measured_offset =
312       arrival_time - last_sender_report_->reference_time;
313   smoothed_clock_offset_.Update(arrival_time, measured_offset);
314   RECEIVER_VLOG
315       << "Received Sender Report: Local clock is ahead of Sender's by "
316       << to_microseconds(smoothed_clock_offset_.Current()).count()
317       << " µs (minus one-way network transit time).";
318 
319   RtcpReportBlock report;
320   report.ssrc = rtcp_session_.sender_ssrc();
321   stats_tracker_.PopulateNextReport(&report);
322   report.last_status_report_id = last_sender_report_->report_id;
323   report.SetDelaySinceLastReport(now_() - last_sender_report_arrival_time_);
324   rtcp_builder_.IncludeReceiverReportInNextPacket(report);
325 
326   SendRtcp();
327 }
328 
SendRtcp()329 void Receiver::SendRtcp() {
330   // Collect ACK/NACK feedback for all active frames in the queue.
331   std::vector<PacketNack> packet_nacks;
332   std::vector<FrameId> frame_acks;
333   for (FrameId f = checkpoint_frame() + 1; f <= latest_frame_expected_; ++f) {
334     const FrameCollector& collector = GetQueueEntry(f).collector;
335     if (collector.is_complete()) {
336       frame_acks.push_back(f);
337     } else {
338       collector.GetMissingPackets(&packet_nacks);
339     }
340   }
341 
342   // Build and send a compound RTCP packet.
343   const bool no_nacks = packet_nacks.empty();
344   rtcp_builder_.IncludeFeedbackInNextPacket(std::move(packet_nacks),
345                                             std::move(frame_acks));
346   last_rtcp_send_time_ = now_();
347   packet_router_->SendRtcpPacket(rtcp_builder_.BuildPacket(
348       last_rtcp_send_time_,
349       absl::Span<uint8_t>(rtcp_buffer_.get(), rtcp_buffer_capacity_)));
350   RECEIVER_VLOG << "Sent RTCP packet.";
351 
352   // Schedule the automatic sending of another RTCP packet, if this method is
353   // not called within some bounded amount of time. While incomplete frames
354   // exist in the queue, send RTCP packets (with ACK/NACK feedback) frequently.
355   // When there are no incomplete frames, use a longer "keepalive" interval.
356   const Clock::duration interval =
357       (no_nacks ? kRtcpReportInterval : kNackFeedbackInterval);
358   rtcp_alarm_.Schedule([this] { SendRtcp(); }, last_rtcp_send_time_ + interval);
359 }
360 
GetQueueEntry(FrameId frame_id) const361 const Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) const {
362   return const_cast<Receiver*>(this)->GetQueueEntry(frame_id);
363 }
364 
GetQueueEntry(FrameId frame_id)365 Receiver::PendingFrame& Receiver::GetQueueEntry(FrameId frame_id) {
366   return pending_frames_[(frame_id - FrameId::first()) %
367                          pending_frames_.size()];
368 }
369 
RecordNewTargetPlayoutDelay(FrameId as_of_frame,milliseconds delay)370 void Receiver::RecordNewTargetPlayoutDelay(FrameId as_of_frame,
371                                            milliseconds delay) {
372   OSP_DCHECK_GT(as_of_frame, checkpoint_frame());
373 
374   // Prune-out entries from |playout_delay_changes_| that are no longer needed.
375   // At least one entry must always be kept (i.e., there must always be a
376   // "current" setting).
377   const FrameId next_frame = last_frame_consumed_ + 1;
378   const auto keep_one_before_it = std::find_if(
379       std::next(playout_delay_changes_.begin()), playout_delay_changes_.end(),
380       [&](const auto& entry) { return entry.first > next_frame; });
381   playout_delay_changes_.erase(playout_delay_changes_.begin(),
382                                std::prev(keep_one_before_it));
383 
384   // Insert the delay change entry, maintaining the ascending ordering of the
385   // vector.
386   const auto insert_it = std::find_if(
387       playout_delay_changes_.begin(), playout_delay_changes_.end(),
388       [&](const auto& entry) { return entry.first > as_of_frame; });
389   playout_delay_changes_.emplace(insert_it, as_of_frame, delay);
390 
391   OSP_DCHECK(AreElementsSortedAndUnique(playout_delay_changes_));
392 }
393 
ResolveTargetPlayoutDelay(FrameId frame_id) const394 milliseconds Receiver::ResolveTargetPlayoutDelay(FrameId frame_id) const {
395   OSP_DCHECK_GT(frame_id, last_frame_consumed_);
396 
397 #if OSP_DCHECK_IS_ON()
398   // Extra precaution: Ensure all possible playout delay changes are known. In
399   // other words, every unconsumed frame in the queue, up to (and including)
400   // |frame_id|, must have an assigned estimated_capture_time.
401   for (FrameId f = last_frame_consumed_ + 1; f <= frame_id; ++f) {
402     OSP_DCHECK(GetQueueEntry(f).estimated_capture_time)
403         << " don't know whether there was a playout delay change for frame "
404         << f;
405   }
406 #endif
407 
408   const auto it = std::find_if(
409       playout_delay_changes_.crbegin(), playout_delay_changes_.crend(),
410       [&](const auto& entry) { return entry.first <= frame_id; });
411   OSP_DCHECK(it != playout_delay_changes_.crend());
412   return it->second;
413 }
414 
AdvanceCheckpoint(FrameId new_checkpoint)415 void Receiver::AdvanceCheckpoint(FrameId new_checkpoint) {
416   OSP_DCHECK_GT(new_checkpoint, checkpoint_frame());
417   OSP_DCHECK_LE(new_checkpoint, latest_frame_expected_);
418 
419   while (new_checkpoint < latest_frame_expected_) {
420     const FrameId next = new_checkpoint + 1;
421     if (!GetQueueEntry(next).collector.is_complete()) {
422       break;
423     }
424     new_checkpoint = next;
425   }
426 
427   RECEIVER_VLOG << "Advancing checkpoint to " << new_checkpoint;
428   set_checkpoint_frame(new_checkpoint);
429   rtcp_builder_.SetPlayoutDelay(ResolveTargetPlayoutDelay(new_checkpoint));
430   SendRtcp();
431 }
432 
DropAllFramesBefore(FrameId first_kept_frame)433 void Receiver::DropAllFramesBefore(FrameId first_kept_frame) {
434   // The following DCHECKs are verifying that this method is only being called
435   // because one or more incomplete frames are being skipped-over.
436   const FrameId first_to_drop = last_frame_consumed_ + 1;
437   OSP_DCHECK_GT(first_kept_frame, first_to_drop);
438   OSP_DCHECK_GT(first_kept_frame, checkpoint_frame());
439   OSP_DCHECK_LE(first_kept_frame, latest_frame_expected_);
440 
441   // Reset each of the frames being dropped, pretending that they were consumed.
442   for (FrameId f = first_to_drop; f < first_kept_frame; ++f) {
443     PendingFrame& entry = GetQueueEntry(f);
444     // Pedantic sanity-check: Ensure the "target playout delay change" data
445     // dependency was satisfied. See comments in AdvanceToNextFrame().
446     OSP_DCHECK(entry.estimated_capture_time);
447     entry.Reset();
448   }
449   last_frame_consumed_ = first_kept_frame - 1;
450 
451   RECEIVER_LOG(INFO) << "Artificially advancing checkpoint after skipping.";
452   AdvanceCheckpoint(first_kept_frame);
453 }
454 
ScheduleFrameReadyCheck(Clock::time_point when)455 void Receiver::ScheduleFrameReadyCheck(Clock::time_point when) {
456   consumption_alarm_.Schedule(
457       [this] {
458         if (consumer_) {
459           const int next_frame_buffer_size = AdvanceToNextFrame();
460           if (next_frame_buffer_size != kNoFramesReady) {
461             consumer_->OnFramesReady(next_frame_buffer_size);
462           }
463         }
464       },
465       when);
466 }
467 
// Out-of-line, compiler-generated destructor for the Consumer interface.
Receiver::Consumer::~Consumer() = default;
469 
// Out-of-line, compiler-generated constructor and destructor for a queue
// entry.
Receiver::PendingFrame::PendingFrame() = default;
Receiver::PendingFrame::~PendingFrame() = default;
472 
Reset()473 void Receiver::PendingFrame::Reset() {
474   collector.Reset();
475   estimated_capture_time = absl::nullopt;
476 }
477 
// static
// Namespace-scope definitions of the class's static constexpr data members.
// No initializers appear here; presumably the values are given where the
// members are declared in the class (not visible in this file).
constexpr milliseconds Receiver::kDefaultPlayerProcessingTime;
constexpr int Receiver::kNoFramesReady;
constexpr milliseconds Receiver::kNackFeedbackInterval;
482 
483 }  // namespace cast
484 }  // namespace openscreen
485