1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
11 #include "absl/algorithm/container.h"
12 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
13 
14 namespace webrtc {
15 namespace {
16 static const size_t kMaxPacketsInHistory = 5000;
17 }
RegisterStreamFeedbackObserver(std::vector<uint32_t> ssrcs,StreamFeedbackObserver * observer)18 void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver(
19     std::vector<uint32_t> ssrcs,
20     StreamFeedbackObserver* observer) {
21   MutexLock lock(&observers_lock_);
22   RTC_DCHECK(observer);
23   RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
24                return pair.second == observer;
25              }) == observers_.end());
26   observers_.push_back({ssrcs, observer});
27 }
28 
DeRegisterStreamFeedbackObserver(StreamFeedbackObserver * observer)29 void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver(
30     StreamFeedbackObserver* observer) {
31   MutexLock lock(&observers_lock_);
32   RTC_DCHECK(observer);
33   const auto it = absl::c_find_if(
34       observers_, [=](const auto& pair) { return pair.second == observer; });
35   RTC_DCHECK(it != observers_.end());
36   observers_.erase(it);
37 }
38 
AddPacket(const RtpPacketSendInfo & packet_info)39 void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
40   MutexLock lock(&lock_);
41   if (packet_info.ssrc != 0) {
42     StreamFeedbackObserver::StreamPacketInfo info;
43     info.ssrc = packet_info.ssrc;
44     info.rtp_sequence_number = packet_info.rtp_sequence_number;
45     info.received = false;
46     history_.insert(
47         {seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number),
48          info});
49   }
50   while (history_.size() > kMaxPacketsInHistory) {
51     history_.erase(history_.begin());
52   }
53 }
54 
OnTransportFeedback(const rtcp::TransportFeedback & feedback)55 void TransportFeedbackDemuxer::OnTransportFeedback(
56     const rtcp::TransportFeedback& feedback) {
57   std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
58   {
59     MutexLock lock(&lock_);
60     for (const auto& packet : feedback.GetAllPackets()) {
61       int64_t seq_num =
62           seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
63       auto it = history_.find(seq_num);
64       if (it != history_.end()) {
65         auto packet_info = it->second;
66         packet_info.received = packet.received();
67         stream_feedbacks.push_back(packet_info);
68         if (packet.received())
69           history_.erase(it);
70       }
71     }
72   }
73 
74   MutexLock lock(&observers_lock_);
75   for (auto& observer : observers_) {
76     std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
77     for (const auto& packet_info : stream_feedbacks) {
78       if (absl::c_count(observer.first, packet_info.ssrc) > 0) {
79         selected_feedback.push_back(packet_info);
80       }
81     }
82     if (!selected_feedback.empty()) {
83       observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
84     }
85   }
86 }
87 
88 }  // namespace webrtc
89