1 /*
2  *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
#include "webrtc/modules/remote_bitrate_estimator/test/packet_sender.h"

#include <algorithm>
#include <cassert>
#include <cmath>
#include <limits>
#include <list>
#include <sstream>

#include "webrtc/base/checks.h"
#include "webrtc/modules/include/module_common_types.h"
#include "webrtc/modules/remote_bitrate_estimator/test/bwe.h"
#include "webrtc/modules/remote_bitrate_estimator/test/metric_recorder.h"
21 
22 namespace webrtc {
23 namespace testing {
24 namespace bwe {
25 
Pause()26 void PacketSender::Pause() {
27   running_ = false;
28   if (metric_recorder_ != nullptr) {
29     metric_recorder_->PauseFlow();
30   }
31 }
32 
Resume(int64_t paused_time_ms)33 void PacketSender::Resume(int64_t paused_time_ms) {
34   running_ = true;
35   if (metric_recorder_ != nullptr) {
36     metric_recorder_->ResumeFlow(paused_time_ms);
37   }
38 }
39 
set_metric_recorder(MetricRecorder * metric_recorder)40 void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) {
41   metric_recorder_ = metric_recorder;
42 }
43 
RecordBitrate()44 void PacketSender::RecordBitrate() {
45   if (metric_recorder_ != nullptr) {
46     BWE_TEST_LOGGING_CONTEXT("Sender");
47     BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin());
48     metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds());
49     metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps());
50   }
51 }
52 
GetFeedbackPackets(Packets * in_out,int64_t end_time_ms,int flow_id)53 std::list<FeedbackPacket*> GetFeedbackPackets(Packets* in_out,
54                                               int64_t end_time_ms,
55                                               int flow_id) {
56   std::list<FeedbackPacket*> fb_packets;
57   for (auto it = in_out->begin(); it != in_out->end();) {
58     if ((*it)->send_time_us() > 1000 * end_time_ms)
59       break;
60     if ((*it)->GetPacketType() == Packet::kFeedback &&
61         flow_id == (*it)->flow_id()) {
62       fb_packets.push_back(static_cast<FeedbackPacket*>(*it));
63       it = in_out->erase(it);
64     } else {
65       ++it;
66     }
67   }
68   return fb_packets;
69 }
70 
VideoSender(PacketProcessorListener * listener,VideoSource * source,BandwidthEstimatorType estimator_type)71 VideoSender::VideoSender(PacketProcessorListener* listener,
72                          VideoSource* source,
73                          BandwidthEstimatorType estimator_type)
74     : PacketSender(listener, source->flow_id()),
75       source_(source),
76       bwe_(CreateBweSender(estimator_type,
77                            source_->bits_per_second() / 1000,
78                            this,
79                            &clock_)),
80       previous_sending_bitrate_(0) {
81   modules_.push_back(bwe_.get());
82 }
83 
~VideoSender()84 VideoSender::~VideoSender() {
85 }
86 
Pause()87 void VideoSender::Pause() {
88   previous_sending_bitrate_ = TargetBitrateKbps();
89   PacketSender::Pause();
90 }
91 
Resume(int64_t paused_time_ms)92 void VideoSender::Resume(int64_t paused_time_ms) {
93   source_->SetBitrateBps(previous_sending_bitrate_);
94   PacketSender::Resume(paused_time_ms);
95 }
96 
RunFor(int64_t time_ms,Packets * in_out)97 void VideoSender::RunFor(int64_t time_ms, Packets* in_out) {
98   std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
99       in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id());
100   ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out);
101 }
102 
ProcessFeedbackAndGeneratePackets(int64_t time_ms,std::list<FeedbackPacket * > * feedbacks,Packets * packets)103 void VideoSender::ProcessFeedbackAndGeneratePackets(
104     int64_t time_ms,
105     std::list<FeedbackPacket*>* feedbacks,
106     Packets* packets) {
107   do {
108     // Make sure to at least run Process() below every 100 ms.
109     int64_t time_to_run_ms = std::min<int64_t>(time_ms, 100);
110     if (!feedbacks->empty()) {
111       int64_t time_until_feedback_ms =
112           feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds();
113       time_to_run_ms =
114           std::max<int64_t>(std::min(time_ms, time_until_feedback_ms), 0);
115     }
116 
117     if (!running_) {
118       source_->SetBitrateBps(0);
119     }
120 
121     Packets generated;
122     source_->RunFor(time_to_run_ms, &generated);
123     bwe_->OnPacketsSent(generated);
124     packets->merge(generated, DereferencingComparator<Packet>);
125 
126     clock_.AdvanceTimeMilliseconds(time_to_run_ms);
127 
128     if (!feedbacks->empty()) {
129       bwe_->GiveFeedback(*feedbacks->front());
130       delete feedbacks->front();
131       feedbacks->pop_front();
132     }
133 
134     bwe_->Process();
135 
136     time_ms -= time_to_run_ms;
137   } while (time_ms > 0);
138   assert(feedbacks->empty());
139 }
140 
GetFeedbackIntervalMs() const141 int VideoSender::GetFeedbackIntervalMs() const {
142   return bwe_->GetFeedbackIntervalMs();
143 }
144 
OnNetworkChanged(uint32_t target_bitrate_bps,uint8_t fraction_lost,int64_t rtt)145 void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
146                                    uint8_t fraction_lost,
147                                    int64_t rtt) {
148   source_->SetBitrateBps(target_bitrate_bps);
149   RecordBitrate();
150 }
151 
TargetBitrateKbps()152 uint32_t VideoSender::TargetBitrateKbps() {
153   return (source_->bits_per_second() + 500) / 1000;
154 }
155 
PacedVideoSender(PacketProcessorListener * listener,VideoSource * source,BandwidthEstimatorType estimator)156 PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener,
157                                    VideoSource* source,
158                                    BandwidthEstimatorType estimator)
159     : VideoSender(listener, source, estimator),
160       pacer_(&clock_,
161              this,
162              source->bits_per_second() / 1000,
163              PacedSender::kDefaultPaceMultiplier * source->bits_per_second() /
164                  1000,
165              0) {
166   modules_.push_back(&pacer_);
167 }
168 
~PacedVideoSender()169 PacedVideoSender::~PacedVideoSender() {
170   for (Packet* packet : pacer_queue_)
171     delete packet;
172   for (Packet* packet : queue_)
173     delete packet;
174 }
175 
RunFor(int64_t time_ms,Packets * in_out)176 void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) {
177   int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms;
178   // Run process periodically to allow the packets to be paced out.
179   std::list<FeedbackPacket*> feedbacks =
180       GetFeedbackPackets(in_out, end_time_ms, source_->flow_id());
181   int64_t last_run_time_ms = -1;
182   BWE_TEST_LOGGING_CONTEXT("Sender");
183   BWE_TEST_LOGGING_CONTEXT(source_->flow_id());
184   do {
185     int64_t time_until_process_ms = TimeUntilNextProcess(modules_);
186     int64_t time_until_feedback_ms = time_ms;
187     if (!feedbacks.empty())
188       time_until_feedback_ms = std::max<int64_t>(
189           feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0);
190 
191     int64_t time_until_next_event_ms =
192         std::min(time_until_feedback_ms, time_until_process_ms);
193 
194     time_until_next_event_ms =
195         std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms);
196 
197     // Never run for longer than we have been asked for.
198     if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms)
199       time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds();
200 
201     // Make sure we don't get stuck if an event doesn't trigger. This typically
202     // happens if the prober wants to probe, but there's no packet to send.
203     if (time_until_next_event_ms == 0 && last_run_time_ms == 0)
204       time_until_next_event_ms = 1;
205     last_run_time_ms = time_until_next_event_ms;
206 
207     Packets generated_packets;
208     source_->RunFor(time_until_next_event_ms, &generated_packets);
209     if (!generated_packets.empty()) {
210       for (Packet* packet : generated_packets) {
211         MediaPacket* media_packet = static_cast<MediaPacket*>(packet);
212         pacer_.InsertPacket(
213             PacedSender::kNormalPriority, media_packet->header().ssrc,
214             media_packet->header().sequenceNumber, media_packet->send_time_ms(),
215             media_packet->payload_size(), false);
216         pacer_queue_.push_back(packet);
217         assert(pacer_queue_.size() < 10000);
218       }
219     }
220 
221     clock_.AdvanceTimeMilliseconds(time_until_next_event_ms);
222 
223     if (time_until_next_event_ms == time_until_feedback_ms) {
224       if (!feedbacks.empty()) {
225         bwe_->GiveFeedback(*feedbacks.front());
226         delete feedbacks.front();
227         feedbacks.pop_front();
228       }
229       bwe_->Process();
230     }
231 
232     if (time_until_next_event_ms == time_until_process_ms) {
233       CallProcess(modules_);
234     }
235   } while (clock_.TimeInMilliseconds() < end_time_ms);
236   QueuePackets(in_out, end_time_ms * 1000);
237 }
238 
TimeUntilNextProcess(const std::list<Module * > & modules)239 int64_t PacedVideoSender::TimeUntilNextProcess(
240     const std::list<Module*>& modules) {
241   int64_t time_until_next_process_ms = 10;
242   for (Module* module : modules) {
243     int64_t next_process_ms = module->TimeUntilNextProcess();
244     if (next_process_ms < time_until_next_process_ms)
245       time_until_next_process_ms = next_process_ms;
246   }
247   if (time_until_next_process_ms < 0)
248     time_until_next_process_ms = 0;
249   return time_until_next_process_ms;
250 }
251 
CallProcess(const std::list<Module * > & modules)252 void PacedVideoSender::CallProcess(const std::list<Module*>& modules) {
253   for (Module* module : modules) {
254     if (module->TimeUntilNextProcess() <= 0) {
255       module->Process();
256     }
257   }
258 }
259 
QueuePackets(Packets * batch,int64_t end_of_batch_time_us)260 void PacedVideoSender::QueuePackets(Packets* batch,
261                                     int64_t end_of_batch_time_us) {
262   queue_.merge(*batch, DereferencingComparator<Packet>);
263   if (queue_.empty()) {
264     return;
265   }
266   Packets::iterator it = queue_.begin();
267   for (; it != queue_.end(); ++it) {
268     if ((*it)->send_time_us() > end_of_batch_time_us) {
269       break;
270     }
271   }
272   Packets to_transfer;
273   to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it);
274   for (Packet* packet : to_transfer)
275     packet->set_paced(true);
276   bwe_->OnPacketsSent(to_transfer);
277   batch->merge(to_transfer, DereferencingComparator<Packet>);
278 }
279 
TimeToSendPacket(uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,bool retransmission)280 bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc,
281                                         uint16_t sequence_number,
282                                         int64_t capture_time_ms,
283                                         bool retransmission) {
284   for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end();
285        ++it) {
286     MediaPacket* media_packet = static_cast<MediaPacket*>(*it);
287     if (media_packet->header().sequenceNumber == sequence_number) {
288       int64_t pace_out_time_ms = clock_.TimeInMilliseconds();
289 
290       // Make sure a packet is never paced out earlier than when it was put into
291       // the pacer.
292       assert(pace_out_time_ms >= media_packet->send_time_ms());
293 
294       media_packet->SetAbsSendTimeMs(pace_out_time_ms);
295       media_packet->set_send_time_us(1000 * pace_out_time_ms);
296       media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms);
297       queue_.push_back(media_packet);
298       pacer_queue_.erase(it);
299       return true;
300     }
301   }
302   return false;
303 }
304 
TimeToSendPadding(size_t bytes)305 size_t PacedVideoSender::TimeToSendPadding(size_t bytes) {
306   return 0;
307 }
308 
OnNetworkChanged(uint32_t target_bitrate_bps,uint8_t fraction_lost,int64_t rtt)309 void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
310                                         uint8_t fraction_lost,
311                                         int64_t rtt) {
312   VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt);
313   pacer_.UpdateBitrate(
314       target_bitrate_bps / 1000,
315       PacedSender::kDefaultPaceMultiplier * target_bitrate_bps / 1000, 0);
316 }
317 
// Sentinel meaning "unbounded"; used as the default send limit and as the
// initial slow-start threshold of TcpSender.
constexpr int kNoLimit = std::numeric_limits<int>::max();
// Size in bytes of every packet generated by TcpSender.
constexpr int kPacketSizeBytes = 1200;
320 
TcpSender(PacketProcessorListener * listener,int flow_id,int64_t offset_ms)321 TcpSender::TcpSender(PacketProcessorListener* listener,
322                      int flow_id,
323                      int64_t offset_ms)
324     : TcpSender(listener, flow_id, offset_ms, kNoLimit) {
325 }
326 
TcpSender(PacketProcessorListener * listener,int flow_id,int64_t offset_ms,int send_limit_bytes)327 TcpSender::TcpSender(PacketProcessorListener* listener,
328                      int flow_id,
329                      int64_t offset_ms,
330                      int send_limit_bytes)
331     : PacketSender(listener, flow_id),
332       cwnd_(10),
333       ssthresh_(kNoLimit),
334       ack_received_(false),
335       last_acked_seq_num_(0),
336       next_sequence_number_(0),
337       offset_ms_(offset_ms),
338       last_reduction_time_ms_(-1),
339       last_rtt_ms_(0),
340       total_sent_bytes_(0),
341       send_limit_bytes_(send_limit_bytes),
342       last_generated_packets_ms_(0),
343       num_recent_sent_packets_(0),
344       bitrate_kbps_(0) {
345 }
346 
RunFor(int64_t time_ms,Packets * in_out)347 void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
348   if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) {
349     clock_.AdvanceTimeMilliseconds(time_ms);
350     if (running_) {
351       Pause();
352     }
353     return;
354   }
355 
356   if (!running_ && total_sent_bytes_ == 0) {
357     Resume(offset_ms_);
358   }
359 
360   int64_t start_time_ms = clock_.TimeInMilliseconds();
361 
362   std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
363       in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin());
364   // The number of packets which are sent in during time_ms depends on the
365   // number of packets in_flight_ and the max number of packets in flight
366   // (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms.
367   for (FeedbackPacket* fb : feedbacks) {
368     clock_.AdvanceTimeMilliseconds(fb->send_time_ms() -
369                                    clock_.TimeInMilliseconds());
370     last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms();
371     UpdateCongestionControl(fb);
372     SendPackets(in_out);
373   }
374 
375   for (auto it = in_flight_.begin(); it != in_flight_.end();) {
376     if (it->time_ms < clock_.TimeInMilliseconds() - 1000)
377       in_flight_.erase(it++);
378     else
379       ++it;
380   }
381 
382   clock_.AdvanceTimeMilliseconds(time_ms -
383                                  (clock_.TimeInMilliseconds() - start_time_ms));
384   SendPackets(in_out);
385 }
386 
SendPackets(Packets * in_out)387 void TcpSender::SendPackets(Packets* in_out) {
388   int cwnd = ceil(cwnd_);
389   int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
390   int timed_out = TriggerTimeouts();
391   if (timed_out > 0) {
392     HandleLoss();
393   }
394   if (packets_to_send > 0) {
395     Packets generated = GeneratePackets(packets_to_send);
396     for (Packet* packet : generated)
397       in_flight_.insert(InFlight(*static_cast<MediaPacket*>(packet)));
398 
399     in_out->merge(generated, DereferencingComparator<Packet>);
400   }
401 }
402 
UpdateCongestionControl(const FeedbackPacket * fb)403 void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) {
404   const TcpFeedback* tcp_fb = static_cast<const TcpFeedback*>(fb);
405   RTC_DCHECK(!tcp_fb->acked_packets().empty());
406   ack_received_ = true;
407 
408   uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_;
409   uint16_t missing =
410       expected - static_cast<uint16_t>(tcp_fb->acked_packets().size());
411 
412   for (uint16_t ack_seq_num : tcp_fb->acked_packets())
413     in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds()));
414 
415   if (missing > 0) {
416     HandleLoss();
417   } else if (cwnd_ <= ssthresh_) {
418     cwnd_ += tcp_fb->acked_packets().size();
419   } else {
420     cwnd_ += 1.0f / cwnd_;
421   }
422 
423   last_acked_seq_num_ =
424       LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_);
425 }
426 
TriggerTimeouts()427 int TcpSender::TriggerTimeouts() {
428   int timed_out = 0;
429   for (auto it = in_flight_.begin(); it != in_flight_.end();) {
430     if (it->time_ms < clock_.TimeInMilliseconds() - 1000) {
431       in_flight_.erase(it++);
432       ++timed_out;
433     } else {
434       ++it;
435     }
436   }
437   return timed_out;
438 }
439 
HandleLoss()440 void TcpSender::HandleLoss() {
441   if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_)
442     return;
443   last_reduction_time_ms_ = clock_.TimeInMilliseconds();
444   ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
445   cwnd_ = ssthresh_;
446 }
447 
GeneratePackets(size_t num_packets)448 Packets TcpSender::GeneratePackets(size_t num_packets) {
449   Packets generated;
450 
451   UpdateSendBitrateEstimate(num_packets);
452 
453   for (size_t i = 0; i < num_packets; ++i) {
454     if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) {
455       if (running_) {
456         Pause();
457       }
458       break;
459     }
460     generated.push_back(
461         new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(),
462                         kPacketSizeBytes, next_sequence_number_++));
463     generated.back()->set_sender_timestamp_us(
464         1000 * clock_.TimeInMilliseconds());
465 
466     total_sent_bytes_ += kPacketSizeBytes;
467   }
468 
469   return generated;
470 }
471 
UpdateSendBitrateEstimate(size_t num_packets)472 void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) {
473   const int kTimeWindowMs = 500;
474   num_recent_sent_packets_ += num_packets;
475 
476   int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_;
477   if (delta_ms >= kTimeWindowMs) {
478     bitrate_kbps_ =
479         static_cast<uint32_t>(8 * num_recent_sent_packets_ * kPacketSizeBytes) /
480         delta_ms;
481     last_generated_packets_ms_ = clock_.TimeInMilliseconds();
482     num_recent_sent_packets_ = 0;
483   }
484 
485   RecordBitrate();
486 }
487 
TargetBitrateKbps()488 uint32_t TcpSender::TargetBitrateKbps() {
489   return bitrate_kbps_;
490 }
491 
492 }  // namespace bwe
493 }  // namespace testing
494 }  // namespace webrtc
495