1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "test/network/cross_traffic.h"
12 
13 #include <math.h>
14 
15 #include <utility>
16 
17 #include "absl/memory/memory.h"
18 #include "absl/types/optional.h"
19 #include "cross_traffic.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/numerics/safe_minmax.h"
22 
23 namespace webrtc {
24 namespace test {
25 
RandomWalkCrossTraffic(RandomWalkConfig config,TrafficRoute * traffic_route)26 RandomWalkCrossTraffic::RandomWalkCrossTraffic(RandomWalkConfig config,
27                                                TrafficRoute* traffic_route)
28     : config_(config),
29       traffic_route_(traffic_route),
30       random_(config_.random_seed) {
31   sequence_checker_.Detach();
32 }
33 RandomWalkCrossTraffic::~RandomWalkCrossTraffic() = default;
34 
Process(Timestamp at_time)35 void RandomWalkCrossTraffic::Process(Timestamp at_time) {
36   RTC_DCHECK_RUN_ON(&sequence_checker_);
37   if (last_process_time_.IsMinusInfinity()) {
38     last_process_time_ = at_time;
39   }
40   TimeDelta delta = at_time - last_process_time_;
41   last_process_time_ = at_time;
42 
43   if (at_time - last_update_time_ >= config_.update_interval) {
44     intensity_ += random_.Gaussian(config_.bias, config_.variance) *
45                   sqrt((at_time - last_update_time_).seconds<double>());
46     intensity_ = rtc::SafeClamp(intensity_, 0.0, 1.0);
47     last_update_time_ = at_time;
48   }
49   pending_size_ += TrafficRate() * delta;
50 
51   if (pending_size_ >= config_.min_packet_size &&
52       at_time >= last_send_time_ + config_.min_packet_interval) {
53     traffic_route_->SendPacket(pending_size_.bytes());
54     pending_size_ = DataSize::Zero();
55     last_send_time_ = at_time;
56   }
57 }
58 
TrafficRate() const59 DataRate RandomWalkCrossTraffic::TrafficRate() const {
60   RTC_DCHECK_RUN_ON(&sequence_checker_);
61   return config_.peak_rate * intensity_;
62 }
63 
StatsPrinter()64 ColumnPrinter RandomWalkCrossTraffic::StatsPrinter() {
65   return ColumnPrinter::Lambda(
66       "random_walk_cross_traffic_rate",
67       [this](rtc::SimpleStringBuilder& sb) {
68         sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
69       },
70       32);
71 }
72 
PulsedPeaksCrossTraffic(PulsedPeaksConfig config,TrafficRoute * traffic_route)73 PulsedPeaksCrossTraffic::PulsedPeaksCrossTraffic(PulsedPeaksConfig config,
74                                                  TrafficRoute* traffic_route)
75     : config_(config), traffic_route_(traffic_route) {
76   sequence_checker_.Detach();
77 }
78 PulsedPeaksCrossTraffic::~PulsedPeaksCrossTraffic() = default;
79 
Process(Timestamp at_time)80 void PulsedPeaksCrossTraffic::Process(Timestamp at_time) {
81   RTC_DCHECK_RUN_ON(&sequence_checker_);
82   TimeDelta time_since_toggle = at_time - last_update_time_;
83   if (time_since_toggle.IsInfinite() ||
84       (sending_ && time_since_toggle >= config_.send_duration)) {
85     sending_ = false;
86     last_update_time_ = at_time;
87   } else if (!sending_ && time_since_toggle >= config_.hold_duration) {
88     sending_ = true;
89     last_update_time_ = at_time;
90     // Start sending period.
91     last_send_time_ = at_time;
92   }
93 
94   if (sending_) {
95     DataSize pending_size = config_.peak_rate * (at_time - last_send_time_);
96 
97     if (pending_size >= config_.min_packet_size &&
98         at_time >= last_send_time_ + config_.min_packet_interval) {
99       traffic_route_->SendPacket(pending_size.bytes());
100       last_send_time_ = at_time;
101     }
102   }
103 }
104 
TrafficRate() const105 DataRate PulsedPeaksCrossTraffic::TrafficRate() const {
106   RTC_DCHECK_RUN_ON(&sequence_checker_);
107   return sending_ ? config_.peak_rate : DataRate::Zero();
108 }
109 
StatsPrinter()110 ColumnPrinter PulsedPeaksCrossTraffic::StatsPrinter() {
111   return ColumnPrinter::Lambda(
112       "pulsed_peaks_cross_traffic_rate",
113       [this](rtc::SimpleStringBuilder& sb) {
114         sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
115       },
116       32);
117 }
118 
TcpMessageRouteImpl(Clock * clock,TaskQueueBase * task_queue,EmulatedRoute * send_route,EmulatedRoute * ret_route)119 TcpMessageRouteImpl::TcpMessageRouteImpl(Clock* clock,
120                                          TaskQueueBase* task_queue,
121                                          EmulatedRoute* send_route,
122                                          EmulatedRoute* ret_route)
123     : clock_(clock),
124       task_queue_(task_queue),
125       request_route_(send_route,
126                      [this](TcpPacket packet, Timestamp) {
127                        OnRequest(std::move(packet));
128                      }),
129       response_route_(ret_route,
__anonfd5ec06b0402(TcpPacket packet, Timestamp arrival_time) 130                       [this](TcpPacket packet, Timestamp arrival_time) {
131                         OnResponse(std::move(packet), arrival_time);
132                       }) {}
133 
SendMessage(size_t size,std::function<void ()> on_received)134 void TcpMessageRouteImpl::SendMessage(size_t size,
135                                       std::function<void()> on_received) {
136   task_queue_->PostTask(
137       ToQueuedTask([this, size, handler = std::move(on_received)] {
138         // If we are currently sending a message we won't reset the connection,
139         // we'll act as if the messages are sent in the same TCP stream. This is
140         // intended to simulate recreation of a TCP session for each message
141         // in the typical case while avoiding the complexity overhead of
142         // maintaining multiple virtual TCP sessions in parallel.
143         if (pending_.empty() && in_flight_.empty()) {
144           cwnd_ = 10;
145           ssthresh_ = INFINITY;
146         }
147         int64_t data_left = static_cast<int64_t>(size);
148         int64_t kMaxPacketSize = 1200;
149         int64_t kMinPacketSize = 4;
150         Message message{std::move(handler)};
151         while (data_left > 0) {
152           int64_t packet_size = std::min(data_left, kMaxPacketSize);
153           int fragment_id = next_fragment_id_++;
154           pending_.push_back(MessageFragment{
155               fragment_id,
156               static_cast<size_t>(std::max(kMinPacketSize, packet_size))});
157           message.pending_fragment_ids.insert(fragment_id);
158           data_left -= packet_size;
159         }
160         messages_.emplace_back(message);
161         SendPackets(clock_->CurrentTime());
162       }));
163 }
164 
OnRequest(TcpPacket packet_info)165 void TcpMessageRouteImpl::OnRequest(TcpPacket packet_info) {
166   for (auto it = messages_.begin(); it != messages_.end(); ++it) {
167     if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
168       it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
169       if (it->pending_fragment_ids.empty()) {
170         it->handler();
171         messages_.erase(it);
172       }
173       break;
174     }
175   }
176   const size_t kAckPacketSize = 20;
177   response_route_.SendPacket(kAckPacketSize, packet_info);
178 }
179 
OnResponse(TcpPacket packet_info,Timestamp at_time)180 void TcpMessageRouteImpl::OnResponse(TcpPacket packet_info, Timestamp at_time) {
181   auto it = in_flight_.find(packet_info.sequence_number);
182   if (it != in_flight_.end()) {
183     last_rtt_ = at_time - packet_info.send_time;
184     in_flight_.erase(it);
185   }
186   auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
187   for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
188        lost_it = in_flight_.erase(lost_it)) {
189     pending_.push_front(lost_it->second.fragment);
190   }
191 
192   if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
193     HandleLoss(at_time);
194   } else if (cwnd_ <= ssthresh_) {
195     cwnd_ += 1;
196   } else {
197     cwnd_ += 1.0f / cwnd_;
198   }
199   last_acked_seq_num_ =
200       std::max(packet_info.sequence_number, last_acked_seq_num_);
201   SendPackets(at_time);
202 }
203 
HandleLoss(Timestamp at_time)204 void TcpMessageRouteImpl::HandleLoss(Timestamp at_time) {
205   if (at_time - last_reduction_time_ < last_rtt_)
206     return;
207   last_reduction_time_ = at_time;
208   ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
209   cwnd_ = ssthresh_;
210 }
211 
SendPackets(Timestamp at_time)212 void TcpMessageRouteImpl::SendPackets(Timestamp at_time) {
213   const TimeDelta kPacketTimeout = TimeDelta::Seconds(1);
214   int cwnd = std::ceil(cwnd_);
215   int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
216   while (packets_to_send-- > 0 && !pending_.empty()) {
217     auto seq_num = next_sequence_number_++;
218     TcpPacket send;
219     send.sequence_number = seq_num;
220     send.send_time = at_time;
221     send.fragment = pending_.front();
222     pending_.pop_front();
223     request_route_.SendPacket(send.fragment.size, send);
224     in_flight_.insert({seq_num, send});
225     task_queue_->PostDelayedTask(ToQueuedTask([this, seq_num] {
226                                    HandlePacketTimeout(seq_num,
227                                                        clock_->CurrentTime());
228                                  }),
229                                  kPacketTimeout.ms());
230   }
231 }
232 
HandlePacketTimeout(int seq_num,Timestamp at_time)233 void TcpMessageRouteImpl::HandlePacketTimeout(int seq_num, Timestamp at_time) {
234   auto lost = in_flight_.find(seq_num);
235   if (lost != in_flight_.end()) {
236     pending_.push_front(lost->second.fragment);
237     in_flight_.erase(lost);
238     HandleLoss(at_time);
239     SendPackets(at_time);
240   }
241 }
242 
FakeTcpCrossTraffic(Clock * clock,FakeTcpConfig config,EmulatedRoute * send_route,EmulatedRoute * ret_route)243 FakeTcpCrossTraffic::FakeTcpCrossTraffic(Clock* clock,
244                                          FakeTcpConfig config,
245                                          EmulatedRoute* send_route,
246                                          EmulatedRoute* ret_route)
247     : clock_(clock), conf_(config), route_(this, send_route, ret_route) {}
248 
Start(TaskQueueBase * task_queue)249 void FakeTcpCrossTraffic::Start(TaskQueueBase* task_queue) {
250   repeating_task_handle_ = RepeatingTaskHandle::Start(task_queue, [this] {
251     Process(clock_->CurrentTime());
252     return conf_.process_interval;
253   });
254 }
255 
Stop()256 void FakeTcpCrossTraffic::Stop() {
257   repeating_task_handle_.Stop();
258 }
259 
Process(Timestamp at_time)260 void FakeTcpCrossTraffic::Process(Timestamp at_time) {
261   SendPackets(at_time);
262 }
263 
OnRequest(int sequence_number,Timestamp at_time)264 void FakeTcpCrossTraffic::OnRequest(int sequence_number, Timestamp at_time) {
265   const size_t kAckPacketSize = 20;
266   route_.SendResponse(kAckPacketSize, sequence_number);
267 }
268 
OnResponse(int sequence_number,Timestamp at_time)269 void FakeTcpCrossTraffic::OnResponse(int sequence_number, Timestamp at_time) {
270   ack_received_ = true;
271   auto it = in_flight_.find(sequence_number);
272   if (it != in_flight_.end()) {
273     last_rtt_ = at_time - in_flight_.at(sequence_number);
274     in_flight_.erase(sequence_number);
275   }
276   if (sequence_number - last_acked_seq_num_ > 1) {
277     HandleLoss(at_time);
278   } else if (cwnd_ <= ssthresh_) {
279     cwnd_ += 1;
280   } else {
281     cwnd_ += 1.0f / cwnd_;
282   }
283   last_acked_seq_num_ = std::max(sequence_number, last_acked_seq_num_);
284   SendPackets(at_time);
285 }
286 
HandleLoss(Timestamp at_time)287 void FakeTcpCrossTraffic::HandleLoss(Timestamp at_time) {
288   if (at_time - last_reduction_time_ < last_rtt_)
289     return;
290   last_reduction_time_ = at_time;
291   ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
292   cwnd_ = ssthresh_;
293 }
294 
SendPackets(Timestamp at_time)295 void FakeTcpCrossTraffic::SendPackets(Timestamp at_time) {
296   int cwnd = std::ceil(cwnd_);
297   int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
298   bool timeouts = false;
299   for (auto it = in_flight_.begin(); it != in_flight_.end();) {
300     if (it->second < at_time - conf_.packet_timeout) {
301       it = in_flight_.erase(it);
302       timeouts = true;
303     } else {
304       ++it;
305     }
306   }
307   if (timeouts)
308     HandleLoss(at_time);
309   for (int i = 0; i < packets_to_send; ++i) {
310     if ((total_sent_ + conf_.packet_size) > conf_.send_limit) {
311       break;
312     }
313     in_flight_.insert({next_sequence_number_, at_time});
314     route_.SendRequest(conf_.packet_size.bytes<size_t>(),
315                        next_sequence_number_++);
316     total_sent_ += conf_.packet_size;
317   }
318 }
319 
320 }  // namespace test
321 }  // namespace webrtc
322