1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef CAST_STREAMING_RECEIVER_H_
6 #define CAST_STREAMING_RECEIVER_H_
7 
8 #include <stdint.h>
9 
10 #include <array>
11 #include <chrono>
12 #include <memory>
13 #include <utility>
14 #include <vector>
15 
16 #include "absl/types/optional.h"
17 #include "absl/types/span.h"
18 #include "cast/streaming/clock_drift_smoother.h"
19 #include "cast/streaming/compound_rtcp_builder.h"
20 #include "cast/streaming/environment.h"
21 #include "cast/streaming/frame_collector.h"
22 #include "cast/streaming/frame_id.h"
23 #include "cast/streaming/packet_receive_stats_tracker.h"
24 #include "cast/streaming/rtcp_common.h"
25 #include "cast/streaming/rtcp_session.h"
26 #include "cast/streaming/rtp_packet_parser.h"
27 #include "cast/streaming/sender_report_parser.h"
28 #include "cast/streaming/session_config.h"
29 #include "cast/streaming/ssrc.h"
30 #include "platform/api/time.h"
31 #include "util/alarm.h"
32 
33 namespace openscreen {
34 namespace cast {
35 
36 struct EncodedFrame;
37 class ReceiverPacketRouter;
38 
39 // The Cast Streaming Receiver, a peer corresponding to some Cast Streaming
40 // Sender at the other end of a network link.
41 //
42 // Cast Streaming is a transport protocol which divides up the frames for one
43 // media stream (e.g., audio or video) into multiple RTP packets containing an
44 // encrypted payload. The Receiver is the peer responsible for collecting the
45 // RTP packets, decrypting the payload, and re-assembling a frame that can be
46 // passed to a decoder and played out.
47 //
48 // A Sender ↔ Receiver pair is used to transport each media stream. Typically,
// there are two pairs in a normal system, one for the audio stream and one for
// the video stream. A local player is responsible for synchronizing the playout
// of the frames of each stream to achieve lip-sync. See the discussion in
52 // encoded_frame.h for how the |reference_time| and |rtp_timestamp| of the
53 // EncodedFrames are used to achieve this.
54 //
55 // See the Receiver Demo app for a reference implementation that both shows and
56 // explains how Receivers are properly configured and started, integrated with a
57 // decoder, and the resulting decoded media is played out. Also, here is a
58 // general usage example:
59 //
60 //   class MyPlayer : public openscreen::cast::Receiver::Consumer {
61 //    public:
62 //     explicit MyPlayer(Receiver* receiver) : receiver_(receiver) {
//       receiver_->SetPlayerProcessingTime(std::chrono::milliseconds(10));
64 //       receiver_->SetConsumer(this);
65 //     }
66 //
67 //     ~MyPlayer() override {
68 //       receiver_->SetConsumer(nullptr);
69 //     }
70 //
71 //    private:
72 //     // Receiver::Consumer implementation.
73 //     void OnFramesReady(int next_frame_buffer_size) override {
74 //       std::vector<uint8_t> buffer;
75 //       buffer.resize(next_frame_buffer_size);
76 //       openscreen::cast::EncodedFrame encoded_frame =
77 //           receiver_->ConsumeNextFrame(absl::Span<uint8_t>(buffer));
78 //
79 //       display_.RenderFrame(decoder_.DecodeFrame(encoded_frame.data));
80 //
81 //       // Note: An implementation could call receiver_->AdvanceToNextFrame()
82 //       // and receiver_->ConsumeNextFrame() in a loop here, to consume all the
83 //       // remaining frames that are ready.
84 //     }
85 //
86 //     Receiver* const receiver_;
87 //     MyDecoder decoder_;
88 //     MyDisplay display_;
89 //   };
90 //
91 // Internally, a queue of complete and partially-received frames is maintained.
92 // The queue is a circular queue of FrameCollectors that each maintain the
93 // individual receive state of each in-flight frame. There are three conceptual
94 // "pointers" that indicate what assumptions and operations are made on certain
95 // ranges of frames in the queue:
96 //
97 //   1. Latest Frame Expected: The FrameId of the latest frame whose existence
98 //      is known to this Receiver. This is the highest FrameId seen in any
99 //      successfully-parsed RTP packet.
100 //   2. Checkpoint Frame: Indicates that all of the RTP packets for all frames
101 //      up to and including the one having this FrameId have been successfully
102 //      received and processed.
//   3. Last Frame Consumed: The FrameId of the last frame consumed (see
104 //      ConsumeNextFrame()). Once a frame is consumed, all internal resources
105 //      related to the frame can be freed and/or re-used for later frames.
106 class Receiver {
107  public:
108   class Consumer {
109    public:
110     virtual ~Consumer();
111 
112     // Called whenever one or more frames have become ready for consumption. The
113     // |next_frame_buffer_size| argument is identical to the result of calling
114     // AdvanceToNextFrame(), and so the Consumer only needs to prepare a buffer
115     // and call ConsumeNextFrame(). It may then call AdvanceToNextFrame() to
116     // check whether there are any more frames ready, but this is not mandatory.
117     // See usage example in class-level comments.
118     virtual void OnFramesReady(int next_frame_buffer_size) = 0;
119   };
120 
121   // Constructs a Receiver that attaches to the given |environment| and
122   // |packet_router|. The config contains the settings that were
123   // agreed-upon by both sides from the OFFER/ANSWER exchange (i.e., the part of
124   // the overall end-to-end connection process that occurs before Cast Streaming
125   // is started).
126   Receiver(Environment* environment,
127            ReceiverPacketRouter* packet_router,
128            SessionConfig config);
129   ~Receiver();
130 
config()131   const SessionConfig& config() const { return config_; }
rtp_timebase()132   int rtp_timebase() const { return rtp_timebase_; }
ssrc()133   Ssrc ssrc() const { return rtcp_session_.receiver_ssrc(); }
134 
135   // Set the Consumer receiving notifications when new frames are ready for
136   // consumption. Frames received before this method is called will remain in
137   // the queue indefinitely.
138   void SetConsumer(Consumer* consumer);
139 
140   // Sets how much time the consumer will need to decode/buffer/render/etc., and
141   // otherwise fully process a frame for on-time playback. This information is
142   // used by the Receiver to decide whether to skip past frames that have
143   // arrived too late. This method can be called repeatedly to make adjustments
144   // based on changing environmental conditions.
145   //
146   // Default setting: kDefaultPlayerProcessingTime
147   void SetPlayerProcessingTime(Clock::duration needed_time);
148 
149   // Propagates a "picture loss indicator" notification to the Sender,
150   // requesting a key frame so that decode/playout can recover. It is safe to
151   // call this redundantly. The Receiver will clear the picture loss condition
152   // automatically, once a key frame is received (i.e., before
153   // ConsumeNextFrame() is called to access it).
154   void RequestKeyFrame();
155 
156   // Advances to the next frame ready for consumption. This may skip-over
157   // incomplete frames that will not play out on-time; but only if there are
158   // completed frames further down the queue that have no dependency
159   // relationship with them (e.g., key frames).
160   //
161   // This method returns kNoFramesReady if there is not currently a frame ready
162   // for consumption. The caller should wait for a Consumer::OnFramesReady()
163   // notification before trying again. Otherwise, the number of bytes of encoded
164   // data is returned, and the caller should use this to ensure the buffer it
165   // passes to ConsumeNextFrame() is large enough.
166   int AdvanceToNextFrame();
167 
168   // Returns the next frame, both metadata and payload data. The Consumer calls
169   // this method after being notified via OnFramesReady(), and it can also call
170   // this whenever AdvanceToNextFrame() indicates another frame is ready.
171   // |buffer| must point to a sufficiently-sized buffer that will be populated
172   // with the frame's payload data. Upon return |frame->data| will be set to the
173   // portion of the buffer that was populated.
174   EncodedFrame ConsumeNextFrame(absl::Span<uint8_t> buffer);
175 
176   // Allows setting picture loss indication for testing. In production, this
177   // should be done using the config.
SetPliEnabledForTesting(bool is_pli_enabled)178   void SetPliEnabledForTesting(bool is_pli_enabled) {
179     is_pli_enabled_ = is_pli_enabled;
180   }
181 
182   // The default "player processing time" amount. See SetPlayerProcessingTime().
183   static constexpr std::chrono::milliseconds kDefaultPlayerProcessingTime{5};
184 
185   // Returned by AdvanceToNextFrame() when there are no frames currently ready
186   // for consumption.
187   static constexpr int kNoFramesReady = -1;
188 
189  protected:
190   friend class ReceiverPacketRouter;
191 
192   // Called by ReceiverPacketRouter to provide this Receiver with what looks
193   // like a RTP/RTCP packet meant for it specifically (among other Receivers).
194   void OnReceivedRtpPacket(Clock::time_point arrival_time,
195                            std::vector<uint8_t> packet);
196   void OnReceivedRtcpPacket(Clock::time_point arrival_time,
197                             std::vector<uint8_t> packet);
198 
199  private:
200   // An entry in the circular queue (see |pending_frames_|).
201   struct PendingFrame {
202     FrameCollector collector;
203 
204     // The Receiver's [local] Clock time when this frame was originally captured
205     // at the Sender. This is computed and assigned when the RTP packet with ID
206     // 0 is processed. Add the target playout delay to this to get the target
207     // playout time.
208     absl::optional<Clock::time_point> estimated_capture_time;
209 
210     PendingFrame();
211     ~PendingFrame();
212 
213     // Reset this entry to its initial state, freeing resources.
214     void Reset();
215   };
216 
217   // Get/Set the checkpoint FrameId. This indicates that all of the packets for
218   // all frames up to and including this FrameId have been successfully received
219   // (or otherwise do not need to be re-transmitted).
checkpoint_frame()220   FrameId checkpoint_frame() const { return rtcp_builder_.checkpoint_frame(); }
set_checkpoint_frame(FrameId frame_id)221   void set_checkpoint_frame(FrameId frame_id) {
222     rtcp_builder_.SetCheckpointFrame(frame_id);
223   }
224 
225   // Send an RTCP packet to the Sender immediately, to acknowledge the complete
226   // reception of one or more additional frames, to reply to a Sender Report, or
227   // to request re-transmits. Calling this also schedules additional RTCP
228   // packets to be sent periodically for the life of this Receiver.
229   void SendRtcp();
230 
231   // Helpers to map the given |frame_id| to the element in the |pending_frames_|
232   // circular queue. There are both const and non-const versions, but neither
233   // mutate any state (i.e., they are just look-ups).
234   const PendingFrame& GetQueueEntry(FrameId frame_id) const;
235   PendingFrame& GetQueueEntry(FrameId frame_id);
236 
237   // Record that the target playout delay has changed starting with the given
238   // FrameId.
239   void RecordNewTargetPlayoutDelay(FrameId as_of_frame,
240                                    std::chrono::milliseconds delay);
241 
242   // Examine the known target playout delay changes to determine what setting is
243   // in-effect for the given frame.
244   std::chrono::milliseconds ResolveTargetPlayoutDelay(FrameId frame_id) const;
245 
246   // Called to move the checkpoint forward. This scans the queue, starting from
247   // |new_checkpoint|, to find the latest in a contiguous sequence of completed
248   // frames. Then, it records that frame as the new checkpoint, and immediately
249   // sends a feedback RTCP packet to the Sender.
250   void AdvanceCheckpoint(FrameId new_checkpoint);
251 
252   // Helper to force-drop all frames before |first_kept_frame|, even if they
253   // were never consumed. This will also auto-cancel frames that were never
254   // completely received, artificially moving the checkpoint forward, and
255   // notifying the Sender of that. The caller of this method is responsible for
256   // making sure that frame data dependencies will not be broken by dropping the
257   // frames.
258   void DropAllFramesBefore(FrameId first_kept_frame);
259 
260   // Sets the |consumption_alarm_| to check whether any frames are ready,
261   // including possibly skipping over late frames in order to make not-yet-late
262   // frames become ready. The default argument value means "without delay."
263   void ScheduleFrameReadyCheck(Clock::time_point when = Alarm::kImmediately);
264 
265   const ClockNowFunctionPtr now_;
266   ReceiverPacketRouter* const packet_router_;
267   const SessionConfig config_;
268   RtcpSession rtcp_session_;
269   SenderReportParser rtcp_parser_;
270   CompoundRtcpBuilder rtcp_builder_;
271   PacketReceiveStatsTracker stats_tracker_;  // Tracks transmission stats.
272   RtpPacketParser rtp_parser_;
273   const int rtp_timebase_;    // RTP timestamp ticks per second.
274   const FrameCrypto crypto_;  // Decrypts assembled frames.
275   bool is_pli_enabled_;       // Whether picture loss indication is enabled.
276 
277   // Buffer for serializing/sending RTCP packets.
278   const int rtcp_buffer_capacity_;
279   const std::unique_ptr<uint8_t[]> rtcp_buffer_;
280 
281   // Schedules tasks to ensure RTCP reports are sent within a bounded interval.
282   // Not scheduled until after this Receiver has processed the first packet from
283   // the Sender.
284   Alarm rtcp_alarm_;
285   Clock::time_point last_rtcp_send_time_ = Clock::time_point::min();
286 
287   // The last Sender Report received and when the packet containing it had
288   // arrived. This contains lip-sync timestamps used as part of the calculation
289   // of playout times for the received frames, as well as ping-pong data bounced
290   // back to the Sender in the Receiver Reports. It is nullopt until the first
291   // parseable Sender Report is received.
292   absl::optional<SenderReportParser::SenderReportWithId> last_sender_report_;
293   Clock::time_point last_sender_report_arrival_time_;
294 
295   // Tracks the offset between the Receiver's [local] clock and the Sender's
296   // clock. This is invalid until the first Sender Report has been successfully
297   // processed (i.e., |last_sender_report_| is not nullopt).
298   ClockDriftSmoother smoothed_clock_offset_;
299 
300   // The ID of the latest frame whose existence is known to this Receiver. This
301   // value must always be greater than or equal to |checkpoint_frame()|.
302   FrameId latest_frame_expected_ = FrameId::leader();
303 
304   // The ID of the last frame consumed. This value must always be less than or
305   // equal to |checkpoint_frame()|, since it's impossible to consume incomplete
306   // frames!
307   FrameId last_frame_consumed_ = FrameId::leader();
308 
309   // The ID of the latest key frame known to be in-flight. This is used by
310   // RequestKeyFrame() to ensure the PLI condition doesn't get set again until
311   // after the consumer has seen a key frame that would clear the condition.
312   FrameId last_key_frame_received_;
313 
314   // The frame queue (circular), which tracks which frames are in-flight, stores
315   // data for partially-received frames, and holds onto completed frames until
316   // the consumer consumes them.
317   //
318   // Use GetQueueEntry() to access a slot. The currently-active slots are those
319   // for the frames after |last_frame_consumed_| and up-to/including
320   // |latest_frame_expected_|.
321   std::array<PendingFrame, kMaxUnackedFrames> pending_frames_{};
322 
323   // Tracks the recent changes to the target playout delay, which is controlled
324   // by the Sender. The FrameId indicates the first frame where a new delay
325   // setting takes effect. This vector is never empty, is kept sorted, and is
326   // pruned to remain as small as possible.
327   //
328   // The target playout delay is the amount of time between a frame's
329   // capture/recording on the Sender and when it should be played-out at the
330   // Receiver.
331   std::vector<std::pair<FrameId, std::chrono::milliseconds>>
332       playout_delay_changes_;
333 
334   // The consumer to notify when there are one or more frames completed and
335   // ready to be consumed.
336   Consumer* consumer_ = nullptr;
337 
338   // The additional time needed to decode/play-out each frame after being
339   // consumed from this Receiver.
340   Clock::duration player_processing_time_ = kDefaultPlayerProcessingTime;
341 
342   // Scheduled to check whether there are frames ready and, if there are, to
343   // notify the Consumer via OnFramesReady().
344   Alarm consumption_alarm_;
345 
346   // The interval between sending ACK/NACK feedback RTCP messages while
347   // incomplete frames exist in the queue.
348   //
349   // TODO(miu): This should be a function of the current target playout delay,
350   // similar to the Sender's kickstart interval logic.
351   static constexpr std::chrono::milliseconds kNackFeedbackInterval{30};
352 };
353 
354 }  // namespace cast
355 }  // namespace openscreen
356 
357 #endif  // CAST_STREAMING_RECEIVER_H_
358