1 // Copyright 2019 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef CAST_STREAMING_RECEIVER_H_ 6 #define CAST_STREAMING_RECEIVER_H_ 7 8 #include <stdint.h> 9 10 #include <array> 11 #include <chrono> 12 #include <memory> 13 #include <utility> 14 #include <vector> 15 16 #include "absl/types/optional.h" 17 #include "absl/types/span.h" 18 #include "cast/streaming/clock_drift_smoother.h" 19 #include "cast/streaming/compound_rtcp_builder.h" 20 #include "cast/streaming/environment.h" 21 #include "cast/streaming/frame_collector.h" 22 #include "cast/streaming/frame_id.h" 23 #include "cast/streaming/packet_receive_stats_tracker.h" 24 #include "cast/streaming/rtcp_common.h" 25 #include "cast/streaming/rtcp_session.h" 26 #include "cast/streaming/rtp_packet_parser.h" 27 #include "cast/streaming/sender_report_parser.h" 28 #include "cast/streaming/session_config.h" 29 #include "cast/streaming/ssrc.h" 30 #include "platform/api/time.h" 31 #include "util/alarm.h" 32 33 namespace openscreen { 34 namespace cast { 35 36 struct EncodedFrame; 37 class ReceiverPacketRouter; 38 39 // The Cast Streaming Receiver, a peer corresponding to some Cast Streaming 40 // Sender at the other end of a network link. 41 // 42 // Cast Streaming is a transport protocol which divides up the frames for one 43 // media stream (e.g., audio or video) into multiple RTP packets containing an 44 // encrypted payload. The Receiver is the peer responsible for collecting the 45 // RTP packets, decrypting the payload, and re-assembling a frame that can be 46 // passed to a decoder and played out. 47 // 48 // A Sender ↔ Receiver pair is used to transport each media stream. Typically, 49 // there are two pairs in a normal system, one for the audio stream and one for 50 // video stream. A local player is responsible for synchronizing the playout of 51 // the frames of each stream to achieve lip-sync. See the discussion in 52 // encoded_frame.h for how the |reference_time| and |rtp_timestamp| of the 53 // EncodedFrames are used to achieve this. 54 // 55 // See the Receiver Demo app for a reference implementation that both shows and 56 // explains how Receivers are properly configured and started, integrated with a 57 // decoder, and the resulting decoded media is played out. Also, here is a 58 // general usage example: 59 // 60 // class MyPlayer : public openscreen::cast::Receiver::Consumer { 61 // public: 62 // explicit MyPlayer(Receiver* receiver) : receiver_(receiver) { 63 // recevier_->SetPlayerProcessingTime(std::chrono::milliseconds(10)); 64 // receiver_->SetConsumer(this); 65 // } 66 // 67 // ~MyPlayer() override { 68 // receiver_->SetConsumer(nullptr); 69 // } 70 // 71 // private: 72 // // Receiver::Consumer implementation. 73 // void OnFramesReady(int next_frame_buffer_size) override { 74 // std::vector<uint8_t> buffer; 75 // buffer.resize(next_frame_buffer_size); 76 // openscreen::cast::EncodedFrame encoded_frame = 77 // receiver_->ConsumeNextFrame(absl::Span<uint8_t>(buffer)); 78 // 79 // display_.RenderFrame(decoder_.DecodeFrame(encoded_frame.data)); 80 // 81 // // Note: An implementation could call receiver_->AdvanceToNextFrame() 82 // // and receiver_->ConsumeNextFrame() in a loop here, to consume all the 83 // // remaining frames that are ready. 84 // } 85 // 86 // Receiver* const receiver_; 87 // MyDecoder decoder_; 88 // MyDisplay display_; 89 // }; 90 // 91 // Internally, a queue of complete and partially-received frames is maintained. 92 // The queue is a circular queue of FrameCollectors that each maintain the 93 // individual receive state of each in-flight frame. There are three conceptual 94 // "pointers" that indicate what assumptions and operations are made on certain 95 // ranges of frames in the queue: 96 // 97 // 1. Latest Frame Expected: The FrameId of the latest frame whose existence 98 // is known to this Receiver. This is the highest FrameId seen in any 99 // successfully-parsed RTP packet. 100 // 2. Checkpoint Frame: Indicates that all of the RTP packets for all frames 101 // up to and including the one having this FrameId have been successfully 102 // received and processed. 103 // 3. Last Frame Consumed: The FrameId of last frame consumed (see 104 // ConsumeNextFrame()). Once a frame is consumed, all internal resources 105 // related to the frame can be freed and/or re-used for later frames. 106 class Receiver { 107 public: 108 class Consumer { 109 public: 110 virtual ~Consumer(); 111 112 // Called whenever one or more frames have become ready for consumption. The 113 // |next_frame_buffer_size| argument is identical to the result of calling 114 // AdvanceToNextFrame(), and so the Consumer only needs to prepare a buffer 115 // and call ConsumeNextFrame(). It may then call AdvanceToNextFrame() to 116 // check whether there are any more frames ready, but this is not mandatory. 117 // See usage example in class-level comments. 118 virtual void OnFramesReady(int next_frame_buffer_size) = 0; 119 }; 120 121 // Constructs a Receiver that attaches to the given |environment| and 122 // |packet_router|. The config contains the settings that were 123 // agreed-upon by both sides from the OFFER/ANSWER exchange (i.e., the part of 124 // the overall end-to-end connection process that occurs before Cast Streaming 125 // is started). 126 Receiver(Environment* environment, 127 ReceiverPacketRouter* packet_router, 128 SessionConfig config); 129 ~Receiver(); 130 config()131 const SessionConfig& config() const { return config_; } rtp_timebase()132 int rtp_timebase() const { return rtp_timebase_; } ssrc()133 Ssrc ssrc() const { return rtcp_session_.receiver_ssrc(); } 134 135 // Set the Consumer receiving notifications when new frames are ready for 136 // consumption. Frames received before this method is called will remain in 137 // the queue indefinitely. 138 void SetConsumer(Consumer* consumer); 139 140 // Sets how much time the consumer will need to decode/buffer/render/etc., and 141 // otherwise fully process a frame for on-time playback. This information is 142 // used by the Receiver to decide whether to skip past frames that have 143 // arrived too late. This method can be called repeatedly to make adjustments 144 // based on changing environmental conditions. 145 // 146 // Default setting: kDefaultPlayerProcessingTime 147 void SetPlayerProcessingTime(Clock::duration needed_time); 148 149 // Propagates a "picture loss indicator" notification to the Sender, 150 // requesting a key frame so that decode/playout can recover. It is safe to 151 // call this redundantly. The Receiver will clear the picture loss condition 152 // automatically, once a key frame is received (i.e., before 153 // ConsumeNextFrame() is called to access it). 154 void RequestKeyFrame(); 155 156 // Advances to the next frame ready for consumption. This may skip-over 157 // incomplete frames that will not play out on-time; but only if there are 158 // completed frames further down the queue that have no dependency 159 // relationship with them (e.g., key frames). 160 // 161 // This method returns kNoFramesReady if there is not currently a frame ready 162 // for consumption. The caller should wait for a Consumer::OnFramesReady() 163 // notification before trying again. Otherwise, the number of bytes of encoded 164 // data is returned, and the caller should use this to ensure the buffer it 165 // passes to ConsumeNextFrame() is large enough. 166 int AdvanceToNextFrame(); 167 168 // Returns the next frame, both metadata and payload data. The Consumer calls 169 // this method after being notified via OnFramesReady(), and it can also call 170 // this whenever AdvanceToNextFrame() indicates another frame is ready. 171 // |buffer| must point to a sufficiently-sized buffer that will be populated 172 // with the frame's payload data. Upon return |frame->data| will be set to the 173 // portion of the buffer that was populated. 174 EncodedFrame ConsumeNextFrame(absl::Span<uint8_t> buffer); 175 176 // Allows setting picture loss indication for testing. In production, this 177 // should be done using the config. SetPliEnabledForTesting(bool is_pli_enabled)178 void SetPliEnabledForTesting(bool is_pli_enabled) { 179 is_pli_enabled_ = is_pli_enabled; 180 } 181 182 // The default "player processing time" amount. See SetPlayerProcessingTime(). 183 static constexpr std::chrono::milliseconds kDefaultPlayerProcessingTime{5}; 184 185 // Returned by AdvanceToNextFrame() when there are no frames currently ready 186 // for consumption. 187 static constexpr int kNoFramesReady = -1; 188 189 protected: 190 friend class ReceiverPacketRouter; 191 192 // Called by ReceiverPacketRouter to provide this Receiver with what looks 193 // like a RTP/RTCP packet meant for it specifically (among other Receivers). 194 void OnReceivedRtpPacket(Clock::time_point arrival_time, 195 std::vector<uint8_t> packet); 196 void OnReceivedRtcpPacket(Clock::time_point arrival_time, 197 std::vector<uint8_t> packet); 198 199 private: 200 // An entry in the circular queue (see |pending_frames_|). 201 struct PendingFrame { 202 FrameCollector collector; 203 204 // The Receiver's [local] Clock time when this frame was originally captured 205 // at the Sender. This is computed and assigned when the RTP packet with ID 206 // 0 is processed. Add the target playout delay to this to get the target 207 // playout time. 208 absl::optional<Clock::time_point> estimated_capture_time; 209 210 PendingFrame(); 211 ~PendingFrame(); 212 213 // Reset this entry to its initial state, freeing resources. 214 void Reset(); 215 }; 216 217 // Get/Set the checkpoint FrameId. This indicates that all of the packets for 218 // all frames up to and including this FrameId have been successfully received 219 // (or otherwise do not need to be re-transmitted). checkpoint_frame()220 FrameId checkpoint_frame() const { return rtcp_builder_.checkpoint_frame(); } set_checkpoint_frame(FrameId frame_id)221 void set_checkpoint_frame(FrameId frame_id) { 222 rtcp_builder_.SetCheckpointFrame(frame_id); 223 } 224 225 // Send an RTCP packet to the Sender immediately, to acknowledge the complete 226 // reception of one or more additional frames, to reply to a Sender Report, or 227 // to request re-transmits. Calling this also schedules additional RTCP 228 // packets to be sent periodically for the life of this Receiver. 229 void SendRtcp(); 230 231 // Helpers to map the given |frame_id| to the element in the |pending_frames_| 232 // circular queue. There are both const and non-const versions, but neither 233 // mutate any state (i.e., they are just look-ups). 234 const PendingFrame& GetQueueEntry(FrameId frame_id) const; 235 PendingFrame& GetQueueEntry(FrameId frame_id); 236 237 // Record that the target playout delay has changed starting with the given 238 // FrameId. 239 void RecordNewTargetPlayoutDelay(FrameId as_of_frame, 240 std::chrono::milliseconds delay); 241 242 // Examine the known target playout delay changes to determine what setting is 243 // in-effect for the given frame. 244 std::chrono::milliseconds ResolveTargetPlayoutDelay(FrameId frame_id) const; 245 246 // Called to move the checkpoint forward. This scans the queue, starting from 247 // |new_checkpoint|, to find the latest in a contiguous sequence of completed 248 // frames. Then, it records that frame as the new checkpoint, and immediately 249 // sends a feedback RTCP packet to the Sender. 250 void AdvanceCheckpoint(FrameId new_checkpoint); 251 252 // Helper to force-drop all frames before |first_kept_frame|, even if they 253 // were never consumed. This will also auto-cancel frames that were never 254 // completely received, artificially moving the checkpoint forward, and 255 // notifying the Sender of that. The caller of this method is responsible for 256 // making sure that frame data dependencies will not be broken by dropping the 257 // frames. 258 void DropAllFramesBefore(FrameId first_kept_frame); 259 260 // Sets the |consumption_alarm_| to check whether any frames are ready, 261 // including possibly skipping over late frames in order to make not-yet-late 262 // frames become ready. The default argument value means "without delay." 263 void ScheduleFrameReadyCheck(Clock::time_point when = Alarm::kImmediately); 264 265 const ClockNowFunctionPtr now_; 266 ReceiverPacketRouter* const packet_router_; 267 const SessionConfig config_; 268 RtcpSession rtcp_session_; 269 SenderReportParser rtcp_parser_; 270 CompoundRtcpBuilder rtcp_builder_; 271 PacketReceiveStatsTracker stats_tracker_; // Tracks transmission stats. 272 RtpPacketParser rtp_parser_; 273 const int rtp_timebase_; // RTP timestamp ticks per second. 274 const FrameCrypto crypto_; // Decrypts assembled frames. 275 bool is_pli_enabled_; // Whether picture loss indication is enabled. 276 277 // Buffer for serializing/sending RTCP packets. 278 const int rtcp_buffer_capacity_; 279 const std::unique_ptr<uint8_t[]> rtcp_buffer_; 280 281 // Schedules tasks to ensure RTCP reports are sent within a bounded interval. 282 // Not scheduled until after this Receiver has processed the first packet from 283 // the Sender. 284 Alarm rtcp_alarm_; 285 Clock::time_point last_rtcp_send_time_ = Clock::time_point::min(); 286 287 // The last Sender Report received and when the packet containing it had 288 // arrived. This contains lip-sync timestamps used as part of the calculation 289 // of playout times for the received frames, as well as ping-pong data bounced 290 // back to the Sender in the Receiver Reports. It is nullopt until the first 291 // parseable Sender Report is received. 292 absl::optional<SenderReportParser::SenderReportWithId> last_sender_report_; 293 Clock::time_point last_sender_report_arrival_time_; 294 295 // Tracks the offset between the Receiver's [local] clock and the Sender's 296 // clock. This is invalid until the first Sender Report has been successfully 297 // processed (i.e., |last_sender_report_| is not nullopt). 298 ClockDriftSmoother smoothed_clock_offset_; 299 300 // The ID of the latest frame whose existence is known to this Receiver. This 301 // value must always be greater than or equal to |checkpoint_frame()|. 302 FrameId latest_frame_expected_ = FrameId::leader(); 303 304 // The ID of the last frame consumed. This value must always be less than or 305 // equal to |checkpoint_frame()|, since it's impossible to consume incomplete 306 // frames! 307 FrameId last_frame_consumed_ = FrameId::leader(); 308 309 // The ID of the latest key frame known to be in-flight. This is used by 310 // RequestKeyFrame() to ensure the PLI condition doesn't get set again until 311 // after the consumer has seen a key frame that would clear the condition. 312 FrameId last_key_frame_received_; 313 314 // The frame queue (circular), which tracks which frames are in-flight, stores 315 // data for partially-received frames, and holds onto completed frames until 316 // the consumer consumes them. 317 // 318 // Use GetQueueEntry() to access a slot. The currently-active slots are those 319 // for the frames after |last_frame_consumed_| and up-to/including 320 // |latest_frame_expected_|. 321 std::array<PendingFrame, kMaxUnackedFrames> pending_frames_{}; 322 323 // Tracks the recent changes to the target playout delay, which is controlled 324 // by the Sender. The FrameId indicates the first frame where a new delay 325 // setting takes effect. This vector is never empty, is kept sorted, and is 326 // pruned to remain as small as possible. 327 // 328 // The target playout delay is the amount of time between a frame's 329 // capture/recording on the Sender and when it should be played-out at the 330 // Receiver. 331 std::vector<std::pair<FrameId, std::chrono::milliseconds>> 332 playout_delay_changes_; 333 334 // The consumer to notify when there are one or more frames completed and 335 // ready to be consumed. 336 Consumer* consumer_ = nullptr; 337 338 // The additional time needed to decode/play-out each frame after being 339 // consumed from this Receiver. 340 Clock::duration player_processing_time_ = kDefaultPlayerProcessingTime; 341 342 // Scheduled to check whether there are frames ready and, if there are, to 343 // notify the Consumer via OnFramesReady(). 344 Alarm consumption_alarm_; 345 346 // The interval between sending ACK/NACK feedback RTCP messages while 347 // incomplete frames exist in the queue. 348 // 349 // TODO(miu): This should be a function of the current target playout delay, 350 // similar to the Sender's kickstart interval logic. 351 static constexpr std::chrono::milliseconds kNackFeedbackInterval{30}; 352 }; 353 354 } // namespace cast 355 } // namespace openscreen 356 357 #endif // CAST_STREAMING_RECEIVER_H_ 358