1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.h"
12 
13 #include <math.h>
14 
15 #include <algorithm>
16 
17 #include "webrtc/base/constructormagic.h"
18 #include "webrtc/base/logging.h"
19 #include "webrtc/base/scoped_ptr.h"
20 #include "webrtc/base/thread_annotations.h"
21 #include "webrtc/modules/pacing/paced_sender.h"
22 #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
23 #include "webrtc/system_wrappers/include/clock.h"
24 #include "webrtc/system_wrappers/include/critical_section_wrapper.h"
25 #include "webrtc/typedefs.h"
26 
27 namespace webrtc {
28 
29 enum {
30   kTimestampGroupLengthMs = 5,
31   kAbsSendTimeFraction = 18,
32   kAbsSendTimeInterArrivalUpshift = 8,
33   kInterArrivalShift = kAbsSendTimeFraction + kAbsSendTimeInterArrivalUpshift,
34   kInitialProbingIntervalMs = 2000,
35   kMinClusterSize = 4,
36   kMaxProbePackets = 15,
37   kExpectedNumberOfProbes = 3
38 };
39 
40 static const size_t kPropagationDeltaQueueMaxSize = 1000;
41 static const int64_t kPropagationDeltaQueueMaxTimeMs = 1000;
42 static const double kTimestampToMs = 1000.0 /
43     static_cast<double>(1 << kInterArrivalShift);
44 
45 // Removes the entries at |index| of |time| and |value|, if time[index] is
46 // smaller than or equal to |deadline|. |time| must be sorted ascendingly.
RemoveStaleEntries(std::vector<int64_t> * time,std::vector<int> * value,int64_t deadline)47 static void RemoveStaleEntries(
48     std::vector<int64_t>* time, std::vector<int>* value, int64_t deadline) {
49   assert(time->size() == value->size());
50   std::vector<int64_t>::iterator end_of_removal = std::upper_bound(
51       time->begin(), time->end(), deadline);
52   size_t end_of_removal_index = end_of_removal - time->begin();
53 
54   time->erase(time->begin(), end_of_removal);
55   value->erase(value->begin(), value->begin() + end_of_removal_index);
56 }
57 
58 template<typename K, typename V>
Keys(const std::map<K,V> & map)59 std::vector<K> Keys(const std::map<K, V>& map) {
60   std::vector<K> keys;
61   keys.reserve(map.size());
62   for (typename std::map<K, V>::const_iterator it = map.begin();
63       it != map.end(); ++it) {
64     keys.push_back(it->first);
65   }
66   return keys;
67 }
68 
ConvertMsTo24Bits(int64_t time_ms)69 uint32_t ConvertMsTo24Bits(int64_t time_ms) {
70   uint32_t time_24_bits =
71       static_cast<uint32_t>(
72           ((static_cast<uint64_t>(time_ms) << kAbsSendTimeFraction) + 500) /
73           1000) &
74       0x00FFFFFF;
75   return time_24_bits;
76 }
77 
IsWithinClusterBounds(int send_delta_ms,const Cluster & cluster_aggregate)78 bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds(
79     int send_delta_ms,
80     const Cluster& cluster_aggregate) {
81     if (cluster_aggregate.count == 0)
82       return true;
83     float cluster_mean = cluster_aggregate.send_mean_ms /
84                          static_cast<float>(cluster_aggregate.count);
85     return fabs(static_cast<float>(send_delta_ms) - cluster_mean) < 2.5f;
86   }
87 
AddCluster(std::list<Cluster> * clusters,Cluster * cluster)88   void RemoteBitrateEstimatorAbsSendTime::AddCluster(
89       std::list<Cluster>* clusters,
90       Cluster* cluster) {
91     cluster->send_mean_ms /= static_cast<float>(cluster->count);
92     cluster->recv_mean_ms /= static_cast<float>(cluster->count);
93     cluster->mean_size /= cluster->count;
94     clusters->push_back(*cluster);
95   }
96 
Id() const97   int RemoteBitrateEstimatorAbsSendTime::Id() const {
98     return static_cast<int>(reinterpret_cast<uint64_t>(this));
99   }
100 
RemoteBitrateEstimatorAbsSendTime(RemoteBitrateObserver * observer,Clock * clock)101   RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime(
102       RemoteBitrateObserver* observer,
103       Clock* clock)
104       : crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
105         observer_(observer),
106         clock_(clock),
107         ssrcs_(),
108         inter_arrival_(),
109         estimator_(OverUseDetectorOptions()),
110         detector_(OverUseDetectorOptions()),
111         incoming_bitrate_(kBitrateWindowMs, 8000),
112         last_process_time_(-1),
113         process_interval_ms_(kProcessIntervalMs),
114         total_propagation_delta_ms_(0),
115         total_probes_received_(0),
116         first_packet_time_ms_(-1) {
117   assert(observer_);
118   assert(clock_);
119   LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating.";
120 }
121 
ComputeClusters(std::list<Cluster> * clusters) const122 void RemoteBitrateEstimatorAbsSendTime::ComputeClusters(
123     std::list<Cluster>* clusters) const {
124   Cluster current;
125   int64_t prev_send_time = -1;
126   int64_t prev_recv_time = -1;
127   for (std::list<Probe>::const_iterator it = probes_.begin();
128        it != probes_.end();
129        ++it) {
130     if (prev_send_time >= 0) {
131       int send_delta_ms = it->send_time_ms - prev_send_time;
132       int recv_delta_ms = it->recv_time_ms - prev_recv_time;
133       if (send_delta_ms >= 1 && recv_delta_ms >= 1) {
134         ++current.num_above_min_delta;
135       }
136       if (!IsWithinClusterBounds(send_delta_ms, current)) {
137         if (current.count >= kMinClusterSize)
138           AddCluster(clusters, &current);
139         current = Cluster();
140       }
141       current.send_mean_ms += send_delta_ms;
142       current.recv_mean_ms += recv_delta_ms;
143       current.mean_size += it->payload_size;
144       ++current.count;
145     }
146     prev_send_time = it->send_time_ms;
147     prev_recv_time = it->recv_time_ms;
148   }
149   if (current.count >= kMinClusterSize)
150     AddCluster(clusters, &current);
151 }
152 
153 std::list<Cluster>::const_iterator
FindBestProbe(const std::list<Cluster> & clusters) const154 RemoteBitrateEstimatorAbsSendTime::FindBestProbe(
155     const std::list<Cluster>& clusters) const {
156   int highest_probe_bitrate_bps = 0;
157   std::list<Cluster>::const_iterator best_it = clusters.end();
158   for (std::list<Cluster>::const_iterator it = clusters.begin();
159        it != clusters.end();
160        ++it) {
161     if (it->send_mean_ms == 0 || it->recv_mean_ms == 0)
162       continue;
163     int send_bitrate_bps = it->mean_size * 8 * 1000 / it->send_mean_ms;
164     int recv_bitrate_bps = it->mean_size * 8 * 1000 / it->recv_mean_ms;
165     if (it->num_above_min_delta > it->count / 2 &&
166         (it->recv_mean_ms - it->send_mean_ms <= 2.0f &&
167          it->send_mean_ms - it->recv_mean_ms <= 5.0f)) {
168       int probe_bitrate_bps =
169           std::min(it->GetSendBitrateBps(), it->GetRecvBitrateBps());
170       if (probe_bitrate_bps > highest_probe_bitrate_bps) {
171         highest_probe_bitrate_bps = probe_bitrate_bps;
172         best_it = it;
173       }
174     } else {
175       LOG(LS_INFO) << "Probe failed, sent at " << send_bitrate_bps
176                    << " bps, received at " << recv_bitrate_bps
177                    << " bps. Mean send delta: " << it->send_mean_ms
178                    << " ms, mean recv delta: " << it->recv_mean_ms
179                    << " ms, num probes: " << it->count;
180       break;
181     }
182   }
183   return best_it;
184 }
185 
ProcessClusters(int64_t now_ms)186 void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) {
187   std::list<Cluster> clusters;
188   ComputeClusters(&clusters);
189   if (clusters.empty()) {
190     // If we reach the max number of probe packets and still have no clusters,
191     // we will remove the oldest one.
192     if (probes_.size() >= kMaxProbePackets)
193       probes_.pop_front();
194     return;
195   }
196 
197   std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters);
198   if (best_it != clusters.end()) {
199     int probe_bitrate_bps =
200         std::min(best_it->GetSendBitrateBps(), best_it->GetRecvBitrateBps());
201     // Make sure that a probe sent on a lower bitrate than our estimate can't
202     // reduce the estimate.
203     if (IsBitrateImproving(probe_bitrate_bps) &&
204         probe_bitrate_bps > static_cast<int>(incoming_bitrate_.Rate(now_ms))) {
205       LOG(LS_INFO) << "Probe successful, sent at "
206                    << best_it->GetSendBitrateBps() << " bps, received at "
207                    << best_it->GetRecvBitrateBps()
208                    << " bps. Mean send delta: " << best_it->send_mean_ms
209                    << " ms, mean recv delta: " << best_it->recv_mean_ms
210                    << " ms, num probes: " << best_it->count;
211       remote_rate_.SetEstimate(probe_bitrate_bps, now_ms);
212     }
213   }
214 
215   // Not probing and received non-probe packet, or finished with current set
216   // of probes.
217   if (clusters.size() >= kExpectedNumberOfProbes)
218     probes_.clear();
219 }
220 
IsBitrateImproving(int new_bitrate_bps) const221 bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving(
222     int new_bitrate_bps) const {
223   bool initial_probe = !remote_rate_.ValidEstimate() && new_bitrate_bps > 0;
224   bool bitrate_above_estimate =
225       remote_rate_.ValidEstimate() &&
226       new_bitrate_bps > static_cast<int>(remote_rate_.LatestEstimate());
227   return initial_probe || bitrate_above_estimate;
228 }
229 
IncomingPacketFeedbackVector(const std::vector<PacketInfo> & packet_feedback_vector)230 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketFeedbackVector(
231     const std::vector<PacketInfo>& packet_feedback_vector) {
232   for (const auto& packet_info : packet_feedback_vector) {
233     IncomingPacketInfo(packet_info.arrival_time_ms,
234                        ConvertMsTo24Bits(packet_info.send_time_ms),
235                        packet_info.payload_size, 0, packet_info.was_paced);
236   }
237 }
238 
IncomingPacket(int64_t arrival_time_ms,size_t payload_size,const RTPHeader & header,bool was_paced)239 void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(int64_t arrival_time_ms,
240                                                        size_t payload_size,
241                                                        const RTPHeader& header,
242                                                        bool was_paced) {
243   if (!header.extension.hasAbsoluteSendTime) {
244     LOG(LS_WARNING) << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet "
245                        "is missing absolute send time extension!";
246     return;
247   }
248   IncomingPacketInfo(arrival_time_ms, header.extension.absoluteSendTime,
249                      payload_size, header.ssrc, was_paced);
250 }
251 
IncomingPacketInfo(int64_t arrival_time_ms,uint32_t send_time_24bits,size_t payload_size,uint32_t ssrc,bool was_paced)252 void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo(
253     int64_t arrival_time_ms,
254     uint32_t send_time_24bits,
255     size_t payload_size,
256     uint32_t ssrc,
257     bool was_paced) {
258   assert(send_time_24bits < (1ul << 24));
259   // Shift up send time to use the full 32 bits that inter_arrival works with,
260   // so wrapping works properly.
261   uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
262   int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs;
263 
264   CriticalSectionScoped cs(crit_sect_.get());
265   int64_t now_ms = clock_->TimeInMilliseconds();
266   // TODO(holmer): SSRCs are only needed for REMB, should be broken out from
267   // here.
268   ssrcs_[ssrc] = now_ms;
269   incoming_bitrate_.Update(payload_size, now_ms);
270   const BandwidthUsage prior_state = detector_.State();
271 
272   if (first_packet_time_ms_ == -1)
273     first_packet_time_ms_ = clock_->TimeInMilliseconds();
274 
275   uint32_t ts_delta = 0;
276   int64_t t_delta = 0;
277   int size_delta = 0;
278   // For now only try to detect probes while we don't have a valid estimate, and
279   // make sure the packet was paced. We currently assume that only packets
280   // larger than 200 bytes are paced by the sender.
281   was_paced = was_paced && payload_size > PacedSender::kMinProbePacketSize;
282   if (was_paced &&
283       (!remote_rate_.ValidEstimate() ||
284        now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) {
285     // TODO(holmer): Use a map instead to get correct order?
286     if (total_probes_received_ < kMaxProbePackets) {
287       int send_delta_ms = -1;
288       int recv_delta_ms = -1;
289       if (!probes_.empty()) {
290         send_delta_ms = send_time_ms - probes_.back().send_time_ms;
291         recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms;
292       }
293       LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms
294                    << " ms, recv time=" << arrival_time_ms
295                    << " ms, send delta=" << send_delta_ms
296                    << " ms, recv delta=" << recv_delta_ms << " ms.";
297     }
298     probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size));
299     ++total_probes_received_;
300     ProcessClusters(now_ms);
301   }
302   if (!inter_arrival_.get()) {
303     inter_arrival_.reset(
304         new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000,
305                          kTimestampToMs, true));
306   }
307   if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, payload_size,
308                                     &ts_delta, &t_delta, &size_delta)) {
309     double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift);
310     estimator_.Update(t_delta, ts_delta_ms, size_delta, detector_.State());
311     detector_.Detect(estimator_.offset(), ts_delta_ms,
312                      estimator_.num_of_deltas(), arrival_time_ms);
313     UpdateStats(static_cast<int>(t_delta - ts_delta_ms), now_ms);
314   }
315   if (detector_.State() == kBwOverusing) {
316     uint32_t incoming_bitrate_bps = incoming_bitrate_.Rate(now_ms);
317     if (prior_state != kBwOverusing ||
318         remote_rate_.TimeToReduceFurther(now_ms, incoming_bitrate_bps)) {
319       // The first overuse should immediately trigger a new estimate.
320       // We also have to update the estimate immediately if we are overusing
321       // and the target bitrate is too high compared to what we are receiving.
322       UpdateEstimate(now_ms);
323     }
324   }
325 }
326 
Process()327 int32_t RemoteBitrateEstimatorAbsSendTime::Process() {
328   if (TimeUntilNextProcess() > 0) {
329     return 0;
330   }
331   {
332     CriticalSectionScoped cs(crit_sect_.get());
333     UpdateEstimate(clock_->TimeInMilliseconds());
334   }
335   last_process_time_ = clock_->TimeInMilliseconds();
336   return 0;
337 }
338 
TimeUntilNextProcess()339 int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() {
340   if (last_process_time_ < 0) {
341     return 0;
342   }
343   {
344     CriticalSectionScoped cs(crit_sect_.get());
345     return last_process_time_ + process_interval_ms_ -
346         clock_->TimeInMilliseconds();
347   }
348 }
349 
UpdateEstimate(int64_t now_ms)350 void RemoteBitrateEstimatorAbsSendTime::UpdateEstimate(int64_t now_ms) {
351   if (!inter_arrival_.get()) {
352     // No packets have been received on the active streams.
353     return;
354   }
355   for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) {
356     if ((now_ms - it->second) > kStreamTimeOutMs) {
357       ssrcs_.erase(it++);
358     } else {
359       ++it;
360     }
361   }
362   if (ssrcs_.empty()) {
363     // We can't update the estimate if we don't have any active streams.
364     inter_arrival_.reset();
365     // We deliberately don't reset the first_packet_time_ms_ here for now since
366     // we only probe for bandwidth in the beginning of a call right now.
367     return;
368   }
369 
370   const RateControlInput input(detector_.State(),
371                                incoming_bitrate_.Rate(now_ms),
372                                estimator_.var_noise());
373   remote_rate_.Update(&input, now_ms);
374   unsigned int target_bitrate = remote_rate_.UpdateBandwidthEstimate(now_ms);
375   if (remote_rate_.ValidEstimate()) {
376     process_interval_ms_ = remote_rate_.GetFeedbackInterval();
377     observer_->OnReceiveBitrateChanged(Keys(ssrcs_), target_bitrate);
378   }
379 }
380 
OnRttUpdate(int64_t avg_rtt_ms,int64_t max_rtt_ms)381 void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms,
382                                                     int64_t max_rtt_ms) {
383   CriticalSectionScoped cs(crit_sect_.get());
384   remote_rate_.SetRtt(avg_rtt_ms);
385 }
386 
RemoveStream(unsigned int ssrc)387 void RemoteBitrateEstimatorAbsSendTime::RemoveStream(unsigned int ssrc) {
388   CriticalSectionScoped cs(crit_sect_.get());
389   ssrcs_.erase(ssrc);
390 }
391 
LatestEstimate(std::vector<unsigned int> * ssrcs,unsigned int * bitrate_bps) const392 bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate(
393     std::vector<unsigned int>* ssrcs,
394     unsigned int* bitrate_bps) const {
395   CriticalSectionScoped cs(crit_sect_.get());
396   assert(ssrcs);
397   assert(bitrate_bps);
398   if (!remote_rate_.ValidEstimate()) {
399     return false;
400   }
401   *ssrcs = Keys(ssrcs_);
402   if (ssrcs_.empty()) {
403     *bitrate_bps = 0;
404   } else {
405     *bitrate_bps = remote_rate_.LatestEstimate();
406   }
407   return true;
408 }
409 
GetStats(ReceiveBandwidthEstimatorStats * output) const410 bool RemoteBitrateEstimatorAbsSendTime::GetStats(
411     ReceiveBandwidthEstimatorStats* output) const {
412   {
413     CriticalSectionScoped cs(crit_sect_.get());
414     output->recent_propagation_time_delta_ms = recent_propagation_delta_ms_;
415     output->recent_arrival_time_ms = recent_update_time_ms_;
416     output->total_propagation_time_delta_ms = total_propagation_delta_ms_;
417   }
418   RemoveStaleEntries(
419       &output->recent_arrival_time_ms,
420       &output->recent_propagation_time_delta_ms,
421       clock_->TimeInMilliseconds() - kPropagationDeltaQueueMaxTimeMs);
422   return true;
423 }
424 
UpdateStats(int propagation_delta_ms,int64_t now_ms)425 void RemoteBitrateEstimatorAbsSendTime::UpdateStats(int propagation_delta_ms,
426                                                     int64_t now_ms) {
427   // The caller must enter crit_sect_ before the call.
428 
429   // Remove the oldest entry if the size limit is reached.
430   if (recent_update_time_ms_.size() == kPropagationDeltaQueueMaxSize) {
431     recent_update_time_ms_.erase(recent_update_time_ms_.begin());
432     recent_propagation_delta_ms_.erase(recent_propagation_delta_ms_.begin());
433   }
434 
435   recent_propagation_delta_ms_.push_back(propagation_delta_ms);
436   recent_update_time_ms_.push_back(now_ms);
437 
438   RemoveStaleEntries(
439       &recent_update_time_ms_,
440       &recent_propagation_delta_ms_,
441       now_ms - kPropagationDeltaQueueMaxTimeMs);
442 
443   total_propagation_delta_ms_ =
444       std::max(total_propagation_delta_ms_ + propagation_delta_ms, 0);
445 }
446 
SetMinBitrate(int min_bitrate_bps)447 void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) {
448   CriticalSectionScoped cs(crit_sect_.get());
449   remote_rate_.SetMinBitrate(min_bitrate_bps);
450 }
451 }  // namespace webrtc
452