1 /*
2  *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/modules/remote_bitrate_estimator/test/bwe_test_framework.h"
12 
13 #include <stdio.h>
14 
15 #include <sstream>
16 
17 namespace webrtc {
18 namespace testing {
19 namespace bwe {
20 
21 class DelayCapHelper {
22  public:
23   // Max delay = 0 stands for +infinite.
DelayCapHelper()24   DelayCapHelper() : max_delay_us_(0), delay_stats_() {}
25 
set_max_delay_ms(int64_t max_delay_ms)26   void set_max_delay_ms(int64_t max_delay_ms) {
27     BWE_TEST_LOGGING_ENABLE(false);
28     BWE_TEST_LOGGING_LOG1("Max Delay", "%d ms", static_cast<int>(max_delay_ms));
29     assert(max_delay_ms >= 0);
30     max_delay_us_ = max_delay_ms * 1000;
31   }
32 
ShouldSendPacket(int64_t send_time_us,int64_t arrival_time_us)33   bool ShouldSendPacket(int64_t send_time_us, int64_t arrival_time_us) {
34     int64_t packet_delay_us = send_time_us - arrival_time_us;
35     delay_stats_.Push((std::min(packet_delay_us, max_delay_us_) + 500) / 1000);
36     return (max_delay_us_ == 0 || max_delay_us_ >= packet_delay_us);
37   }
38 
delay_stats() const39   const Stats<double>& delay_stats() const {
40     return delay_stats_;
41   }
42 
43  private:
44   int64_t max_delay_us_;
45   Stats<double> delay_stats_;
46 
47   RTC_DISALLOW_COPY_AND_ASSIGN(DelayCapHelper);
48 };
49 
CreateFlowIds(const int * flow_ids_array,size_t num_flow_ids)50 const FlowIds CreateFlowIds(const int *flow_ids_array, size_t num_flow_ids) {
51   FlowIds flow_ids(&flow_ids_array[0], flow_ids_array + num_flow_ids);
52   return flow_ids;
53 }
54 
CreateFlowIdRange(int initial_value,int last_value)55 const FlowIds CreateFlowIdRange(int initial_value, int last_value) {
56   int size = last_value - initial_value + 1;
57   assert(size > 0);
58   int* flow_ids_array = new int[size];
59   for (int i = initial_value; i <= last_value; ++i) {
60     flow_ids_array[i - initial_value] = i;
61   }
62   return CreateFlowIds(flow_ids_array, size);
63 }
64 
UpdateRates(int64_t send_time_us,uint32_t payload_size)65 void RateCounter::UpdateRates(int64_t send_time_us, uint32_t payload_size) {
66   ++recently_received_packets_;
67   recently_received_bytes_ += payload_size;
68   last_accumulated_us_ = send_time_us;
69   window_.push_back(std::make_pair(send_time_us, payload_size));
70   while (!window_.empty()) {
71     const TimeSizePair& packet = window_.front();
72     if (packet.first > (last_accumulated_us_ - window_size_us_)) {
73       break;
74     }
75     assert(recently_received_packets_ >= 1);
76     assert(recently_received_bytes_ >= packet.second);
77     --recently_received_packets_;
78     recently_received_bytes_ -= packet.second;
79     window_.pop_front();
80   }
81 }
82 
bits_per_second() const83 uint32_t RateCounter::bits_per_second() const {
84   return (8 * recently_received_bytes_) / BitrateWindowS();
85 }
86 
packets_per_second() const87 uint32_t RateCounter::packets_per_second() const {
88   return recently_received_packets_ / BitrateWindowS();
89 }
90 
BitrateWindowS() const91 double RateCounter::BitrateWindowS() const {
92   return static_cast<double>(window_size_us_) / (1000 * 1000);
93 }
94 
Packet()95 Packet::Packet()
96     : flow_id_(0),
97       creation_time_us_(-1),
98       send_time_us_(-1),
99       sender_timestamp_us_(-1),
100       payload_size_(0),
101       paced_(false) {
102 }
103 
Packet(int flow_id,int64_t send_time_us,size_t payload_size)104 Packet::Packet(int flow_id, int64_t send_time_us, size_t payload_size)
105     : flow_id_(flow_id),
106       creation_time_us_(send_time_us),
107       send_time_us_(send_time_us),
108       sender_timestamp_us_(send_time_us),
109       payload_size_(payload_size),
110       paced_(false) {
111 }
112 
~Packet()113 Packet::~Packet() {
114 }
115 
operator <(const Packet & rhs) const116 bool Packet::operator<(const Packet& rhs) const {
117   return send_time_us_ < rhs.send_time_us_;
118 }
119 
set_send_time_us(int64_t send_time_us)120 void Packet::set_send_time_us(int64_t send_time_us) {
121   assert(send_time_us >= 0);
122   send_time_us_ = send_time_us;
123 }
124 
MediaPacket()125 MediaPacket::MediaPacket() {
126   memset(&header_, 0, sizeof(header_));
127 }
128 
MediaPacket(int flow_id,int64_t send_time_us,size_t payload_size,uint16_t sequence_number)129 MediaPacket::MediaPacket(int flow_id,
130                          int64_t send_time_us,
131                          size_t payload_size,
132                          uint16_t sequence_number)
133     : Packet(flow_id, send_time_us, payload_size) {
134   header_ = RTPHeader();
135   header_.sequenceNumber = sequence_number;
136 }
137 
MediaPacket(int flow_id,int64_t send_time_us,size_t payload_size,const RTPHeader & header)138 MediaPacket::MediaPacket(int flow_id,
139                          int64_t send_time_us,
140                          size_t payload_size,
141                          const RTPHeader& header)
142     : Packet(flow_id, send_time_us, payload_size), header_(header) {
143 }
144 
MediaPacket(int64_t send_time_us,uint16_t sequence_number)145 MediaPacket::MediaPacket(int64_t send_time_us, uint16_t sequence_number)
146     : Packet(0, send_time_us, 0) {
147   header_ = RTPHeader();
148   header_.sequenceNumber = sequence_number;
149 }
150 
SetAbsSendTimeMs(int64_t abs_send_time_ms)151 void MediaPacket::SetAbsSendTimeMs(int64_t abs_send_time_ms) {
152   header_.extension.hasAbsoluteSendTime = true;
153   header_.extension.absoluteSendTime = ((static_cast<int64_t>(abs_send_time_ms *
154     (1 << 18)) + 500) / 1000) & 0x00fffffful;
155 }
156 
RembFeedback(int flow_id,int64_t send_time_us,int64_t last_send_time_ms,uint32_t estimated_bps,RTCPReportBlock report_block)157 RembFeedback::RembFeedback(int flow_id,
158                            int64_t send_time_us,
159                            int64_t last_send_time_ms,
160                            uint32_t estimated_bps,
161                            RTCPReportBlock report_block)
162     : FeedbackPacket(flow_id, send_time_us, last_send_time_ms),
163       estimated_bps_(estimated_bps),
164       report_block_(report_block) {
165 }
166 
SendSideBweFeedback(int flow_id,int64_t send_time_us,int64_t last_send_time_ms,const std::vector<PacketInfo> & packet_feedback_vector)167 SendSideBweFeedback::SendSideBweFeedback(
168     int flow_id,
169     int64_t send_time_us,
170     int64_t last_send_time_ms,
171     const std::vector<PacketInfo>& packet_feedback_vector)
172     : FeedbackPacket(flow_id, send_time_us, last_send_time_ms),
173       packet_feedback_vector_(packet_feedback_vector) {
174 }
175 
IsTimeSorted(const Packets & packets)176 bool IsTimeSorted(const Packets& packets) {
177   PacketsConstIt last_it = packets.begin();
178   for (PacketsConstIt it = last_it; it != packets.end(); ++it) {
179     if (it != last_it && **it < **last_it) {
180       return false;
181     }
182     last_it = it;
183   }
184   return true;
185 }
186 
PacketProcessor(PacketProcessorListener * listener,int flow_id,ProcessorType type)187 PacketProcessor::PacketProcessor(PacketProcessorListener* listener,
188                                  int flow_id,
189                                  ProcessorType type)
190     : listener_(listener), flow_ids_(&flow_id, &flow_id + 1) {
191   if (listener_) {
192     listener_->AddPacketProcessor(this, type);
193   }
194 }
195 
PacketProcessor(PacketProcessorListener * listener,const FlowIds & flow_ids,ProcessorType type)196 PacketProcessor::PacketProcessor(PacketProcessorListener* listener,
197                                  const FlowIds& flow_ids,
198                                  ProcessorType type)
199     : listener_(listener), flow_ids_(flow_ids) {
200   if (listener_) {
201     listener_->AddPacketProcessor(this, type);
202   }
203 }
204 
~PacketProcessor()205 PacketProcessor::~PacketProcessor() {
206   if (listener_) {
207     listener_->RemovePacketProcessor(this);
208   }
209 }
210 
packets_per_second() const211 uint32_t PacketProcessor::packets_per_second() const {
212   return rate_counter_.packets_per_second();
213 }
214 
bits_per_second() const215 uint32_t PacketProcessor::bits_per_second() const {
216   return rate_counter_.bits_per_second();
217 }
218 
RateCounterFilter(PacketProcessorListener * listener,int flow_id,const char * name,const std::string & plot_name)219 RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener,
220                                      int flow_id,
221                                      const char* name,
222                                      const std::string& plot_name)
223     : PacketProcessor(listener, flow_id, kRegular),
224       packets_per_second_stats_(),
225       kbps_stats_(),
226       start_plotting_time_ms_(0),
227       plot_name_(plot_name) {
228   std::stringstream ss;
229   ss << name << "_" << flow_id;
230   name_ = ss.str();
231 }
232 
RateCounterFilter(PacketProcessorListener * listener,const FlowIds & flow_ids,const char * name,const std::string & plot_name)233 RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener,
234                                      const FlowIds& flow_ids,
235                                      const char* name,
236                                      const std::string& plot_name)
237     : PacketProcessor(listener, flow_ids, kRegular),
238       packets_per_second_stats_(),
239       kbps_stats_(),
240       start_plotting_time_ms_(0),
241       plot_name_(plot_name) {
242   std::stringstream ss;
243   ss << name;
244   char delimiter = '_';
245   for (int flow_id : flow_ids) {
246     ss << delimiter << flow_id;
247     delimiter = ',';
248   }
249   name_ = ss.str();
250 }
251 
RateCounterFilter(PacketProcessorListener * listener,const FlowIds & flow_ids,const char * name,int64_t start_plotting_time_ms,const std::string & plot_name)252 RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener,
253                                      const FlowIds& flow_ids,
254                                      const char* name,
255                                      int64_t start_plotting_time_ms,
256                                      const std::string& plot_name)
257     : RateCounterFilter(listener, flow_ids, name, plot_name) {
258   start_plotting_time_ms_ = start_plotting_time_ms;
259 }
260 
~RateCounterFilter()261 RateCounterFilter::~RateCounterFilter() {
262   LogStats();
263 }
264 
265 
LogStats()266 void RateCounterFilter::LogStats() {
267   BWE_TEST_LOGGING_CONTEXT("RateCounterFilter");
268   packets_per_second_stats_.Log("pps");
269   kbps_stats_.Log("kbps");
270 }
271 
GetBitrateStats() const272 Stats<double> RateCounterFilter::GetBitrateStats() const {
273   return kbps_stats_;
274 }
275 
Plot(int64_t timestamp_ms)276 void RateCounterFilter::Plot(int64_t timestamp_ms) {
277   uint32_t plot_kbps = 0;
278   if (timestamp_ms >= start_plotting_time_ms_) {
279     plot_kbps = rate_counter_.bits_per_second() / 1000.0;
280   }
281   BWE_TEST_LOGGING_CONTEXT(name_.c_str());
282   if (plot_name_.empty()) {
283     BWE_TEST_LOGGING_PLOT(0, "Throughput_kbps#1", timestamp_ms, plot_kbps);
284   } else {
285     BWE_TEST_LOGGING_PLOT_WITH_NAME(0, "Throughput_kbps#1", timestamp_ms,
286                                     plot_kbps, plot_name_);
287   }
288 
289   RTC_UNUSED(plot_kbps);
290 }
291 
RunFor(int64_t,Packets * in_out)292 void RateCounterFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
293   assert(in_out);
294   for (const Packet* packet : *in_out) {
295     rate_counter_.UpdateRates(packet->send_time_us(),
296                               static_cast<int>(packet->payload_size()));
297   }
298   packets_per_second_stats_.Push(rate_counter_.packets_per_second());
299   kbps_stats_.Push(rate_counter_.bits_per_second() / 1000.0);
300 }
301 
LossFilter(PacketProcessorListener * listener,int flow_id)302 LossFilter::LossFilter(PacketProcessorListener* listener, int flow_id)
303     : PacketProcessor(listener, flow_id, kRegular),
304       random_(0x12345678),
305       loss_fraction_(0.0f) {
306 }
307 
LossFilter(PacketProcessorListener * listener,const FlowIds & flow_ids)308 LossFilter::LossFilter(PacketProcessorListener* listener,
309                        const FlowIds& flow_ids)
310     : PacketProcessor(listener, flow_ids, kRegular),
311       random_(0x12345678),
312       loss_fraction_(0.0f) {
313 }
314 
SetLoss(float loss_percent)315 void LossFilter::SetLoss(float loss_percent) {
316   BWE_TEST_LOGGING_ENABLE(false);
317   BWE_TEST_LOGGING_LOG1("Loss", "%f%%", loss_percent);
318   assert(loss_percent >= 0.0f);
319   assert(loss_percent <= 100.0f);
320   loss_fraction_ = loss_percent * 0.01f;
321 }
322 
RunFor(int64_t,Packets * in_out)323 void LossFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
324   assert(in_out);
325   for (PacketsIt it = in_out->begin(); it != in_out->end(); ) {
326     if (random_.Rand<float>() < loss_fraction_) {
327       delete *it;
328       it = in_out->erase(it);
329     } else {
330       ++it;
331     }
332   }
333 }
334 
335 const int64_t kDefaultOneWayDelayUs = 0;
336 
DelayFilter(PacketProcessorListener * listener,int flow_id)337 DelayFilter::DelayFilter(PacketProcessorListener* listener, int flow_id)
338     : PacketProcessor(listener, flow_id, kRegular),
339       one_way_delay_us_(kDefaultOneWayDelayUs),
340       last_send_time_us_(0) {
341 }
342 
DelayFilter(PacketProcessorListener * listener,const FlowIds & flow_ids)343 DelayFilter::DelayFilter(PacketProcessorListener* listener,
344                          const FlowIds& flow_ids)
345     : PacketProcessor(listener, flow_ids, kRegular),
346       one_way_delay_us_(kDefaultOneWayDelayUs),
347       last_send_time_us_(0) {
348 }
349 
SetOneWayDelayMs(int64_t one_way_delay_ms)350 void DelayFilter::SetOneWayDelayMs(int64_t one_way_delay_ms) {
351   BWE_TEST_LOGGING_ENABLE(false);
352   BWE_TEST_LOGGING_LOG1("Delay", "%d ms", static_cast<int>(one_way_delay_ms));
353   assert(one_way_delay_ms >= 0);
354   one_way_delay_us_ = one_way_delay_ms * 1000;
355 }
356 
RunFor(int64_t,Packets * in_out)357 void DelayFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
358   assert(in_out);
359   for (Packet* packet : *in_out) {
360     int64_t new_send_time_us = packet->send_time_us() + one_way_delay_us_;
361     last_send_time_us_ = std::max(last_send_time_us_, new_send_time_us);
362     packet->set_send_time_us(last_send_time_us_);
363   }
364 }
365 
JitterFilter(PacketProcessorListener * listener,int flow_id)366 JitterFilter::JitterFilter(PacketProcessorListener* listener, int flow_id)
367     : PacketProcessor(listener, flow_id, kRegular),
368       random_(0x89674523),
369       stddev_jitter_us_(0),
370       last_send_time_us_(0),
371       reordering_(false) {
372 }
373 
JitterFilter(PacketProcessorListener * listener,const FlowIds & flow_ids)374 JitterFilter::JitterFilter(PacketProcessorListener* listener,
375                            const FlowIds& flow_ids)
376     : PacketProcessor(listener, flow_ids, kRegular),
377       random_(0x89674523),
378       stddev_jitter_us_(0),
379       last_send_time_us_(0),
380       reordering_(false) {
381 }
382 
383 const int kN = 3;  // Truncated N sigma gaussian.
384 
SetMaxJitter(int64_t max_jitter_ms)385 void JitterFilter::SetMaxJitter(int64_t max_jitter_ms) {
386   BWE_TEST_LOGGING_ENABLE(false);
387   BWE_TEST_LOGGING_LOG1("Max Jitter", "%d ms", static_cast<int>(max_jitter_ms));
388   assert(max_jitter_ms >= 0);
389   // Truncated gaussian, Max jitter = kN*sigma.
390   stddev_jitter_us_ = (max_jitter_ms * 1000 + kN / 2) / kN;
391 }
392 
393 namespace {
TruncatedNSigmaGaussian(Random * const random,int64_t mean,int64_t std_dev)394 inline int64_t TruncatedNSigmaGaussian(Random* const random,
395                                        int64_t mean,
396                                        int64_t std_dev) {
397   int64_t gaussian_random = random->Gaussian(mean, std_dev);
398   return std::max(std::min(gaussian_random, kN * std_dev), -kN * std_dev);
399 }
400 }
401 
RunFor(int64_t,Packets * in_out)402 void JitterFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
403   assert(in_out);
404   for (Packet* packet : *in_out) {
405     int64_t jitter_us =
406         std::abs(TruncatedNSigmaGaussian(&random_, 0, stddev_jitter_us_));
407     int64_t new_send_time_us = packet->send_time_us() + jitter_us;
408 
409     if (!reordering_) {
410       new_send_time_us = std::max(last_send_time_us_, new_send_time_us);
411     }
412 
413     // Receiver timestamp cannot be lower than sender timestamp.
414     assert(new_send_time_us >= packet->sender_timestamp_us());
415 
416     packet->set_send_time_us(new_send_time_us);
417     last_send_time_us_ = new_send_time_us;
418   }
419 }
420 
421 // Computes the expected value for a right sided (abs) truncated gaussian.
422 // Does not take into account  possible reoerdering updates.
MeanUs()423 int64_t JitterFilter::MeanUs() {
424   const double kPi = 3.1415926535897932;
425   double max_jitter_us = static_cast<double>(kN * stddev_jitter_us_);
426   double right_sided_mean_us =
427       static_cast<double>(stddev_jitter_us_) / sqrt(kPi / 2.0);
428   double truncated_mean_us =
429       right_sided_mean_us *
430           (1.0 - exp(-pow(static_cast<double>(kN), 2.0) / 2.0)) +
431       max_jitter_us * erfc(static_cast<double>(kN));
432   return static_cast<int64_t>(truncated_mean_us + 0.5);
433 }
434 
ReorderFilter(PacketProcessorListener * listener,int flow_id)435 ReorderFilter::ReorderFilter(PacketProcessorListener* listener, int flow_id)
436     : PacketProcessor(listener, flow_id, kRegular),
437       random_(0x27452389),
438       reorder_fraction_(0.0f) {
439 }
440 
ReorderFilter(PacketProcessorListener * listener,const FlowIds & flow_ids)441 ReorderFilter::ReorderFilter(PacketProcessorListener* listener,
442                              const FlowIds& flow_ids)
443     : PacketProcessor(listener, flow_ids, kRegular),
444       random_(0x27452389),
445       reorder_fraction_(0.0f) {
446 }
447 
SetReorder(float reorder_percent)448 void ReorderFilter::SetReorder(float reorder_percent) {
449   BWE_TEST_LOGGING_ENABLE(false);
450   BWE_TEST_LOGGING_LOG1("Reordering", "%f%%", reorder_percent);
451   assert(reorder_percent >= 0.0f);
452   assert(reorder_percent <= 100.0f);
453   reorder_fraction_ = reorder_percent * 0.01f;
454 }
455 
RunFor(int64_t,Packets * in_out)456 void ReorderFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
457   assert(in_out);
458   if (in_out->size() >= 2) {
459     PacketsIt last_it = in_out->begin();
460     PacketsIt it = last_it;
461     while (++it != in_out->end()) {
462       if (random_.Rand<float>() < reorder_fraction_) {
463         int64_t t1 = (*last_it)->send_time_us();
464         int64_t t2 = (*it)->send_time_us();
465         std::swap(*last_it, *it);
466         (*last_it)->set_send_time_us(t1);
467         (*it)->set_send_time_us(t2);
468       }
469       last_it = it;
470     }
471   }
472 }
473 
474 const uint32_t kDefaultKbps = 1200;
475 
ChokeFilter(PacketProcessorListener * listener,int flow_id)476 ChokeFilter::ChokeFilter(PacketProcessorListener* listener, int flow_id)
477     : PacketProcessor(listener, flow_id, kRegular),
478       capacity_kbps_(kDefaultKbps),
479       last_send_time_us_(0),
480       delay_cap_helper_(new DelayCapHelper()) {
481 }
482 
ChokeFilter(PacketProcessorListener * listener,const FlowIds & flow_ids)483 ChokeFilter::ChokeFilter(PacketProcessorListener* listener,
484                          const FlowIds& flow_ids)
485     : PacketProcessor(listener, flow_ids, kRegular),
486       capacity_kbps_(kDefaultKbps),
487       last_send_time_us_(0),
488       delay_cap_helper_(new DelayCapHelper()) {
489 }
490 
~ChokeFilter()491 ChokeFilter::~ChokeFilter() {}
492 
set_capacity_kbps(uint32_t kbps)493 void ChokeFilter::set_capacity_kbps(uint32_t kbps) {
494   BWE_TEST_LOGGING_ENABLE(false);
495   BWE_TEST_LOGGING_LOG1("BitrateChoke", "%d kbps", kbps);
496   capacity_kbps_ = kbps;
497 }
498 
capacity_kbps()499 uint32_t ChokeFilter::capacity_kbps() {
500   return capacity_kbps_;
501 }
502 
RunFor(int64_t,Packets * in_out)503 void ChokeFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
504   assert(in_out);
505   for (PacketsIt it = in_out->begin(); it != in_out->end(); ) {
506     int64_t earliest_send_time_us =
507         std::max(last_send_time_us_, (*it)->send_time_us());
508 
509     int64_t new_send_time_us =
510         earliest_send_time_us +
511         ((*it)->payload_size() * 8 * 1000 + capacity_kbps_ / 2) /
512             capacity_kbps_;
513 
514     if (delay_cap_helper_->ShouldSendPacket(new_send_time_us,
515                                             (*it)->send_time_us())) {
516       (*it)->set_send_time_us(new_send_time_us);
517       last_send_time_us_ = new_send_time_us;
518       ++it;
519     } else {
520       delete *it;
521       it = in_out->erase(it);
522     }
523   }
524 }
525 
set_max_delay_ms(int64_t max_delay_ms)526 void ChokeFilter::set_max_delay_ms(int64_t max_delay_ms) {
527   delay_cap_helper_->set_max_delay_ms(max_delay_ms);
528 }
529 
GetDelayStats() const530 Stats<double> ChokeFilter::GetDelayStats() const {
531   return delay_cap_helper_->delay_stats();
532 }
533 
TraceBasedDeliveryFilter(PacketProcessorListener * listener,int flow_id)534 TraceBasedDeliveryFilter::TraceBasedDeliveryFilter(
535     PacketProcessorListener* listener,
536     int flow_id)
537     : PacketProcessor(listener, flow_id, kRegular),
538       current_offset_us_(0),
539       delivery_times_us_(),
540       next_delivery_it_(),
541       local_time_us_(-1),
542       rate_counter_(new RateCounter),
543       name_(""),
544       delay_cap_helper_(new DelayCapHelper()),
545       packets_per_second_stats_(),
546       kbps_stats_() {
547 }
548 
TraceBasedDeliveryFilter(PacketProcessorListener * listener,const FlowIds & flow_ids)549 TraceBasedDeliveryFilter::TraceBasedDeliveryFilter(
550     PacketProcessorListener* listener,
551     const FlowIds& flow_ids)
552     : PacketProcessor(listener, flow_ids, kRegular),
553       current_offset_us_(0),
554       delivery_times_us_(),
555       next_delivery_it_(),
556       local_time_us_(-1),
557       rate_counter_(new RateCounter),
558       name_(""),
559       delay_cap_helper_(new DelayCapHelper()),
560       packets_per_second_stats_(),
561       kbps_stats_() {
562 }
563 
TraceBasedDeliveryFilter(PacketProcessorListener * listener,int flow_id,const char * name)564 TraceBasedDeliveryFilter::TraceBasedDeliveryFilter(
565     PacketProcessorListener* listener,
566     int flow_id,
567     const char* name)
568     : PacketProcessor(listener, flow_id, kRegular),
569       current_offset_us_(0),
570       delivery_times_us_(),
571       next_delivery_it_(),
572       local_time_us_(-1),
573       rate_counter_(new RateCounter),
574       name_(name),
575       delay_cap_helper_(new DelayCapHelper()),
576       packets_per_second_stats_(),
577       kbps_stats_() {
578 }
579 
~TraceBasedDeliveryFilter()580 TraceBasedDeliveryFilter::~TraceBasedDeliveryFilter() {
581 }
582 
Init(const std::string & filename)583 bool TraceBasedDeliveryFilter::Init(const std::string& filename) {
584   FILE* trace_file = fopen(filename.c_str(), "r");
585   if (!trace_file) {
586     return false;
587   }
588   int64_t first_timestamp = -1;
589   while (!feof(trace_file)) {
590     const size_t kMaxLineLength = 100;
591     char line[kMaxLineLength];
592     if (fgets(line, kMaxLineLength, trace_file)) {
593       std::string line_string(line);
594       std::istringstream buffer(line_string);
595       int64_t timestamp;
596       buffer >> timestamp;
597       timestamp /= 1000;  // Convert to microseconds.
598       if (first_timestamp == -1)
599         first_timestamp = timestamp;
600       assert(delivery_times_us_.empty() ||
601              timestamp - first_timestamp - delivery_times_us_.back() >= 0);
602       delivery_times_us_.push_back(timestamp - first_timestamp);
603     }
604   }
605   assert(!delivery_times_us_.empty());
606   next_delivery_it_ = delivery_times_us_.begin();
607   fclose(trace_file);
608   return true;
609 }
610 
Plot(int64_t timestamp_ms)611 void TraceBasedDeliveryFilter::Plot(int64_t timestamp_ms) {
612   BWE_TEST_LOGGING_CONTEXT(name_.c_str());
613   // This plots the max possible throughput of the trace-based delivery filter,
614   // which will be reached if a packet sent on every packet slot of the trace.
615   BWE_TEST_LOGGING_PLOT(0, "MaxThroughput_#1", timestamp_ms,
616                         rate_counter_->bits_per_second() / 1000.0);
617 }
618 
RunFor(int64_t time_ms,Packets * in_out)619 void TraceBasedDeliveryFilter::RunFor(int64_t time_ms, Packets* in_out) {
620   assert(in_out);
621   for (PacketsIt it = in_out->begin(); it != in_out->end();) {
622     while (local_time_us_ < (*it)->send_time_us()) {
623       ProceedToNextSlot();
624     }
625     // Drop any packets that have been queued for too long.
626     while (!delay_cap_helper_->ShouldSendPacket(local_time_us_,
627                                                 (*it)->send_time_us())) {
628       delete *it;
629       it = in_out->erase(it);
630       if (it == in_out->end()) {
631         return;
632       }
633     }
634     if (local_time_us_ >= (*it)->send_time_us()) {
635       (*it)->set_send_time_us(local_time_us_);
636       ProceedToNextSlot();
637     }
638     ++it;
639   }
640   packets_per_second_stats_.Push(rate_counter_->packets_per_second());
641   kbps_stats_.Push(rate_counter_->bits_per_second() / 1000.0);
642 }
643 
set_max_delay_ms(int64_t max_delay_ms)644 void TraceBasedDeliveryFilter::set_max_delay_ms(int64_t max_delay_ms) {
645   delay_cap_helper_->set_max_delay_ms(max_delay_ms);
646 }
647 
GetDelayStats() const648 Stats<double> TraceBasedDeliveryFilter::GetDelayStats() const {
649   return delay_cap_helper_->delay_stats();
650 }
651 
GetBitrateStats() const652 Stats<double> TraceBasedDeliveryFilter::GetBitrateStats() const {
653   return kbps_stats_;
654 }
655 
ProceedToNextSlot()656 void TraceBasedDeliveryFilter::ProceedToNextSlot() {
657   if (*next_delivery_it_ <= local_time_us_) {
658     ++next_delivery_it_;
659     if (next_delivery_it_ == delivery_times_us_.end()) {
660       // When the trace wraps we allow two packets to be sent back-to-back.
661       for (int64_t& delivery_time_us : delivery_times_us_) {
662         delivery_time_us += local_time_us_ - current_offset_us_;
663       }
664       current_offset_us_ += local_time_us_ - current_offset_us_;
665       next_delivery_it_ = delivery_times_us_.begin();
666     }
667   }
668   local_time_us_ = *next_delivery_it_;
669   const int kPayloadSize = 1200;
670   rate_counter_->UpdateRates(local_time_us_, kPayloadSize);
671 }
672 
VideoSource(int flow_id,float fps,uint32_t kbps,uint32_t ssrc,int64_t first_frame_offset_ms)673 VideoSource::VideoSource(int flow_id,
674                          float fps,
675                          uint32_t kbps,
676                          uint32_t ssrc,
677                          int64_t first_frame_offset_ms)
678     : kMaxPayloadSizeBytes(1200),
679       kTimestampBase(0xff80ff00ul),
680       frame_period_ms_(1000.0 / fps),
681       bits_per_second_(1000 * kbps),
682       frame_size_bytes_(bits_per_second_ / 8 / fps),
683       random_(0x12345678),
684       flow_id_(flow_id),
685       next_frame_ms_(first_frame_offset_ms),
686       next_frame_rand_ms_(0),
687       now_ms_(0),
688       prototype_header_() {
689   memset(&prototype_header_, 0, sizeof(prototype_header_));
690   prototype_header_.ssrc = ssrc;
691   prototype_header_.sequenceNumber = 0xf000u;
692 }
693 
NextFrameSize()694 uint32_t VideoSource::NextFrameSize() {
695   return frame_size_bytes_;
696 }
697 
GetTimeUntilNextFrameMs() const698 int64_t VideoSource::GetTimeUntilNextFrameMs() const {
699   return next_frame_ms_ + next_frame_rand_ms_ - now_ms_;
700 }
701 
NextPacketSize(uint32_t frame_size,uint32_t remaining_payload)702 uint32_t VideoSource::NextPacketSize(uint32_t frame_size,
703                                      uint32_t remaining_payload) {
704   return std::min(kMaxPayloadSizeBytes, remaining_payload);
705 }
706 
RunFor(int64_t time_ms,Packets * in_out)707 void VideoSource::RunFor(int64_t time_ms, Packets* in_out) {
708   assert(in_out);
709 
710   now_ms_ += time_ms;
711   Packets new_packets;
712 
713   while (now_ms_ >= next_frame_ms_) {
714     const int64_t kRandAmplitude = 2;
715     // A variance picked uniformly from {-1, 0, 1} ms is added to the frame
716     // timestamp.
717     next_frame_rand_ms_ = kRandAmplitude * (random_.Rand<float>() - 0.5);
718 
719     // Ensure frame will not have a negative timestamp.
720     int64_t next_frame_ms =
721         std::max<int64_t>(next_frame_ms_ + next_frame_rand_ms_, 0);
722 
723     prototype_header_.timestamp =
724         kTimestampBase + static_cast<uint32_t>(next_frame_ms * 90.0);
725     prototype_header_.extension.transmissionTimeOffset = 0;
726 
727     // Generate new packets for this frame, all with the same timestamp,
728     // but the payload size is capped, so if the whole frame doesn't fit in
729     // one packet, we will see a number of equally sized packets followed by
730     // one smaller at the tail.
731 
732     int64_t send_time_us = next_frame_ms * 1000.0;
733 
734     uint32_t frame_size = NextFrameSize();
735     uint32_t payload_size = frame_size;
736 
737     while (payload_size > 0) {
738       ++prototype_header_.sequenceNumber;
739       uint32_t size = NextPacketSize(frame_size, payload_size);
740       MediaPacket* new_packet =
741           new MediaPacket(flow_id_, send_time_us, size, prototype_header_);
742       new_packets.push_back(new_packet);
743       new_packet->SetAbsSendTimeMs(next_frame_ms);
744       new_packet->set_sender_timestamp_us(send_time_us);
745       payload_size -= size;
746     }
747 
748     next_frame_ms_ += frame_period_ms_;
749   }
750 
751   in_out->merge(new_packets, DereferencingComparator<Packet>);
752 }
753 
AdaptiveVideoSource(int flow_id,float fps,uint32_t kbps,uint32_t ssrc,int64_t first_frame_offset_ms)754 AdaptiveVideoSource::AdaptiveVideoSource(int flow_id,
755                                          float fps,
756                                          uint32_t kbps,
757                                          uint32_t ssrc,
758                                          int64_t first_frame_offset_ms)
759     : VideoSource(flow_id, fps, kbps, ssrc, first_frame_offset_ms) {
760 }
761 
SetBitrateBps(int bitrate_bps)762 void AdaptiveVideoSource::SetBitrateBps(int bitrate_bps) {
763   bits_per_second_ = std::min(bitrate_bps, 2500000);
764   frame_size_bytes_ = (bits_per_second_ / 8 * frame_period_ms_ + 500) / 1000;
765 }
766 
PeriodicKeyFrameSource(int flow_id,float fps,uint32_t kbps,uint32_t ssrc,int64_t first_frame_offset_ms,int key_frame_interval)767 PeriodicKeyFrameSource::PeriodicKeyFrameSource(int flow_id,
768                                                float fps,
769                                                uint32_t kbps,
770                                                uint32_t ssrc,
771                                                int64_t first_frame_offset_ms,
772                                                int key_frame_interval)
773     : AdaptiveVideoSource(flow_id, fps, kbps, ssrc, first_frame_offset_ms),
774       key_frame_interval_(key_frame_interval),
775       frame_counter_(0),
776       compensation_bytes_(0),
777       compensation_per_frame_(0) {
778 }
779 
NextFrameSize()780 uint32_t PeriodicKeyFrameSource::NextFrameSize() {
781   uint32_t payload_size = frame_size_bytes_;
782   if (frame_counter_ == 0) {
783     payload_size = kMaxPayloadSizeBytes * 12;
784     compensation_bytes_ = 4 * frame_size_bytes_;
785     compensation_per_frame_ = compensation_bytes_ / 30;
786   } else if (key_frame_interval_ > 0 &&
787              (frame_counter_ % key_frame_interval_ == 0)) {
788     payload_size *= 5;
789     compensation_bytes_ = payload_size - frame_size_bytes_;
790     compensation_per_frame_ = compensation_bytes_ / 30;
791   } else if (compensation_bytes_ > 0) {
792     if (compensation_per_frame_ > static_cast<int>(payload_size)) {
793       // Skip this frame.
794       compensation_bytes_ -= payload_size;
795       payload_size = 0;
796     } else {
797       payload_size -= compensation_per_frame_;
798       compensation_bytes_ -= compensation_per_frame_;
799     }
800   }
801   if (compensation_bytes_ < 0)
802     compensation_bytes_ = 0;
803   ++frame_counter_;
804   return payload_size;
805 }
806 
NextPacketSize(uint32_t frame_size,uint32_t remaining_payload)807 uint32_t PeriodicKeyFrameSource::NextPacketSize(uint32_t frame_size,
808                                                 uint32_t remaining_payload) {
809   uint32_t fragments =
810       (frame_size + (kMaxPayloadSizeBytes - 1)) / kMaxPayloadSizeBytes;
811   uint32_t avg_size = (frame_size + fragments - 1) / fragments;
812   return std::min(avg_size, remaining_payload);
813 }
814 }  // namespace bwe
815 }  // namespace testing
816 }  // namespace webrtc
817