1 // Copyright (C) 2020, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 //! Delivery rate estimation.
28 //!
29 //! This implements the algorithm for estimating delivery rate as described in
30 //! https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-00
31 
32 use std::cmp;
33 
34 use std::time::Duration;
35 use std::time::Instant;
36 
37 use crate::recovery::Sent;
38 
39 #[derive(Default)]
40 pub struct Rate {
41     delivered: usize,
42 
43     delivered_time: Option<Instant>,
44 
45     recent_delivered_packet_sent_time: Option<Instant>,
46 
47     app_limited_at_pkt: usize,
48 
49     rate_sample: RateSample,
50 }
51 
52 impl Rate {
on_packet_sent(&mut self, pkt: &mut Sent, now: Instant)53     pub fn on_packet_sent(&mut self, pkt: &mut Sent, now: Instant) {
54         if self.delivered_time.is_none() {
55             self.delivered_time = Some(now);
56         }
57 
58         if self.recent_delivered_packet_sent_time.is_none() {
59             self.recent_delivered_packet_sent_time = Some(now);
60         }
61 
62         pkt.delivered = self.delivered;
63         pkt.delivered_time = self.delivered_time.unwrap();
64 
65         pkt.recent_delivered_packet_sent_time =
66             self.recent_delivered_packet_sent_time.unwrap();
67 
68         pkt.is_app_limited = self.app_limited_at_pkt > 0;
69     }
70 
on_packet_acked(&mut self, pkt: &Sent, now: Instant)71     pub fn on_packet_acked(&mut self, pkt: &Sent, now: Instant) {
72         self.rate_sample.prior_time = Some(pkt.delivered_time);
73 
74         self.delivered += pkt.size;
75         self.delivered_time = Some(now);
76 
77         if pkt.delivered > self.rate_sample.prior_delivered {
78             self.rate_sample.prior_delivered = pkt.delivered;
79             self.rate_sample.is_app_limited = pkt.is_app_limited;
80 
81             self.rate_sample.send_elapsed =
82                 pkt.time_sent - pkt.recent_delivered_packet_sent_time;
83 
84             self.rate_sample.ack_elapsed = self
85                 .delivered_time
86                 .unwrap()
87                 .duration_since(pkt.delivered_time);
88 
89             self.recent_delivered_packet_sent_time = Some(pkt.time_sent);
90         }
91     }
92 
estimate(&mut self)93     pub fn estimate(&mut self) {
94         if (self.app_limited_at_pkt > 0) &&
95             (self.delivered > self.app_limited_at_pkt)
96         {
97             self.app_limited_at_pkt = 0;
98         }
99 
100         match self.rate_sample.prior_time {
101             Some(_) => {
102                 self.rate_sample.delivered =
103                     self.delivered - self.rate_sample.prior_delivered;
104 
105                 self.rate_sample.interval = cmp::max(
106                     self.rate_sample.send_elapsed,
107                     self.rate_sample.ack_elapsed,
108                 );
109             },
110             None => return,
111         }
112 
113         if self.rate_sample.interval.as_secs_f64() > 0.0 {
114             self.rate_sample.delivery_rate = (self.rate_sample.delivered as f64 /
115                 self.rate_sample.interval.as_secs_f64())
116                 as u64;
117         }
118     }
119 
check_app_limited(&mut self, bytes_in_flight: usize)120     pub fn check_app_limited(&mut self, bytes_in_flight: usize) {
121         let limited = self.delivered + bytes_in_flight;
122         self.app_limited_at_pkt = if limited > 0 { limited } else { 1 };
123     }
124 
delivery_rate(&self) -> u64125     pub fn delivery_rate(&self) -> u64 {
126         self.rate_sample.delivery_rate
127     }
128 }
129 
130 impl std::fmt::Debug for Rate {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result131     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
132         write!(f, "delivered={:?} ", self.delivered)?;
133 
134         if let Some(t) = self.delivered_time {
135             write!(f, "delivered_time={:?} ", t.elapsed())?;
136         }
137 
138         if let Some(t) = self.recent_delivered_packet_sent_time {
139             write!(f, "recent_delivered_packet_sent_time={:?} ", t.elapsed())?;
140         }
141 
142         write!(f, "app_limited_at_pkt={:?} ", self.app_limited_at_pkt)?;
143 
144         Ok(())
145     }
146 }
147 
148 #[derive(Default)]
149 struct RateSample {
150     delivery_rate: u64,
151 
152     is_app_limited: bool,
153 
154     interval: Duration,
155 
156     delivered: usize,
157 
158     prior_delivered: usize,
159 
160     prior_time: Option<Instant>,
161 
162     send_elapsed: Duration,
163 
164     ack_elapsed: Duration,
165 }
166 
167 impl std::fmt::Debug for RateSample {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result168     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
169         write!(f, "delivery_rate={:?} ", self.delivery_rate)?;
170         write!(f, "interval={:?} ", self.interval)?;
171         write!(f, "delivered={:?} ", self.delivered)?;
172         write!(f, "prior_delivered={:?} ", self.prior_delivered)?;
173         write!(f, "send_elapsed={:?} ", self.send_elapsed)?;
174         if let Some(t) = self.prior_time {
175             write!(f, "prior_time={:?} ", t.elapsed())?;
176         }
177         write!(f, "ack_elapsed={:?}", self.ack_elapsed)?;
178 
179         Ok(())
180     }
181 }
182 
183 #[cfg(test)]
184 mod tests {
185     use super::*;
186 
187     use crate::recovery::*;
188 
189     #[test]
rate_check()190     fn rate_check() {
191         let config = Config::new(0xbabababa).unwrap();
192         let mut recovery = Recovery::new(&config);
193 
194         let mut pkt_1 = Sent {
195             pkt_num: 0,
196             frames: vec![],
197             time_sent: Instant::now(),
198             time_acked: None,
199             time_lost: None,
200             size: 1200,
201             ack_eliciting: true,
202             in_flight: true,
203             delivered: 0,
204             delivered_time: Instant::now(),
205             recent_delivered_packet_sent_time: Instant::now(),
206             is_app_limited: false,
207             has_data: false,
208         };
209 
210         recovery
211             .delivery_rate
212             .on_packet_sent(&mut pkt_1, Instant::now());
213         std::thread::sleep(Duration::from_millis(50));
214         recovery
215             .delivery_rate
216             .on_packet_acked(&pkt_1, Instant::now());
217 
218         let mut pkt_2 = Sent {
219             pkt_num: 1,
220             frames: vec![],
221             time_sent: Instant::now(),
222             time_acked: None,
223             time_lost: None,
224             size: 1200,
225             ack_eliciting: true,
226             in_flight: true,
227             delivered: 0,
228             delivered_time: Instant::now(),
229             recent_delivered_packet_sent_time: Instant::now(),
230             is_app_limited: false,
231             has_data: false,
232         };
233 
234         recovery
235             .delivery_rate
236             .on_packet_sent(&mut pkt_2, Instant::now());
237         std::thread::sleep(Duration::from_millis(50));
238         recovery
239             .delivery_rate
240             .on_packet_acked(&pkt_2, Instant::now());
241         recovery.delivery_rate.estimate();
242 
243         assert!(recovery.delivery_rate() > 0);
244     }
245 
246     #[test]
app_limited_check()247     fn app_limited_check() {
248         let config = Config::new(0xbabababa).unwrap();
249         let mut recvry = Recovery::new(&config);
250 
251         let mut pkt_1 = Sent {
252             pkt_num: 0,
253             frames: vec![],
254             time_sent: Instant::now(),
255             time_acked: None,
256             time_lost: None,
257             size: 1200,
258             ack_eliciting: true,
259             in_flight: true,
260             delivered: 0,
261             delivered_time: Instant::now(),
262             recent_delivered_packet_sent_time: Instant::now(),
263             is_app_limited: false,
264             has_data: false,
265         };
266 
267         recvry
268             .delivery_rate
269             .on_packet_sent(&mut pkt_1, Instant::now());
270         std::thread::sleep(Duration::from_millis(50));
271         recvry.delivery_rate.on_packet_acked(&pkt_1, Instant::now());
272 
273         let mut pkt_2 = Sent {
274             pkt_num: 1,
275             frames: vec![],
276             time_sent: Instant::now(),
277             time_acked: None,
278             time_lost: None,
279             size: 1200,
280             ack_eliciting: true,
281             in_flight: true,
282             delivered: 0,
283             delivered_time: Instant::now(),
284             recent_delivered_packet_sent_time: Instant::now(),
285             is_app_limited: false,
286             has_data: false,
287         };
288 
289         recvry.app_limited = true;
290         recvry
291             .delivery_rate
292             .check_app_limited(recvry.bytes_in_flight);
293         recvry
294             .delivery_rate
295             .on_packet_sent(&mut pkt_2, Instant::now());
296         std::thread::sleep(Duration::from_millis(50));
297         recvry.delivery_rate.on_packet_acked(&pkt_2, Instant::now());
298         recvry.delivery_rate.estimate();
299 
300         assert_eq!(recvry.delivery_rate.app_limited_at_pkt, 0);
301     }
302 }
303