/*
 *  Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10 
11 #include "test/network/network_emulation.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 
17 #include "api/units/data_size.h"
18 #include "rtc_base/bind.h"
19 #include "rtc_base/logging.h"
20 
21 namespace webrtc {
22 
OnPacketReceived(EmulatedIpPacket packet)23 void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) {
24   task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
25     RTC_DCHECK_RUN_ON(task_queue_);
26 
27     uint64_t packet_id = next_packet_id_++;
28     bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo(
29         packet.ip_packet_size(), packet.arrival_time.us(), packet_id));
30     if (sent) {
31       packets_.emplace_back(StoredPacket{packet_id, std::move(packet), false});
32     }
33     if (process_task_.Running())
34       return;
35     absl::optional<int64_t> next_time_us =
36         network_behavior_->NextDeliveryTimeUs();
37     if (!next_time_us)
38       return;
39     Timestamp current_time = clock_->CurrentTime();
40     process_task_ = RepeatingTaskHandle::DelayedStart(
41         task_queue_->Get(),
42         std::max(TimeDelta::Zero(),
43                  Timestamp::Micros(*next_time_us) - current_time),
44         [this]() {
45           RTC_DCHECK_RUN_ON(task_queue_);
46           Timestamp current_time = clock_->CurrentTime();
47           Process(current_time);
48           absl::optional<int64_t> next_time_us =
49               network_behavior_->NextDeliveryTimeUs();
50           if (!next_time_us) {
51             process_task_.Stop();
52             return TimeDelta::Zero();  // This is ignored.
53           }
54           RTC_DCHECK_GE(*next_time_us, current_time.us());
55           return Timestamp::Micros(*next_time_us) - current_time;
56         });
57   });
58 }
59 
Process(Timestamp at_time)60 void LinkEmulation::Process(Timestamp at_time) {
61   std::vector<PacketDeliveryInfo> delivery_infos =
62       network_behavior_->DequeueDeliverablePackets(at_time.us());
63   for (PacketDeliveryInfo& delivery_info : delivery_infos) {
64     StoredPacket* packet = nullptr;
65     for (auto& stored_packet : packets_) {
66       if (stored_packet.id == delivery_info.packet_id) {
67         packet = &stored_packet;
68         break;
69       }
70     }
71     RTC_CHECK(packet);
72     RTC_DCHECK(!packet->removed);
73     packet->removed = true;
74 
75     if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
76       packet->packet.arrival_time =
77           Timestamp::Micros(delivery_info.receive_time_us);
78       receiver_->OnPacketReceived(std::move(packet->packet));
79     }
80     while (!packets_.empty() && packets_.front().removed) {
81       packets_.pop_front();
82     }
83   }
84 }
85 
NetworkRouterNode(rtc::TaskQueue * task_queue)86 NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue)
87     : task_queue_(task_queue) {}
88 
OnPacketReceived(EmulatedIpPacket packet)89 void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) {
90   RTC_DCHECK_RUN_ON(task_queue_);
91   if (watcher_) {
92     watcher_(packet);
93   }
94   if (filter_) {
95     if (!filter_(packet))
96       return;
97   }
98   auto receiver_it = routing_.find(packet.to.ipaddr());
99   if (receiver_it == routing_.end()) {
100     return;
101   }
102   RTC_CHECK(receiver_it != routing_.end());
103 
104   receiver_it->second->OnPacketReceived(std::move(packet));
105 }
106 
SetReceiver(const rtc::IPAddress & dest_ip,EmulatedNetworkReceiverInterface * receiver)107 void NetworkRouterNode::SetReceiver(
108     const rtc::IPAddress& dest_ip,
109     EmulatedNetworkReceiverInterface* receiver) {
110   task_queue_->PostTask([=] {
111     RTC_DCHECK_RUN_ON(task_queue_);
112     EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
113     RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
114         << "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
115     routing_[dest_ip] = receiver;
116   });
117 }
118 
RemoveReceiver(const rtc::IPAddress & dest_ip)119 void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) {
120   RTC_DCHECK_RUN_ON(task_queue_);
121   routing_.erase(dest_ip);
122 }
123 
SetWatcher(std::function<void (const EmulatedIpPacket &)> watcher)124 void NetworkRouterNode::SetWatcher(
125     std::function<void(const EmulatedIpPacket&)> watcher) {
126   task_queue_->PostTask([=] {
127     RTC_DCHECK_RUN_ON(task_queue_);
128     watcher_ = watcher;
129   });
130 }
131 
SetFilter(std::function<bool (const EmulatedIpPacket &)> filter)132 void NetworkRouterNode::SetFilter(
133     std::function<bool(const EmulatedIpPacket&)> filter) {
134   task_queue_->PostTask([=] {
135     RTC_DCHECK_RUN_ON(task_queue_);
136     filter_ = filter;
137   });
138 }
139 
EmulatedNetworkNode(Clock * clock,rtc::TaskQueue * task_queue,std::unique_ptr<NetworkBehaviorInterface> network_behavior)140 EmulatedNetworkNode::EmulatedNetworkNode(
141     Clock* clock,
142     rtc::TaskQueue* task_queue,
143     std::unique_ptr<NetworkBehaviorInterface> network_behavior)
144     : router_(task_queue),
145       link_(clock, task_queue, std::move(network_behavior), &router_) {}
146 
OnPacketReceived(EmulatedIpPacket packet)147 void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
148   link_.OnPacketReceived(std::move(packet));
149 }
150 
CreateRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes,EmulatedNetworkReceiverInterface * receiver)151 void EmulatedNetworkNode::CreateRoute(
152     const rtc::IPAddress& receiver_ip,
153     std::vector<EmulatedNetworkNode*> nodes,
154     EmulatedNetworkReceiverInterface* receiver) {
155   RTC_CHECK(!nodes.empty());
156   for (size_t i = 0; i + 1 < nodes.size(); ++i)
157     nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]);
158   nodes.back()->router()->SetReceiver(receiver_ip, receiver);
159 }
160 
ClearRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes)161 void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip,
162                                      std::vector<EmulatedNetworkNode*> nodes) {
163   for (EmulatedNetworkNode* node : nodes)
164     node->router()->RemoveReceiver(receiver_ip);
165 }
166 
167 EmulatedNetworkNode::~EmulatedNetworkNode() = default;
168 
EmulatedEndpointImpl(uint64_t id,const rtc::IPAddress & ip,bool is_enabled,rtc::AdapterType type,rtc::TaskQueue * task_queue,Clock * clock)169 EmulatedEndpointImpl::EmulatedEndpointImpl(uint64_t id,
170                                            const rtc::IPAddress& ip,
171                                            bool is_enabled,
172                                            rtc::AdapterType type,
173                                            rtc::TaskQueue* task_queue,
174                                            Clock* clock)
175     : id_(id),
176       peer_local_addr_(ip),
177       is_enabled_(is_enabled),
178       type_(type),
179       clock_(clock),
180       task_queue_(task_queue),
181       router_(task_queue_),
182       next_port_(kFirstEphemeralPort) {
183   constexpr int kIPv4NetworkPrefixLength = 24;
184   constexpr int kIPv6NetworkPrefixLength = 64;
185 
186   int prefix_length = 0;
187   if (ip.family() == AF_INET) {
188     prefix_length = kIPv4NetworkPrefixLength;
189   } else if (ip.family() == AF_INET6) {
190     prefix_length = kIPv6NetworkPrefixLength;
191   }
192   rtc::IPAddress prefix = TruncateIP(ip, prefix_length);
193   network_ = std::make_unique<rtc::Network>(
194       ip.ToString(), "Endpoint id=" + std::to_string(id_), prefix,
195       prefix_length, type_);
196   network_->AddIP(ip);
197 
198   enabled_state_checker_.Detach();
199   stats_.local_addresses.push_back(peer_local_addr_);
200 }
201 EmulatedEndpointImpl::~EmulatedEndpointImpl() = default;
202 
GetId() const203 uint64_t EmulatedEndpointImpl::GetId() const {
204   return id_;
205 }
206 
SendPacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,rtc::CopyOnWriteBuffer packet_data,uint16_t application_overhead)207 void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
208                                       const rtc::SocketAddress& to,
209                                       rtc::CopyOnWriteBuffer packet_data,
210                                       uint16_t application_overhead) {
211   RTC_CHECK(from.ipaddr() == peer_local_addr_);
212   EmulatedIpPacket packet(from, to, std::move(packet_data),
213                           clock_->CurrentTime(), application_overhead);
214   task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
215     RTC_DCHECK_RUN_ON(task_queue_);
216     Timestamp current_time = clock_->CurrentTime();
217     if (stats_.first_packet_sent_time.IsInfinite()) {
218       stats_.first_packet_sent_time = current_time;
219       stats_.first_sent_packet_size = DataSize::Bytes(packet.ip_packet_size());
220     }
221     stats_.last_packet_sent_time = current_time;
222     stats_.packets_sent++;
223     stats_.bytes_sent += DataSize::Bytes(packet.ip_packet_size());
224 
225     router_.OnPacketReceived(std::move(packet));
226   });
227 }
228 
BindReceiver(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver)229 absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
230     uint16_t desired_port,
231     EmulatedNetworkReceiverInterface* receiver) {
232   rtc::CritScope crit(&receiver_lock_);
233   uint16_t port = desired_port;
234   if (port == 0) {
235     // Because client can specify its own port, next_port_ can be already in
236     // use, so we need to find next available port.
237     int ports_pool_size =
238         std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
239     for (int i = 0; i < ports_pool_size; ++i) {
240       uint16_t next_port = NextPort();
241       if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
242         port = next_port;
243         break;
244       }
245     }
246   }
247   RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
248                        << id_;
249   bool result = port_to_receiver_.insert({port, receiver}).second;
250   if (!result) {
251     RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
252                   << " in endpoint " << id_;
253     return absl::nullopt;
254   }
255   RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port "
256                 << port;
257   return port;
258 }
259 
NextPort()260 uint16_t EmulatedEndpointImpl::NextPort() {
261   uint16_t out = next_port_;
262   if (next_port_ == std::numeric_limits<uint16_t>::max()) {
263     next_port_ = kFirstEphemeralPort;
264   } else {
265     next_port_++;
266   }
267   return out;
268 }
269 
UnbindReceiver(uint16_t port)270 void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
271   rtc::CritScope crit(&receiver_lock_);
272   port_to_receiver_.erase(port);
273 }
274 
GetPeerLocalAddress() const275 rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const {
276   return peer_local_addr_;
277 }
278 
OnPacketReceived(EmulatedIpPacket packet)279 void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
280   RTC_DCHECK_RUN_ON(task_queue_);
281   RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
282       << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
283       << packet.to.ipaddr().ToString()
284       << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
285   rtc::CritScope crit(&receiver_lock_);
286   UpdateReceiveStats(packet);
287   auto it = port_to_receiver_.find(packet.to.port());
288   if (it == port_to_receiver_.end()) {
289     // It can happen, that remote peer closed connection, but there still some
290     // packets, that are going to it. It can happen during peer connection close
291     // process: one peer closed connection, second still sending data.
292     RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
293                   << " on port " << packet.to.port();
294     stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_dropped++;
295     stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_dropped +=
296         DataSize::Bytes(packet.ip_packet_size());
297     return;
298   }
299   // Endpoint assumes frequent calls to bind and unbind methods, so it holds
300   // lock during packet processing to ensure that receiver won't be deleted
301   // before call to OnPacketReceived.
302   it->second->OnPacketReceived(std::move(packet));
303 }
304 
Enable()305 void EmulatedEndpointImpl::Enable() {
306   RTC_DCHECK_RUN_ON(&enabled_state_checker_);
307   RTC_CHECK(!is_enabled_);
308   is_enabled_ = true;
309 }
310 
Disable()311 void EmulatedEndpointImpl::Disable() {
312   RTC_DCHECK_RUN_ON(&enabled_state_checker_);
313   RTC_CHECK(is_enabled_);
314   is_enabled_ = false;
315 }
316 
Enabled() const317 bool EmulatedEndpointImpl::Enabled() const {
318   RTC_DCHECK_RUN_ON(&enabled_state_checker_);
319   return is_enabled_;
320 }
321 
stats()322 EmulatedNetworkStats EmulatedEndpointImpl::stats() {
323   RTC_DCHECK_RUN_ON(task_queue_);
324   return stats_;
325 }
326 
UpdateReceiveStats(const EmulatedIpPacket & packet)327 void EmulatedEndpointImpl::UpdateReceiveStats(const EmulatedIpPacket& packet) {
328   RTC_DCHECK_RUN_ON(task_queue_);
329   Timestamp current_time = clock_->CurrentTime();
330   if (stats_.incoming_stats_per_source[packet.from.ipaddr()]
331           .first_packet_received_time.IsInfinite()) {
332     stats_.incoming_stats_per_source[packet.from.ipaddr()]
333         .first_packet_received_time = current_time;
334     stats_.incoming_stats_per_source[packet.from.ipaddr()]
335         .first_received_packet_size = DataSize::Bytes(packet.ip_packet_size());
336   }
337   stats_.incoming_stats_per_source[packet.from.ipaddr()]
338       .last_packet_received_time = current_time;
339   stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_received++;
340   stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_received +=
341       DataSize::Bytes(packet.ip_packet_size());
342 }
343 
EndpointsContainer(const std::vector<EmulatedEndpointImpl * > & endpoints)344 EndpointsContainer::EndpointsContainer(
345     const std::vector<EmulatedEndpointImpl*>& endpoints)
346     : endpoints_(endpoints) {}
347 
LookupByLocalAddress(const rtc::IPAddress & local_ip) const348 EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress(
349     const rtc::IPAddress& local_ip) const {
350   for (auto* endpoint : endpoints_) {
351     rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress();
352     if (peer_local_address == local_ip) {
353       return endpoint;
354     }
355   }
356   RTC_CHECK(false) << "No network found for address" << local_ip.ToString();
357 }
358 
HasEndpoint(EmulatedEndpointImpl * endpoint) const359 bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const {
360   for (auto* e : endpoints_) {
361     if (e->GetId() == endpoint->GetId()) {
362       return true;
363     }
364   }
365   return false;
366 }
367 
368 std::vector<std::unique_ptr<rtc::Network>>
GetEnabledNetworks() const369 EndpointsContainer::GetEnabledNetworks() const {
370   std::vector<std::unique_ptr<rtc::Network>> networks;
371   for (auto* endpoint : endpoints_) {
372     if (endpoint->Enabled()) {
373       networks.emplace_back(
374           std::make_unique<rtc::Network>(endpoint->network()));
375     }
376   }
377   return networks;
378 }
379 
GetStats() const380 EmulatedNetworkStats EndpointsContainer::GetStats() const {
381   EmulatedNetworkStats stats;
382   for (auto* endpoint : endpoints_) {
383     EmulatedNetworkStats endpoint_stats = endpoint->stats();
384     stats.packets_sent += endpoint_stats.packets_sent;
385     stats.bytes_sent += endpoint_stats.bytes_sent;
386     if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
387       stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
388       stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
389     }
390     if (stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
391       stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
392     }
393     for (const rtc::IPAddress& addr : endpoint_stats.local_addresses) {
394       stats.local_addresses.push_back(addr);
395     }
396     for (auto& entry : endpoint_stats.incoming_stats_per_source) {
397       const EmulatedNetworkIncomingStats& source = entry.second;
398       EmulatedNetworkIncomingStats& in_stats =
399           stats.incoming_stats_per_source[entry.first];
400       in_stats.packets_received += source.packets_received;
401       in_stats.bytes_received += source.bytes_received;
402       in_stats.packets_dropped += source.packets_dropped;
403       in_stats.bytes_dropped += source.bytes_dropped;
404       if (in_stats.first_packet_received_time >
405           source.first_packet_received_time) {
406         in_stats.first_packet_received_time = source.first_packet_received_time;
407         in_stats.first_received_packet_size = source.first_received_packet_size;
408       }
409       if (in_stats.last_packet_received_time <
410           source.last_packet_received_time) {
411         in_stats.last_packet_received_time = source.last_packet_received_time;
412       }
413     }
414   }
415   return stats;
416 }
417 
418 }  // namespace webrtc
419