1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
12
13 #include <math.h>
14
15 #include <algorithm>
16
17 #include "api/transport/field_trial_based_config.h"
18 #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
19 #include "rtc_base/checks.h"
20 #include "rtc_base/constructor_magic.h"
21 #include "rtc_base/logging.h"
22 #include "rtc_base/thread_annotations.h"
23 #include "system_wrappers/include/metrics.h"
24
25 namespace webrtc {
26 namespace {
OptionalRateFromOptionalBps(absl::optional<int> bitrate_bps)27 absl::optional<DataRate> OptionalRateFromOptionalBps(
28 absl::optional<int> bitrate_bps) {
29 if (bitrate_bps) {
30 return DataRate::BitsPerSec(*bitrate_bps);
31 } else {
32 return absl::nullopt;
33 }
34 }
35 } // namespace
36
37 enum {
38 kTimestampGroupLengthMs = 5,
39 kAbsSendTimeInterArrivalUpshift = 8,
40 kInterArrivalShift = RTPHeaderExtension::kAbsSendTimeFraction +
41 kAbsSendTimeInterArrivalUpshift,
42 kInitialProbingIntervalMs = 2000,
43 kMinClusterSize = 4,
44 kMaxProbePackets = 15,
45 kExpectedNumberOfProbes = 3
46 };
47
48 static const double kTimestampToMs =
49 1000.0 / static_cast<double>(1 << kInterArrivalShift);
50
// Returns the keys of |map| as a vector, in the map's iteration order.
template <typename K, typename V>
std::vector<K> Keys(const std::map<K, V>& map) {
  std::vector<K> result;
  result.reserve(map.size());
  for (const auto& entry : map) {
    result.push_back(entry.first);
  }
  return result;
}
61
ConvertMsTo24Bits(int64_t time_ms)62 uint32_t ConvertMsTo24Bits(int64_t time_ms) {
63 uint32_t time_24_bits =
64 static_cast<uint32_t>(((static_cast<uint64_t>(time_ms)
65 << RTPHeaderExtension::kAbsSendTimeFraction) +
66 500) /
67 1000) &
68 0x00FFFFFF;
69 return time_24_bits;
70 }
71
// Defaulted destructor: no explicit teardown is performed here; members are
// destroyed by their own destructors.
RemoteBitrateEstimatorAbsSendTime::~RemoteBitrateEstimatorAbsSendTime() =
    default;
74
IsWithinClusterBounds(int send_delta_ms,const Cluster & cluster_aggregate)75 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
76 int send_delta_ms,
77 const Cluster& cluster_aggregate) {
78 if (cluster_aggregate.count == 0)
79 return true;
80 float cluster_mean = cluster_aggregate.send_mean_ms /
81 static_cast<float>(cluster_aggregate.count);
82 return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f;
83 }
84
AddCluster(std::list<Cluster> * clusters,Cluster * cluster)85 void RemoteBitrateEstimatorAbsSendTime::AddCluster(std::list<Cluster>* clusters,
86 Cluster* cluster) {
87 cluster->send_mean_ms /= static_cast<float>(cluster->count);
88 cluster->recv_mean_ms /= static_cast<float>(cluster->count);
89 cluster->mean_size /= cluster->count;
90 clusters->push_back(*cluster);
91 }
92
// Creates an estimator that reads the current time from |clock| and reports
// bitrate changes to |observer|. Both pointers are DCHECKed to be non-null.
//
// |inter_arrival_| and |estimator_| are value-initialized here; they are
// (re)created by TimeoutStreams() whenever no streams are being tracked.
RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
    RemoteBitrateObserver* observer,
    Clock* clock)
    : clock_(clock),
      observer_(observer),
      inter_arrival_(),
      estimator_(),
      detector_(&field_trials_),
      incoming_bitrate_(kBitrateWindowMs, 8000),
      incoming_bitrate_initialized_(false),
      total_probes_received_(0),
      // -1 marks "no packet received yet" / "no update sent yet".
      first_packet_time_ms_(-1),
      last_update_ms_(-1),
      uma_recorded_(false),
      remote_rate_(&field_trials_) {
  RTC_DCHECK(clock_);
  RTC_DCHECK(observer_);
  RTC_LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
}
112
ComputeClusters(std::list<Cluster> * clusters) const113 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters(
114 std::list<Cluster>* clusters) const {
115 Cluster current;
116 int64_t prev_send_time = -1;
117 int64_t prev_recv_time = -1;
118 for (std::list<Probe>::const_iterator it = probes_.begin();
119 it != probes_.end(); ++it) {
120 if (prev_send_time >= 0) {
121 int send_delta_ms = it->send_time_ms - prev_send_time;
122 int recv_delta_ms = it->recv_time_ms - prev_recv_time;
123 if (send_delta_ms >= 1 && recv_delta_ms >= 1) {
124 ++current.num_above_min_delta;
125 }
126 if (!IsWithinClusterBounds(send_delta_ms, current)) {
127 if (current.count >= kMinClusterSize && current.send_mean_ms > 0.0f &&
128 current.recv_mean_ms > 0.0f) {
129 AddCluster(clusters, ¤t);
130 }
131 current = Cluster();
132 }
133 current.send_mean_ms += send_delta_ms;
134 current.recv_mean_ms += recv_delta_ms;
135 current.mean_size += it->payload_size;
136 ++current.count;
137 }
138 prev_send_time = it->send_time_ms;
139 prev_recv_time = it->recv_time_ms;
140 }
141 if (current.count >= kMinClusterSize && current.send_mean_ms > 0.0f &&
142 current.recv_mean_ms > 0.0f) {
143 AddCluster(clusters, ¤t);
144 }
145 }
146
147 std::list<Cluster>::const_iterator
FindBestProbe(const std::list<Cluster> & clusters) const148 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
149 const std::list<Cluster>& clusters) const {
150 int highest_probe_bitrate_bps = 0;
151 std::list<Cluster>::const_iterator best_it = clusters.end();
152 for (std::list<Cluster>::const_iterator it = clusters.begin();
153 it != clusters.end(); ++it) {
154 if (it->send_mean_ms == 0 || it->recv_mean_ms == 0)
155 continue;
156 if (it->num_above_min_delta > it->count / 2 &&
157 (it->recv_mean_ms - it->send_mean_ms <= 2.0f &&
158 it->send_mean_ms - it->recv_mean_ms <= 5.0f)) {
159 int probe_bitrate_bps =
160 std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps());
161 if (probe_bitrate_bps > highest_probe_bitrate_bps) {
162 highest_probe_bitrate_bps = probe_bitrate_bps;
163 best_it = it;
164 }
165 } else {
166 int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms;
167 int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms;
168 RTC_LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps
169 << " bps, received at " << recv_bitrate_bps
170 << " bps. Mean send delta: " << it->send_mean_ms
171 << " ms, mean recv delta: " << it->recv_mean_ms
172 << " ms, num probes: " << it->count;
173 break;
174 }
175 }
176 return best_it;
177 }
178
179 RemoteBitrateEstimatorAbsSendTime::ProbeResult
ProcessClusters(int64_t now_ms)180 RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) {
181 std::list<Cluster> clusters;
182 ComputeClusters(&clusters);
183 if (clusters.empty()) {
184 // If we reach the max number of probe packets and still have no clusters,
185 // we will remove the oldest one.
186 if (probes_.size() >= kMaxProbePackets)
187 probes_.pop_front();
188 return ProbeResult::kNoUpdate;
189 }
190
191 std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters);
192 if (best_it != clusters.end()) {
193 int probe_bitrate_bps =
194 std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps());
195 // Make sure that a probe sent on a lower bitrate than our estimate can't
196 // reduce the estimate.
197 if (IsBitrateImproving(probe_bitrate_bps)) {
198 RTC_LOG(LS_INFO) << "Probe successful, sent at "
199 << best_it->GetSendBitrateBps() << " bps, received at "
200 << best_it->GetRecvBitrateBps()
201 << " bps. Mean send delta: " << best_it->send_mean_ms
202 << " ms, mean recv delta: " << best_it->recv_mean_ms
203 << " ms, num probes: " << best_it->count;
204 remote_rate_.SetEstimate(DataRate::BitsPerSec(probe_bitrate_bps),
205 Timestamp::Millis(now_ms));
206 return ProbeResult::kBitrateUpdated;
207 }
208 }
209
210 // Not probing and received non-probe packet, or finished with current set
211 // of probes.
212 if (clusters.size() >= kExpectedNumberOfProbes)
213 probes_.clear();
214 return ProbeResult::kNoUpdate;
215 }
216
IsBitrateImproving(int new_bitrate_bps) const217 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
218 int new_bitrate_bps) const {
219 bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0;
220 bool bitrate_above_estimate =
221 remote_rate_.ValidEstimate() &&
222 new_bitrate_bps > remote_rate_.LatestEstimate().bps<int>();
223 return initial_probe || bitrate_above_estimate;
224 }
225
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header)226 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(
227 int64_t arrival_time_ms,
228 size_t payload_size,
229 const RTPHeader& header) {
230 RTC_DCHECK_RUNS_SERIALIZED(&network_race_);
231 if (!header.extension.hasAbsoluteSendTime) {
232 RTC_LOG(LS_WARNING)
233 << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
234 "is missing absolute send time extension!";
235 return;
236 }
237 IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime,
238 payload_size, header.ssrc);
239 }
240
// Feeds one received packet into the estimator.
//
// |arrival_time_ms| is the packet's arrival time, |send_time_24bits| its
// absolute send time (must fit in 24 bits), |payload_size| its payload size
// and |ssrc| the stream it belongs to.
//
// The call updates the incoming bitrate statistics, runs probe cluster
// detection while probing is active, feeds the inter-arrival / over-use
// filters and, when an update is due, computes a new target bitrate. The
// observer is notified after |mutex_| has been released.
void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
    int64_t arrival_time_ms,
    uint32_t send_time_24bits,
    size_t payload_size,
    uint32_t ssrc) {
  RTC_CHECK(send_time_24bits < (1ul << 24));
  // Record which estimator type is in use, once per instance.
  if (!uma_recorded_) {
    RTC_HISTOGRAM_ENUMERATION(kBweTypeHistogram, BweNames::kReceiverAbsSendTime,
                              BweNames::kBweNamesMax);
    uma_recorded_ = true;
  }
  // Shift up send time to use the full 32 bits that inter_arrival works with,
  // so wrapping works properly.
  uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
  int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs;

  int64_t now_ms = clock_->TimeInMilliseconds();
  // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
  // here.

  // Check if incoming bitrate estimate is valid, and if it needs to be reset.
  absl::optional<uint32_t> incoming_bitrate =
      incoming_bitrate_.Rate(arrival_time_ms);
  if (incoming_bitrate) {
    incoming_bitrate_initialized_ = true;
  } else if (incoming_bitrate_initialized_) {
    // Incoming bitrate had a previous valid value, but now not enough data
    // points are left within the current window. Reset the incoming bitrate
    // estimator so that the window will only contain new data points.
    incoming_bitrate_.Reset();
    incoming_bitrate_initialized_ = false;
  }
  incoming_bitrate_.Update(payload_size, arrival_time_ms);

  // Remember when the first packet was seen; used to bound initial probing.
  if (first_packet_time_ms_ == -1)
    first_packet_time_ms_ = now_ms;

  uint32_t ts_delta = 0;
  int64_t t_delta = 0;
  int size_delta = 0;
  bool update_estimate = false;
  uint32_t target_bitrate_bps = 0;
  std::vector<uint32_t> ssrcs;
  {
    MutexLock lock(&mutex_);

    // Drops stale streams and (re)creates |inter_arrival_| / |estimator_|
    // when no streams are tracked, which is why the DCHECKs below hold.
    TimeoutStreams(now_ms);
    RTC_DCHECK(inter_arrival_.get());
    RTC_DCHECK(estimator_.get());
    ssrcs_[ssrc] = now_ms;

    // For now only try to detect probes while we don't have a valid estimate,
    // or while still within kInitialProbingIntervalMs of the first packet.
    // We currently assume that only packets larger than 200 bytes are paced by
    // the sender.
    const size_t kMinProbePacketSize = 200;
    if (payload_size > kMinProbePacketSize &&
        (!remote_rate_.ValidEstimate() ||
         now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) {
      // TODO(holmer): Use a map instead to get correct order?
      // Only the first kMaxProbePackets probes are logged.
      if (total_probes_received_ < kMaxProbePackets) {
        int send_delta_ms = -1;
        int recv_delta_ms = -1;
        if (!probes_.empty()) {
          send_delta_ms = send_time_ms - probes_.back().send_time_ms;
          recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms;
        }
        RTC_LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms
                         << " ms, recv time=" << arrival_time_ms
                         << " ms, send delta=" << send_delta_ms
                         << " ms, recv delta=" << recv_delta_ms << " ms.";
      }
      probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size));
      ++total_probes_received_;
      // Make sure that a probe which updated the bitrate immediately has an
      // effect by calling the OnReceiveBitrateChanged callback.
      if (ProcessClusters(now_ms) == ProbeResult::kBitrateUpdated)
        update_estimate = true;
    }
    // When ComputeDeltas() reports that deltas are available, feed them to
    // the over-use estimator and detector.
    if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, now_ms,
                                      payload_size, &ts_delta, &t_delta,
                                      &size_delta)) {
      // Convert the timestamp delta to milliseconds.
      double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
      estimator_->Update(t_delta, ts_delta_ms, size_delta, detector_.State(),
                         arrival_time_ms);
      detector_.Detect(estimator_->offset(), ts_delta_ms,
                       estimator_->num_of_deltas(), arrival_time_ms);
    }

    if (!update_estimate) {
      // Check if it's time for a periodic update or if we should update because
      // of an over-use.
      if (last_update_ms_ == -1 ||
          now_ms - last_update_ms_ > remote_rate_.GetFeedbackInterval().ms()) {
        update_estimate = true;
      } else if (detector_.State() == BandwidthUsage::kBwOverusing) {
        absl::optional<uint32_t> incoming_rate =
            incoming_bitrate_.Rate(arrival_time_ms);
        if (incoming_rate && remote_rate_.TimeToReduceFurther(
                                 Timestamp::Millis(now_ms),
                                 DataRate::BitsPerSec(*incoming_rate))) {
          update_estimate = true;
        }
      }
    }

    if (update_estimate) {
      // The first overuse should immediately trigger a new estimate.
      // We also have to update the estimate immediately if we are overusing
      // and the target bitrate is too high compared to what we are receiving.
      const RateControlInput input(
          detector_.State(),
          OptionalRateFromOptionalBps(incoming_bitrate_.Rate(arrival_time_ms)));
      target_bitrate_bps =
          remote_rate_.Update(&input, Timestamp::Millis(now_ms))
              .bps<uint32_t>();
      // Only notify the observer if the rate controller has a valid estimate.
      update_estimate = remote_rate_.ValidEstimate();
      // Snapshot the SSRCs while the lock is held; used in the callback below.
      ssrcs = Keys(ssrcs_);
    }
  }
  // The callback is made outside the scope holding |mutex_|.
  if (update_estimate) {
    last_update_ms_ = now_ms;
    observer_->OnReceiveBitrateChanged(ssrcs, target_bitrate_bps);
  }
}
365
// Intentionally empty: this class does no periodic work here; estimates are
// updated from IncomingPacketInfo() as packets arrive.
void RemoteBitrateEstimatorAbsSendTime::Process() {}
367
TimeUntilNextProcess()368 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() {
369 const int64_t kDisabledModuleTime = 1000;
370 return kDisabledModuleTime;
371 }
372
TimeoutStreams(int64_t now_ms)373 void RemoteBitrateEstimatorAbsSendTime::TimeoutStreams(int64_t now_ms) {
374 for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) {
375 if ((now_ms - it->second) > kStreamTimeOutMs) {
376 ssrcs_.erase(it++);
377 } else {
378 ++it;
379 }
380 }
381 if (ssrcs_.empty()) {
382 // We can't update the estimate if we don't have any active streams.
383 inter_arrival_.reset(
384 new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000,
385 kTimestampToMs, true));
386 estimator_.reset(new OveruseEstimator(OverUseDetectorOptions()));
387 // We deliberately don't reset the first_packet_time_ms_ here for now since
388 // we only probe for bandwidth in the beginning of a call right now.
389 }
390 }
391
OnRttUpdate(int64_t avg_rtt_ms,int64_t max_rtt_ms)392 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
393 int64_t max_rtt_ms) {
394 MutexLock lock(&mutex_);
395 remote_rate_.SetRtt(TimeDelta::Millis(avg_rtt_ms));
396 }
397
// Stops tracking the stream identified by |ssrc|. Erasing an SSRC that is not
// tracked is a no-op. Takes |mutex_| while modifying |ssrcs_|.
void RemoteBitrateEstimatorAbsSendTime::RemoveStream(uint32_t ssrc) {
  MutexLock lock(&mutex_);
  ssrcs_.erase(ssrc);
}
402
LatestEstimate(std::vector<uint32_t> * ssrcs,uint32_t * bitrate_bps) const403 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate(
404 std::vector<uint32_t>* ssrcs,
405 uint32_t* bitrate_bps) const {
406 // Currently accessed from both the process thread (see
407 // ModuleRtpRtcpImpl::Process()) and the configuration thread (see
408 // Call::GetStats()). Should in the future only be accessed from a single
409 // thread.
410 RTC_DCHECK(ssrcs);
411 RTC_DCHECK(bitrate_bps);
412 MutexLock lock(&mutex_);
413 if (!remote_rate_.ValidEstimate()) {
414 return false;
415 }
416 *ssrcs = Keys(ssrcs_);
417 if (ssrcs_.empty()) {
418 *bitrate_bps = 0;
419 } else {
420 *bitrate_bps = remote_rate_.LatestEstimate().bps<uint32_t>();
421 }
422 return true;
423 }
424
SetMinBitrate(int min_bitrate_bps)425 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) {
426 // Called from both the configuration thread and the network thread. Shouldn't
427 // be called from the network thread in the future.
428 MutexLock lock(&mutex_);
429 remote_rate_.SetMinBitrate(DataRate::BitsPerSec(min_bitrate_bps));
430 }
431 } // namespace webrtc
432