1 /*
2  *  Copyright 2018 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "call/simulated_network.h"
12 
13 #include <algorithm>
14 #include <cmath>
15 #include <utility>
16 
17 #include "api/units/data_rate.h"
18 #include "api/units/data_size.h"
19 #include "api/units/time_delta.h"
20 #include "rtc_base/checks.h"
21 
22 namespace webrtc {
23 namespace {
// Processing interval of the simulation (5 ms). It is added to a timestamp
// when scheduling the next processing time while packets are queued, and it
// is subtracted from the measured queuing time in
// CoDelSimulation::DropDequeuedPacket.
24 constexpr TimeDelta kDefaultProcessDelay = TimeDelta::Millis(5);
25 }  // namespace
26 
// Construction and destruction are compiler-generated; no custom setup or
// teardown is performed in this file.
27 CoDelSimulation::CoDelSimulation() = default;
28 CoDelSimulation::~CoDelSimulation() = default;
29 
DropDequeuedPacket(Timestamp now,Timestamp enqueing_time,DataSize packet_size,DataSize queue_size)30 bool CoDelSimulation::DropDequeuedPacket(Timestamp now,
31                                          Timestamp enqueing_time,
32                                          DataSize packet_size,
33                                          DataSize queue_size) {
34   constexpr TimeDelta kWindow = TimeDelta::Millis(100);
35   constexpr TimeDelta kDelayThreshold = TimeDelta::Millis(5);
36   constexpr TimeDelta kDropCountMemory = TimeDelta::Millis(1600);
37   constexpr DataSize kMaxPacketSize = DataSize::Bytes(1500);
38 
39   // Compensates for process interval in simulation; not part of standard CoDel.
40   TimeDelta queuing_time = now - enqueing_time - kDefaultProcessDelay;
41 
42   if (queue_size < kMaxPacketSize || queuing_time < kDelayThreshold) {
43     enter_drop_state_at_ = Timestamp::PlusInfinity();
44     state_ = kNormal;
45     return false;
46   }
47   switch (state_) {
48     case kNormal:
49       enter_drop_state_at_ = now + kWindow;
50       state_ = kPending;
51       return false;
52 
53     case kPending:
54       if (now >= enter_drop_state_at_) {
55         state_ = kDropping;
56         // Starting the drop counter with the drops made during the most recent
57         // drop state period.
58         drop_count_ = drop_count_ - previous_drop_count_;
59         if (now >= last_drop_at_ + kDropCountMemory)
60           drop_count_ = 0;
61         previous_drop_count_ = drop_count_;
62         last_drop_at_ = now;
63         ++drop_count_;
64         return true;
65       }
66       return false;
67 
68     case kDropping:
69       TimeDelta drop_delay = kWindow / sqrt(static_cast<double>(drop_count_));
70       Timestamp next_drop_at = last_drop_at_ + drop_delay;
71       if (now >= next_drop_at) {
72         if (queue_size - packet_size < kMaxPacketSize)
73           state_ = kPending;
74         last_drop_at_ = next_drop_at;
75         ++drop_count_;
76         return true;
77       }
78       return false;
79   }
80 }
81 
// Seeds the random generator with `random_seed`, starts outside of a loss
// burst, and applies `config` through SetConfig() so that the derived loss
// probabilities are computed.
SimulatedNetwork(Config config,uint64_t random_seed)82 SimulatedNetwork::SimulatedNetwork(Config config, uint64_t random_seed)
83     : random_(random_seed), bursting_(false) {
84   SetConfig(config);
85 }
86 
// Compiler-generated destructor; no explicit teardown is performed here.
87 SimulatedNetwork::~SimulatedNetwork() = default;
88 
SetConfig(const Config & config)89 void SimulatedNetwork::SetConfig(const Config& config) {
90   MutexLock lock(&config_lock_);
91   config_state_.config = config;  // Shallow copy of the struct.
92   double prob_loss = config.loss_percent / 100.0;
93   if (config_state_.config.avg_burst_loss_length == -1) {
94     // Uniform loss
95     config_state_.prob_loss_bursting = prob_loss;
96     config_state_.prob_start_bursting = prob_loss;
97   } else {
98     // Lose packets according to a gilbert-elliot model.
99     int avg_burst_loss_length = config.avg_burst_loss_length;
100     int min_avg_burst_loss_length = std::ceil(prob_loss / (1 - prob_loss));
101 
102     RTC_CHECK_GT(avg_burst_loss_length, min_avg_burst_loss_length)
103         << "For a total packet loss of " << config.loss_percent
104         << "%% then"
105            " avg_burst_loss_length must be "
106         << min_avg_burst_loss_length + 1 << " or higher.";
107 
108     config_state_.prob_loss_bursting = (1.0 - 1.0 / avg_burst_loss_length);
109     config_state_.prob_start_bursting =
110         prob_loss / (1 - prob_loss) / avg_burst_loss_length;
111   }
112 }
113 
// Lets `config_modifier` edit the stored configuration in place while
// `config_lock_` is held.
// NOTE(review): unlike SetConfig(), this does not recompute
// `prob_loss_bursting` / `prob_start_bursting`, so changes made here to
// `loss_percent` or `avg_burst_loss_length` do not appear to reach the loss
// model. Confirm whether that is intended.
UpdateConfig(std::function<void (BuiltInNetworkBehaviorConfig *)> config_modifier)114 void SimulatedNetwork::UpdateConfig(
115     std::function<void(BuiltInNetworkBehaviorConfig*)> config_modifier) {
116   MutexLock lock(&config_lock_);
117   config_modifier(&config_state_.config);
118 }
119 
// Stores `until_us` (microseconds) as the pause deadline, under
// `config_lock_`. UpdateCapacityQueue() uses it as a lower bound for the
// arrival time of packets leaving the capacity queue.
PauseTransmissionUntil(int64_t until_us)120 void SimulatedNetwork::PauseTransmissionUntil(int64_t until_us) {
121   MutexLock lock(&config_lock_);
122   config_state_.pause_transmission_until_us = until_us;
123 }
124 
EnqueuePacket(PacketInFlightInfo packet)125 bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
126   RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
127   ConfigState state = GetConfigState();
128 
129   UpdateCapacityQueue(state, packet.send_time_us);
130 
131   packet.size += state.config.packet_overhead;
132 
133   if (state.config.queue_length_packets > 0 &&
134       capacity_link_.size() >= state.config.queue_length_packets) {
135     // Too many packet on the link, drop this one.
136     return false;
137   }
138 
139   // Set arrival time = send time for now; actual arrival time will be
140   // calculated in UpdateCapacityQueue.
141   queue_size_bytes_ += packet.size;
142   capacity_link_.push({packet, packet.send_time_us});
143   if (!next_process_time_us_) {
144     next_process_time_us_ = packet.send_time_us + kDefaultProcessDelay.us();
145   }
146 
147   return true;
148 }
149 
// Returns the currently scheduled processing time in microseconds, or an
// empty optional when nothing is scheduled. The value is maintained by
// EnqueuePacket() and DequeueDeliverablePackets().
NextDeliveryTimeUs() const150 absl::optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const {
151   RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
152   return next_process_time_us_;
153 }
154 
// Advances the capacity-limited link up to `time_now_us`.
//
// Packets whose transmission completes by `time_now_us` are popped from
// `capacity_link_` and appended to `delay_link_`, either with a computed
// arrival time or marked PacketDeliveryInfo::kNotReceived when dropped or
// lost. `state` is the configuration snapshot used for the whole call.
// If any packet ends up with an arrival time earlier than the one tracked as
// latest, `delay_link_` is sorted by arrival time before returning.
UpdateCapacityQueue(ConfigState state,int64_t time_now_us)155 void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
156                                            int64_t time_now_us) {
157   bool needs_sort = false;
158 
159   // Catch for thread races.
160   if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us))
161     return;
162 
      // Resume from the time of the previous visit, or from `time_now_us` if
      // there has been no previous visit.
163   int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us);
164   // Check the capacity link first.
165   while (!capacity_link_.empty()) {
        // Stays 0 when link_capacity_kbps is not positive, i.e. the front
        // packet exits immediately.
166     int64_t time_until_front_exits_us = 0;
167     if (state.config.link_capacity_kbps > 0) {
          // Bits of the front packet not yet covered by `pending_drain_bits_`.
168       int64_t remaining_bits =
169           capacity_link_.front().packet.size * 8 - pending_drain_bits_;
170       RTC_DCHECK(remaining_bits > 0);
171       // Division rounded up - packet not delivered until its last bit is.
172       time_until_front_exits_us =
173           (1000 * remaining_bits + state.config.link_capacity_kbps - 1) /
174           state.config.link_capacity_kbps;
175     }
176 
177     if (time_us + time_until_front_exits_us > time_now_us) {
178       // Packet at front will not exit yet. Will not enter here on infinite
179       // capacity(=0) so no special handling needed.
          // Credit the bits drained between `time_us` and `time_now_us`.
180       pending_drain_bits_ +=
181           ((time_now_us - time_us) * state.config.link_capacity_kbps) / 1000;
182       break;
183     }
184     if (state.config.link_capacity_kbps > 0) {
185       pending_drain_bits_ +=
186           (time_until_front_exits_us * state.config.link_capacity_kbps) / 1000;
187     } else {
188       // Enough to drain the whole queue.
189       pending_drain_bits_ = queue_size_bytes_ * 8;
190     }
191 
192     // Time to get this packet.
193     PacketInfo packet = capacity_link_.front();
194     capacity_link_.pop();
195 
196     time_us += time_until_front_exits_us;
197     if (state.config.codel_active_queue_management) {
          // Ask the CoDel controller about the new front packet and keep
          // dropping from the head while it answers "drop". `queue_size_bytes_`
          // still includes `packet` at this point. Dropped packets are
          // forwarded to `delay_link_` marked as not received.
198       while (!capacity_link_.empty() &&
199              codel_controller_.DropDequeuedPacket(
200                  Timestamp::Micros(time_us),
201                  Timestamp::Micros(capacity_link_.front().packet.send_time_us),
202                  DataSize::Bytes(capacity_link_.front().packet.size),
203                  DataSize::Bytes(queue_size_bytes_))) {
204         PacketInfo dropped = capacity_link_.front();
205         capacity_link_.pop();
206         queue_size_bytes_ -= dropped.packet.size;
207         dropped.arrival_time_us = PacketDeliveryInfo::kNotReceived;
208         delay_link_.emplace_back(dropped);
209       }
210     }
211     RTC_DCHECK(time_us >= packet.packet.send_time_us);
        // The packet cannot arrive before a configured transmission pause ends.
212     packet.arrival_time_us =
213         std::max(state.pause_transmission_until_us, time_us);
214     queue_size_bytes_ -= packet.packet.size;
215     pending_drain_bits_ -= packet.packet.size * 8;
216     RTC_DCHECK(pending_drain_bits_ >= 0);
217 
218     // Drop packets at an average rate of |state.config.loss_percent| with
219     // and average loss burst length of |state.config.avg_burst_loss_length|.
        // Short-circuit evaluation ensures exactly one Rand() call per packet,
        // compared against the probability that matches the current burst state.
220     if ((bursting_ && random_.Rand<double>() < state.prob_loss_bursting) ||
221         (!bursting_ && random_.Rand<double>() < state.prob_start_bursting)) {
222       bursting_ = true;
223       packet.arrival_time_us = PacketDeliveryInfo::kNotReceived;
224     } else {
225       bursting_ = false;
          // Extra delay in microseconds, clamped at zero. Presumably
          // Gaussian() takes (mean, standard deviation) - confirm against the
          // Random class.
226       int64_t arrival_time_jitter_us = std::max(
227           random_.Gaussian(state.config.queue_delay_ms * 1000,
228                            state.config.delay_standard_deviation_ms * 1000),
229           0.0);
230 
231       // If reordering is not allowed then adjust arrival_time_jitter
232       // to make sure all packets are sent in order.
          // NOTE(review): the last entry of `delay_link_` may be a dropped or
          // lost packet whose arrival time is kNotReceived. If that sentinel is
          // smaller than real arrival times, the comparison below never
          // triggers for the packet following a loss - confirm whether
          // reordering can slip through in that case.
233       int64_t last_arrival_time_us =
234           delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us;
235       if (!state.config.allow_reordering && !delay_link_.empty() &&
236           packet.arrival_time_us + arrival_time_jitter_us <
237               last_arrival_time_us) {
238         arrival_time_jitter_us = last_arrival_time_us - packet.arrival_time_us;
239       }
240       packet.arrival_time_us += arrival_time_jitter_us;
241       if (packet.arrival_time_us >= last_arrival_time_us) {
242         last_arrival_time_us = packet.arrival_time_us;
243       } else {
244         needs_sort = true;
245       }
246     }
247     delay_link_.emplace_back(packet);
248   }
249   last_capacity_link_visit_us_ = time_now_us;
250   // Cannot save unused capacity for later.
251   pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8);
252 
253   if (needs_sort) {
254     // Packet(s) arrived out of order, make sure list is sorted.
255     std::sort(delay_link_.begin(), delay_link_.end(),
256               [](const PacketInfo& p1, const PacketInfo& p2) {
257                 return p1.arrival_time_us < p2.arrival_time_us;
258               });
259   }
260 }
261 
// Returns a copy of `config_state_`, taken while holding `config_lock_`, so
// callers can work on a snapshot without keeping the lock.
GetConfigState() const262 SimulatedNetwork::ConfigState SimulatedNetwork::GetConfigState() const {
263   MutexLock lock(&config_lock_);
264   return config_state_;
265 }
266 
DequeueDeliverablePackets(int64_t receive_time_us)267 std::vector<PacketDeliveryInfo> SimulatedNetwork::DequeueDeliverablePackets(
268     int64_t receive_time_us) {
269   RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
270   UpdateCapacityQueue(GetConfigState(), receive_time_us);
271   std::vector<PacketDeliveryInfo> packets_to_deliver;
272   // Check the extra delay queue.
273   while (!delay_link_.empty() &&
274          receive_time_us >= delay_link_.front().arrival_time_us) {
275     PacketInfo packet_info = delay_link_.front();
276     packets_to_deliver.emplace_back(
277         PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us));
278     delay_link_.pop_front();
279   }
280 
281   if (!delay_link_.empty()) {
282     next_process_time_us_ = delay_link_.front().arrival_time_us;
283   } else if (!capacity_link_.empty()) {
284     next_process_time_us_ = receive_time_us + kDefaultProcessDelay.us();
285   } else {
286     next_process_time_us_.reset();
287   }
288   return packets_to_deliver;
289 }
290 
291 }  // namespace webrtc
292