1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use std::cmp;
28 
29 use std::collections::hash_map;
30 use std::collections::BTreeMap;
31 use std::collections::BinaryHeap;
32 use std::collections::HashMap;
33 use std::collections::HashSet;
34 use std::collections::VecDeque;
35 
36 use crate::Error;
37 use crate::Result;
38 
39 use crate::ranges;
40 
/// Urgency assigned to every stream built through `Stream::new()`.
const DEFAULT_URGENCY: u8 = 127;
42 
43 /// Keeps track of QUIC streams and enforces stream limits.
44 #[derive(Default)]
45 pub struct StreamMap {
46     /// Map of streams indexed by stream ID.
47     streams: HashMap<u64, Stream>,
48 
49     /// Set of streams that were completed and garbage collected.
50     ///
51     /// Instead of keeping the full stream state forever, we collect completed
52     /// streams to save memory, but we still need to keep track of previously
53     /// created streams, to prevent peers from re-creating them.
54     collected: HashSet<u64>,
55 
56     /// Peer's maximum bidirectional stream count limit.
57     peer_max_streams_bidi: u64,
58 
59     /// Peer's maximum unidirectional stream count limit.
60     peer_max_streams_uni: u64,
61 
62     /// The total number of bidirectional streams opened by the peer.
63     peer_opened_streams_bidi: u64,
64 
65     /// The total number of unidirectional streams opened by the peer.
66     peer_opened_streams_uni: u64,
67 
68     /// Local maximum bidirectional stream count limit.
69     local_max_streams_bidi: u64,
70     local_max_streams_bidi_next: u64,
71 
72     /// Local maximum unidirectional stream count limit.
73     local_max_streams_uni: u64,
74     local_max_streams_uni_next: u64,
75 
76     /// The total number of bidirectional streams opened by the local endpoint.
77     local_opened_streams_bidi: u64,
78 
79     /// The total number of unidirectional streams opened by the local endpoint.
80     local_opened_streams_uni: u64,
81 
82     /// Queue of stream IDs corresponding to streams that have buffered data
83     /// ready to be sent to the peer. This also implies that the stream has
84     /// enough flow control credits to send at least some of that data.
85     ///
86     /// Streams are grouped by their priority, where each urgency level has two
87     /// queues, one for non-incremental streams and one for incremental ones.
88     ///
89     /// Streams with lower urgency level are scheduled first, and within the
90     /// same urgency level Non-incremental streams are scheduled first, in the
91     /// order of their stream IDs, and incremental streams are scheduled in a
92     /// round-robin fashion after all non-incremental streams have been flushed.
93     flushable: BTreeMap<u8, (BinaryHeap<std::cmp::Reverse<u64>>, VecDeque<u64>)>,
94 
95     /// Set of stream IDs corresponding to streams that have outstanding data
96     /// to read. This is used to generate a `StreamIter` of streams without
97     /// having to iterate over the full list of streams.
98     readable: HashSet<u64>,
99 
100     /// Set of stream IDs corresponding to streams that have enough flow control
101     /// capacity to be written to, and is not finished. This is used to generate
102     /// a `StreamIter` of streams without having to iterate over the full list
103     /// of streams.
104     writable: HashSet<u64>,
105 
106     /// Set of stream IDs corresponding to streams that are almost out of flow
107     /// control credit and need to send MAX_STREAM_DATA. This is used to
108     /// generate a `StreamIter` of streams without having to iterate over the
109     /// full list of streams.
110     almost_full: HashSet<u64>,
111 
112     /// Set of stream IDs corresponding to streams that are blocked. The value
113     /// of the map elements represents the offset of the stream at which the
114     /// blocking occurred.
115     blocked: HashMap<u64, u64>,
116 }
117 
118 impl StreamMap {
new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap119     pub fn new(max_streams_bidi: u64, max_streams_uni: u64) -> StreamMap {
120         StreamMap {
121             local_max_streams_bidi: max_streams_bidi,
122             local_max_streams_bidi_next: max_streams_bidi,
123 
124             local_max_streams_uni: max_streams_uni,
125             local_max_streams_uni_next: max_streams_uni,
126 
127             ..StreamMap::default()
128         }
129     }
130 
131     /// Returns the stream with the given ID if it exists.
get(&self, id: u64) -> Option<&Stream>132     pub fn get(&self, id: u64) -> Option<&Stream> {
133         self.streams.get(&id)
134     }
135 
136     /// Returns the mutable stream with the given ID if it exists.
get_mut(&mut self, id: u64) -> Option<&mut Stream>137     pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
138         self.streams.get_mut(&id)
139     }
140 
141     /// Returns the mutable stream with the given ID if it exists, or creates
142     /// a new one otherwise.
143     ///
144     /// The `local` parameter indicates whether the stream's creation was
145     /// requested by the local application rather than the peer, and is
146     /// used to validate the requested stream ID, and to select the initial
147     /// flow control values from the local and remote transport parameters
148     /// (also passed as arguments).
149     ///
150     /// This also takes care of enforcing both local and the peer's stream
151     /// count limits. If one of these limits is violated, the `StreamLimit`
152     /// error is returned.
get_or_create( &mut self, id: u64, local_params: &crate::TransportParams, peer_params: &crate::TransportParams, local: bool, is_server: bool, ) -> Result<&mut Stream>153     pub(crate) fn get_or_create(
154         &mut self, id: u64, local_params: &crate::TransportParams,
155         peer_params: &crate::TransportParams, local: bool, is_server: bool,
156     ) -> Result<&mut Stream> {
157         let stream = match self.streams.entry(id) {
158             hash_map::Entry::Vacant(v) => {
159                 // Stream has already been closed and garbage collected.
160                 if self.collected.contains(&id) {
161                     return Err(Error::Done);
162                 }
163 
164                 if local != is_local(id, is_server) {
165                     return Err(Error::InvalidStreamState);
166                 }
167 
168                 let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
169                     // Locally-initiated bidirectional stream.
170                     (true, true) => (
171                         local_params.initial_max_stream_data_bidi_local,
172                         peer_params.initial_max_stream_data_bidi_remote,
173                     ),
174 
175                     // Locally-initiated unidirectional stream.
176                     (true, false) => (0, peer_params.initial_max_stream_data_uni),
177 
178                     // Remotely-initiated bidirectional stream.
179                     (false, true) => (
180                         local_params.initial_max_stream_data_bidi_remote,
181                         peer_params.initial_max_stream_data_bidi_local,
182                     ),
183 
184                     // Remotely-initiated unidirectional stream.
185                     (false, false) =>
186                         (local_params.initial_max_stream_data_uni, 0),
187                 };
188 
189                 // Enforce stream count limits.
190                 match (is_local(id, is_server), is_bidi(id)) {
191                     (true, true) => {
192                         if self.local_opened_streams_bidi >=
193                             self.peer_max_streams_bidi
194                         {
195                             return Err(Error::StreamLimit);
196                         }
197 
198                         self.local_opened_streams_bidi += 1;
199                     },
200 
201                     (true, false) => {
202                         if self.local_opened_streams_uni >=
203                             self.peer_max_streams_uni
204                         {
205                             return Err(Error::StreamLimit);
206                         }
207 
208                         self.local_opened_streams_uni += 1;
209                     },
210 
211                     (false, true) => {
212                         if self.peer_opened_streams_bidi >=
213                             self.local_max_streams_bidi
214                         {
215                             return Err(Error::StreamLimit);
216                         }
217 
218                         self.peer_opened_streams_bidi += 1;
219                     },
220 
221                     (false, false) => {
222                         if self.peer_opened_streams_uni >=
223                             self.local_max_streams_uni
224                         {
225                             return Err(Error::StreamLimit);
226                         }
227 
228                         self.peer_opened_streams_uni += 1;
229                     },
230                 };
231 
232                 let s = Stream::new(max_rx_data, max_tx_data, is_bidi(id), local);
233                 v.insert(s)
234             },
235 
236             hash_map::Entry::Occupied(v) => v.into_mut(),
237         };
238 
239         // Stream might already be writable due to initial flow control limits.
240         if stream.is_writable() {
241             self.writable.insert(id);
242         }
243 
244         Ok(stream)
245     }
246 
247     /// Pushes the stream ID to the back of the flushable streams queue with
248     /// the specified urgency.
249     ///
250     /// Note that the caller is responsible for checking that the specified
251     /// stream ID was not in the queue already before calling this.
252     ///
253     /// Queueing a stream multiple times simultaneously means that it might be
254     /// unfairly scheduled more often than other streams, and might also cause
255     /// spurious cycles through the queue, so it should be avoided.
push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool)256     pub fn push_flushable(&mut self, stream_id: u64, urgency: u8, incr: bool) {
257         // Push the element to the back of the queue corresponding to the given
258         // urgency. If the queue doesn't exist yet, create it first.
259         let queues = self
260             .flushable
261             .entry(urgency)
262             .or_insert_with(|| (BinaryHeap::new(), VecDeque::new()));
263 
264         if !incr {
265             // Non-incremental streams are scheduled in order of their stream ID.
266             queues.0.push(std::cmp::Reverse(stream_id))
267         } else {
268             // Incremental streams are scheduled in a round-robin fashion.
269             queues.1.push_back(stream_id)
270         };
271     }
272 
273     /// Removes and returns the first stream ID from the flushable streams
274     /// queue with the specified urgency.
275     ///
276     /// Note that if the stream is still flushable after sending some of its
277     /// outstanding data, it needs to be added back to the queue.
pop_flushable(&mut self) -> Option<u64>278     pub fn pop_flushable(&mut self) -> Option<u64> {
279         // Remove the first element from the queue corresponding to the lowest
280         // urgency that has elements.
281         let (node, clear) =
282             if let Some((urgency, queues)) = self.flushable.iter_mut().next() {
283                 let node = if !queues.0.is_empty() {
284                     queues.0.pop().map(|x| x.0)
285                 } else {
286                     queues.1.pop_front()
287                 };
288 
289                 let clear = if queues.0.is_empty() && queues.1.is_empty() {
290                     Some(*urgency)
291                 } else {
292                     None
293                 };
294 
295                 (node, clear)
296             } else {
297                 (None, None)
298             };
299 
300         // Remove the queue from the list of queues if it is now empty, so that
301         // the next time `pop_flushable()` is called the next queue with elements
302         // is used.
303         if let Some(urgency) = &clear {
304             self.flushable.remove(urgency);
305         }
306 
307         node
308     }
309 
310     /// Adds or removes the stream ID to/from the readable streams set.
311     ///
312     /// If the stream was already in the list, this does nothing.
mark_readable(&mut self, stream_id: u64, readable: bool)313     pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
314         if readable {
315             self.readable.insert(stream_id);
316         } else {
317             self.readable.remove(&stream_id);
318         }
319     }
320 
321     /// Adds or removes the stream ID to/from the writable streams set.
322     ///
323     /// This should also be called anytime a new stream is created, in addition
324     /// to when an existing stream becomes writable (or stops being writable).
325     ///
326     /// If the stream was already in the list, this does nothing.
mark_writable(&mut self, stream_id: u64, writable: bool)327     pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
328         if writable {
329             self.writable.insert(stream_id);
330         } else {
331             self.writable.remove(&stream_id);
332         }
333     }
334 
335     /// Adds or removes the stream ID to/from the almost full streams set.
336     ///
337     /// If the stream was already in the list, this does nothing.
mark_almost_full(&mut self, stream_id: u64, almost_full: bool)338     pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
339         if almost_full {
340             self.almost_full.insert(stream_id);
341         } else {
342             self.almost_full.remove(&stream_id);
343         }
344     }
345 
346     /// Adds or removes the stream ID to/from the blocked streams set with the
347     /// given offset value.
348     ///
349     /// If the stream was already in the list, this does nothing.
mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64)350     pub fn mark_blocked(&mut self, stream_id: u64, blocked: bool, off: u64) {
351         if blocked {
352             self.blocked.insert(stream_id, off);
353         } else {
354             self.blocked.remove(&stream_id);
355         }
356     }
357 
358     /// Updates the peer's maximum bidirectional stream count limit.
update_peer_max_streams_bidi(&mut self, v: u64)359     pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
360         self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
361     }
362 
363     /// Updates the peer's maximum unidirectional stream count limit.
update_peer_max_streams_uni(&mut self, v: u64)364     pub fn update_peer_max_streams_uni(&mut self, v: u64) {
365         self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
366     }
367 
368     /// Commits the new max_streams_bidi limit.
update_max_streams_bidi(&mut self)369     pub fn update_max_streams_bidi(&mut self) {
370         self.local_max_streams_bidi = self.local_max_streams_bidi_next;
371     }
372 
373     /// Returns the new max_streams_bidi limit.
max_streams_bidi_next(&mut self) -> u64374     pub fn max_streams_bidi_next(&mut self) -> u64 {
375         self.local_max_streams_bidi_next
376     }
377 
378     /// Commits the new max_streams_uni limit.
update_max_streams_uni(&mut self)379     pub fn update_max_streams_uni(&mut self) {
380         self.local_max_streams_uni = self.local_max_streams_uni_next;
381     }
382 
383     /// Returns the new max_streams_uni limit.
max_streams_uni_next(&mut self) -> u64384     pub fn max_streams_uni_next(&mut self) -> u64 {
385         self.local_max_streams_uni_next
386     }
387 
388     /// Drops completed stream.
389     ///
390     /// This should only be called when Stream::is_complete() returns true for
391     /// the given stream.
collect(&mut self, stream_id: u64, local: bool)392     pub fn collect(&mut self, stream_id: u64, local: bool) {
393         if !local {
394             // If the stream was created by the peer, give back a max streams
395             // credit.
396             if is_bidi(stream_id) {
397                 self.local_max_streams_bidi_next =
398                     self.local_max_streams_bidi_next.saturating_add(1);
399             } else {
400                 self.local_max_streams_uni_next =
401                     self.local_max_streams_uni_next.saturating_add(1);
402             }
403         }
404 
405         self.streams.remove(&stream_id);
406         self.collected.insert(stream_id);
407     }
408 
409     /// Creates an iterator over streams that have outstanding data to read.
readable(&self) -> StreamIter410     pub fn readable(&self) -> StreamIter {
411         StreamIter::from(&self.readable)
412     }
413 
414     /// Creates an iterator over streams that can be written to.
writable(&self) -> StreamIter415     pub fn writable(&self) -> StreamIter {
416         StreamIter::from(&self.writable)
417     }
418 
419     /// Creates an iterator over streams that need to send MAX_STREAM_DATA.
almost_full(&self) -> StreamIter420     pub fn almost_full(&self) -> StreamIter {
421         StreamIter::from(&self.almost_full)
422     }
423 
424     /// Creates an iterator over streams that need to send STREAM_DATA_BLOCKED.
blocked(&self) -> hash_map::Iter<u64, u64>425     pub fn blocked(&self) -> hash_map::Iter<u64, u64> {
426         self.blocked.iter()
427     }
428 
429     /// Returns true if there are any streams that have data to write.
has_flushable(&self) -> bool430     pub fn has_flushable(&self) -> bool {
431         !self.flushable.is_empty()
432     }
433 
434     /// Returns true if there are any streams that need to update the local
435     /// flow control limit.
has_almost_full(&self) -> bool436     pub fn has_almost_full(&self) -> bool {
437         !self.almost_full.is_empty()
438     }
439 
440     /// Returns true if there are any streams that are blocked.
has_blocked(&self) -> bool441     pub fn has_blocked(&self) -> bool {
442         !self.blocked.is_empty()
443     }
444 
445     /// Returns true if the max bidirectional streams count needs to be updated
446     /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_bidi(&self) -> bool447     pub fn should_update_max_streams_bidi(&self) -> bool {
448         self.local_max_streams_bidi_next != self.local_max_streams_bidi &&
449             self.local_max_streams_bidi_next / 2 >
450                 self.local_max_streams_bidi - self.peer_opened_streams_bidi
451     }
452 
453     /// Returns true if the max unidirectional streams count needs to be updated
454     /// by sending a MAX_STREAMS frame to the peer.
should_update_max_streams_uni(&self) -> bool455     pub fn should_update_max_streams_uni(&self) -> bool {
456         self.local_max_streams_uni_next != self.local_max_streams_uni &&
457             self.local_max_streams_uni_next / 2 >
458                 self.local_max_streams_uni - self.peer_opened_streams_uni
459     }
460 
461     /// Returns the number of active streams in the map.
462     #[cfg(test)]
len(&self) -> usize463     pub fn len(&self) -> usize {
464         self.streams.len()
465     }
466 }
467 
468 /// A QUIC stream.
469 #[derive(Default)]
470 pub struct Stream {
471     /// Receive-side stream buffer.
472     pub recv: RecvBuf,
473 
474     /// Send-side stream buffer.
475     pub send: SendBuf,
476 
477     /// Whether the stream is bidirectional.
478     pub bidi: bool,
479 
480     /// Whether the stream was created by the local endpoint.
481     pub local: bool,
482 
483     /// Application data.
484     pub data: Option<Box<dyn Send + std::any::Any>>,
485 
486     /// The stream's urgency (lower is better). Default is `DEFAULT_URGENCY`.
487     pub urgency: u8,
488 
489     /// Whether the stream can be flushed incrementally. Default is `true`.
490     pub incremental: bool,
491 }
492 
493 impl Stream {
494     /// Creates a new stream with the given flow control limits.
new( max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool, ) -> Stream495     pub fn new(
496         max_rx_data: u64, max_tx_data: u64, bidi: bool, local: bool,
497     ) -> Stream {
498         Stream {
499             recv: RecvBuf::new(max_rx_data),
500             send: SendBuf::new(max_tx_data),
501             bidi,
502             local,
503             data: None,
504             urgency: DEFAULT_URGENCY,
505             incremental: true,
506         }
507     }
508 
509     /// Returns true if the stream has data to read.
is_readable(&self) -> bool510     pub fn is_readable(&self) -> bool {
511         self.recv.ready()
512     }
513 
514     /// Returns true if the stream has enough flow control capacity to be
515     /// written to, and is not finished.
is_writable(&self) -> bool516     pub fn is_writable(&self) -> bool {
517         !self.send.shutdown &&
518             !self.send.is_fin() &&
519             self.send.off < self.send.max_data
520     }
521 
522     /// Returns true if the stream has data to send and is allowed to send at
523     /// least some of it.
is_flushable(&self) -> bool524     pub fn is_flushable(&self) -> bool {
525         self.send.ready() && self.send.off_front() < self.send.max_data
526     }
527 
528     /// Returns true if the stream is complete.
529     ///
530     /// For bidirectional streams this happens when both the receive and send
531     /// sides are complete. That is when all incoming data has been read by the
532     /// application, and when all outgoing data has been acked by the peer.
533     ///
534     /// For unidirectional streams this happens when either the receive or send
535     /// side is complete, depending on whether the stream was created locally
536     /// or not.
is_complete(&self) -> bool537     pub fn is_complete(&self) -> bool {
538         match (self.bidi, self.local) {
539             // For bidirectional streams we need to check both receive and send
540             // sides for completion.
541             (true, _) => self.recv.is_fin() && self.send.is_complete(),
542 
543             // For unidirectional streams generated locally, we only need to
544             // check the send side for completion.
545             (false, true) => self.send.is_complete(),
546 
547             // For unidirectional streams generated by the peer, we only need
548             // to check the receive side for completion.
549             (false, false) => self.recv.is_fin(),
550         }
551     }
552 }
553 
/// Tells whether the stream with the given ID was created by this endpoint.
///
/// The lowest bit of a stream ID is set for server-created streams, so the
/// stream is local exactly when that bit matches `is_server`.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let server_initiated = stream_id % 2 == 1;

    server_initiated == is_server
}
558 
/// Tells whether the stream with the given ID is bidirectional.
///
/// The second-lowest bit of a stream ID is set for unidirectional streams.
pub fn is_bidi(stream_id: u64) -> bool {
    let unidirectional = (stream_id >> 1) & 1 == 1;

    !unidirectional
}
563 
/// An iterator over QUIC stream IDs.
///
/// The IDs are snapshotted when the iterator is created, so the iterator
/// does not borrow the set it was built from.
#[derive(Default)]
pub struct StreamIter {
    /// IDs still to be yielded; `next()` takes them from the back.
    streams: Vec<u64>,
}

impl StreamIter {
    /// Builds an iterator over a copy of the IDs in `streams`.
    fn from(streams: &HashSet<u64>) -> Self {
        StreamIter {
            streams: streams.iter().copied().collect(),
        }
    }
}

impl Iterator for StreamIter {
    type Item = u64;

    /// Yields the next stream ID, or `None` once all IDs were yielded.
    fn next(&mut self) -> Option<Self::Item> {
        self.streams.pop()
    }

    /// Reports the exact number of remaining IDs.
    ///
    /// `ExactSizeIterator` requires `size_hint()` to return the exact size;
    /// the default implementation would report `(0, None)` instead.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.streams.len();

        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for StreamIter {
    /// Returns the number of stream IDs that have not been yielded yet.
    fn len(&self) -> usize {
        self.streams.len()
    }
}
591 
592 /// Receive-side stream buffer.
593 ///
594 /// Stream data received by the peer is buffered in a list of data chunks
595 /// ordered by offset in ascending order. Contiguous data can then be read
596 /// into a slice.
597 #[derive(Debug, Default)]
598 pub struct RecvBuf {
599     /// Chunks of data received from the peer that have not yet been read by
600     /// the application, ordered by offset.
601     data: BinaryHeap<RangeBuf>,
602 
603     /// The lowest data offset that has yet to be read by the application.
604     off: u64,
605 
606     /// The total length of data received on this stream.
607     len: u64,
608 
609     /// The maximum offset the peer is allowed to send us.
610     max_data: u64,
611 
612     /// The updated maximum offset the peer is allowed to send us.
613     max_data_next: u64,
614 
615     /// The final stream offset received from the peer, if any.
616     fin_off: Option<u64>,
617 
618     /// Whether incoming data is validated but not buffered.
619     drain: bool,
620 }
621 
622 impl RecvBuf {
623     /// Creates a new receive buffer.
new(max_data: u64) -> RecvBuf624     fn new(max_data: u64) -> RecvBuf {
625         RecvBuf {
626             max_data,
627             max_data_next: max_data,
628             ..RecvBuf::default()
629         }
630     }
631 
632     /// Inserts the given chunk of data in the buffer.
633     ///
634     /// This also takes care of enforcing stream flow control limits, as well
635     /// as handling incoming data that overlaps data that is already in the
636     /// buffer.
push(&mut self, buf: RangeBuf) -> Result<()>637     pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
638         if buf.max_off() > self.max_data {
639             return Err(Error::FlowControl);
640         }
641 
642         if let Some(fin_off) = self.fin_off {
643             // Stream's size is known, forbid data beyond that point.
644             if buf.max_off() > fin_off {
645                 return Err(Error::FinalSize);
646             }
647 
648             // Stream's size is already known, forbid changing it.
649             if buf.fin() && fin_off != buf.max_off() {
650                 return Err(Error::FinalSize);
651             }
652         }
653 
654         // Stream's known size is lower than data already received.
655         if buf.fin() && buf.max_off() < self.len {
656             return Err(Error::FinalSize);
657         }
658 
659         // We already saved the final offset, so there's nothing else we
660         // need to keep from the RangeBuf if it's empty.
661         if self.fin_off.is_some() && buf.is_empty() {
662             return Ok(());
663         }
664 
665         // No need to process an empty buffer with the fin flag, if we already
666         // know the final size.
667         if buf.fin() && buf.is_empty() && self.fin_off.is_some() {
668             return Ok(());
669         }
670 
671         if buf.fin() {
672             self.fin_off = Some(buf.max_off());
673         }
674 
675         // No need to store empty buffer that doesn't carry the fin flag.
676         if !buf.fin() && buf.is_empty() {
677             return Ok(());
678         }
679 
680         // Check if data is fully duplicate, that is the buffer's max offset is
681         // lower or equal to the offset already stored in the recv buffer.
682         if self.off >= buf.max_off() {
683             // An exception is applied to empty range buffers, because an empty
684             // buffer's max offset matches the max offset of the recv buffer.
685             //
686             // By this point all spurious empty buffers should have already been
687             // discarded, so allowing empty buffers here should be safe.
688             if !buf.is_empty() {
689                 return Ok(());
690             }
691         }
692 
693         if self.drain {
694             return Ok(());
695         }
696 
697         let mut tmp_buf = Some(buf);
698 
699         while let Some(mut buf) = tmp_buf {
700             tmp_buf = None;
701 
702             // Discard incoming data below current stream offset. Bytes up to
703             // `self.off` have already been received so we should not buffer
704             // them again. This is also important to make sure `ready()` doesn't
705             // get stuck when a buffer with lower offset than the stream's is
706             // buffered.
707             if self.off > buf.off() {
708                 buf = buf.split_off((self.off - buf.off()) as usize);
709             }
710 
711             for b in &self.data {
712                 // New buffer is fully contained in existing buffer.
713                 if buf.off() >= b.off() && buf.max_off() <= b.max_off() {
714                     return Ok(());
715                 }
716 
717                 // New buffer's start overlaps existing buffer.
718                 if buf.off() >= b.off() && buf.off() < b.max_off() {
719                     buf = buf.split_off((b.max_off() - buf.off()) as usize);
720                 }
721 
722                 // New buffer's end overlaps existing buffer.
723                 if buf.off() < b.off() && buf.max_off() > b.off() {
724                     tmp_buf = Some(buf.split_off((b.off() - buf.off()) as usize));
725                 }
726             }
727 
728             self.len = cmp::max(self.len, buf.max_off());
729 
730             self.data.push(buf);
731         }
732 
733         Ok(())
734     }
735 
736     /// Writes data from the receive buffer into the given output buffer.
737     ///
738     /// Only contiguous data is written to the output buffer, starting from
739     /// offset 0. The offset is incremented as data is read out of the receive
740     /// buffer into the application buffer. If there is no data at the expected
741     /// read offset, the `Done` error is returned.
742     ///
743     /// On success the amount of data read, and a flag indicating if there is
744     /// no more data in the buffer, are returned as a tuple.
pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)>745     pub fn pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
746         let mut len = 0;
747         let mut cap = out.len();
748 
749         if !self.ready() {
750             return Err(Error::Done);
751         }
752 
753         while cap > 0 && self.ready() {
754             let mut buf = match self.data.peek_mut() {
755                 Some(v) => v,
756 
757                 None => break,
758             };
759 
760             let buf_len = cmp::min(buf.len(), cap);
761 
762             out[len..len + buf_len].copy_from_slice(&buf[..buf_len]);
763 
764             self.off += buf_len as u64;
765 
766             len += buf_len;
767             cap -= buf_len;
768 
769             if buf_len < buf.len() {
770                 buf.consume(buf_len);
771 
772                 // We reached the maximum capacity, so end here.
773                 break;
774             }
775 
776             std::collections::binary_heap::PeekMut::pop(buf);
777         }
778 
779         self.max_data_next = self.max_data_next.saturating_add(len as u64);
780 
781         Ok((len, self.is_fin()))
782     }
783 
784     /// Resets the stream at the given offset.
reset(&mut self, final_size: u64) -> Result<usize>785     pub fn reset(&mut self, final_size: u64) -> Result<usize> {
786         // Stream's size is already known, forbid changing it.
787         if let Some(fin_off) = self.fin_off {
788             if fin_off != final_size {
789                 return Err(Error::FinalSize);
790             }
791         }
792 
793         // Stream's known size is lower than data already received.
794         if final_size < self.len {
795             return Err(Error::FinalSize);
796         }
797 
798         self.fin_off = Some(final_size);
799 
800         // Return how many bytes need to be removed from the connection flow
801         // control.
802         Ok((final_size - self.len) as usize)
803     }
804 
    /// Commits the new max_data limit.
    ///
    /// The pending limit (`max_data_next`) becomes the current limit
    /// (`max_data`).
    pub fn update_max_data(&mut self) {
        self.max_data = self.max_data_next;
    }
809 
810     /// Return the new max_data limit.
max_data_next(&mut self) -> u64811     pub fn max_data_next(&mut self) -> u64 {
812         self.max_data_next
813     }
814 
815     /// Shuts down receiving data.
shutdown(&mut self) -> Result<()>816     pub fn shutdown(&mut self) -> Result<()> {
817         if self.drain {
818             return Err(Error::Done);
819         }
820 
821         self.drain = true;
822 
823         self.data.clear();
824 
825         Ok(())
826     }
827 
    /// Returns the lowest offset of data buffered.
    ///
    /// This is the current read offset, which `pop()` advances by the number
    /// of bytes it hands out.
    #[allow(dead_code)]
    pub fn off_front(&self) -> u64 {
        self.off
    }
833 
834     /// Returns true if we need to update the local flow control limit.
almost_full(&self) -> bool835     pub fn almost_full(&self) -> bool {
836         // Send MAX_STREAM_DATA when the new limit is at least double the
837         // amount of data that can be received before blocking.
838         self.fin_off.is_none() &&
839             self.max_data_next != self.max_data &&
840             self.max_data_next / 2 > self.max_data - self.len
841     }
842 
    /// Returns the largest offset ever received.
    ///
    /// Note that this is tracked independently from the read offset, so it is
    /// not reduced when data is read out of the buffer.
    pub fn max_off(&self) -> u64 {
        self.len
    }
847 
848     /// Returns true if the receive-side of the stream is complete.
849     ///
850     /// This happens when the stream's receive final size is known, and the
851     /// application has read all data from the stream.
is_fin(&self) -> bool852     pub fn is_fin(&self) -> bool {
853         if self.fin_off == Some(self.off) {
854             return true;
855         }
856 
857         false
858     }
859 
860     /// Returns true if the stream has data to be read.
ready(&self) -> bool861     fn ready(&self) -> bool {
862         let buf = match self.data.peek() {
863             Some(v) => v,
864 
865             None => return false,
866         };
867 
868         buf.off() == self.off
869     }
870 }
871 
872 /// Send-side stream buffer.
873 ///
874 /// Stream data scheduled to be sent to the peer is buffered in a list of data
875 /// chunks ordered by offset in ascending order. Contiguous data can then be
876 /// read into a slice.
877 ///
878 /// By default, new data is appended at the end of the stream, but data can be
879 /// inserted at the start of the buffer (this is to allow data that needs to be
880 /// retransmitted to be re-buffered).
881 #[derive(Debug, Default)]
882 pub struct SendBuf {
883     /// Chunks of data to be sent, ordered by offset.
884     data: BinaryHeap<RangeBuf>,
885 
886     /// The maximum offset of data buffered in the stream.
887     off: u64,
888 
889     /// The amount of data that was ever written to this stream.
890     len: u64,
891 
892     /// The maximum offset we are allowed to send to the peer.
893     max_data: u64,
894 
895     /// The final stream offset written to the stream, if any.
896     fin_off: Option<u64>,
897 
898     /// Whether the stream's send-side has been shut down.
899     shutdown: bool,
900 
901     /// Ranges of data offsets that have been acked.
902     acked: ranges::RangeSet,
903 }
904 
905 impl SendBuf {
906     /// Creates a new send buffer.
new(max_data: u64) -> SendBuf907     fn new(max_data: u64) -> SendBuf {
908         SendBuf {
909             max_data,
910             ..SendBuf::default()
911         }
912     }
913 
914     /// Inserts the given slice of data at the end of the buffer.
915     ///
916     /// The number of bytes that were actually stored in the buffer is returned
917     /// (this may be lower than the size of the input buffer, in case of partial
918     /// writes).
push_slice( &mut self, mut data: &[u8], mut fin: bool, ) -> Result<usize>919     pub fn push_slice(
920         &mut self, mut data: &[u8], mut fin: bool,
921     ) -> Result<usize> {
922         if self.shutdown {
923             // Since we won't write any more data anyway, pretend that we sent
924             // all data that was passed in.
925             return Ok(data.len());
926         }
927 
928         if data.is_empty() {
929             // Create a dummy range buffer, in order to propagate the `fin` flag
930             // into `RangeBuf::push()`. This will be discarded later on.
931             let buf = RangeBuf::from(&[], self.off, fin);
932 
933             return self.push(buf).map(|_| 0);
934         }
935 
936         if data.len() > self.cap() {
937             // Truncate the input buffer according to the stream's capacity.
938             let len = self.cap();
939             data = &data[..len];
940 
941             // We are not buffering the full input, so clear the fin flag.
942             fin = false;
943         }
944 
945         let buf = RangeBuf::from(data, self.off, fin);
946         self.push(buf)?;
947 
948         self.off += data.len() as u64;
949 
950         Ok(data.len())
951     }
952 
953     /// Inserts the given chunk of data in the buffer.
push(&mut self, buf: RangeBuf) -> Result<()>954     pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
955         if let Some(fin_off) = self.fin_off {
956             // Can't write past final offset.
957             if buf.max_off() > fin_off {
958                 return Err(Error::FinalSize);
959             }
960 
961             // Can't "undo" final offset.
962             if buf.max_off() == fin_off && !buf.fin() {
963                 return Err(Error::FinalSize);
964             }
965         }
966 
967         if self.shutdown {
968             return Ok(());
969         }
970 
971         if buf.fin() {
972             self.fin_off = Some(buf.max_off());
973         }
974 
975         // Don't queue data that was already fully acked.
976         if self.ack_off() >= buf.max_off() {
977             return Ok(());
978         }
979 
980         self.len += buf.len() as u64;
981 
982         // We already recorded the final offset, so we can just discard the
983         // empty buffer now.
984         if buf.is_empty() {
985             return Ok(());
986         }
987 
988         self.data.push(buf);
989 
990         Ok(())
991     }
992 
993     /// Returns contiguous data from the send buffer as a single `RangeBuf`.
pop(&mut self, max_data: usize) -> Result<RangeBuf>994     pub fn pop(&mut self, max_data: usize) -> Result<RangeBuf> {
995         let mut out = RangeBuf::default();
996         out.data =
997             Vec::with_capacity(cmp::min(max_data as u64, self.len) as usize);
998         out.off = self.off;
999 
1000         let mut out_len = max_data;
1001         let mut out_off = self.data.peek().map_or_else(|| out.off, RangeBuf::off);
1002 
1003         while out_len > 0 &&
1004             self.ready() &&
1005             self.off_front() == out_off &&
1006             self.off_front() < self.max_data
1007         {
1008             let mut buf = match self.data.peek_mut() {
1009                 Some(v) => v,
1010 
1011                 None => break,
1012             };
1013 
1014             let buf_len = cmp::min(buf.len(), out_len);
1015 
1016             if out.is_empty() {
1017                 out.off = buf.off();
1018             }
1019 
1020             self.len -= buf_len as u64;
1021 
1022             out_len -= buf_len;
1023             out_off = buf.off() + buf_len as u64;
1024 
1025             out.data.extend_from_slice(&buf[..buf_len]);
1026 
1027             if buf_len < buf.len() {
1028                 buf.consume(buf_len);
1029 
1030                 // We reached the maximum capacity, so end here.
1031                 break;
1032             }
1033 
1034             std::collections::binary_heap::PeekMut::pop(buf);
1035         }
1036 
1037         // Override the `fin` flag set for the output buffer by matching the
1038         // buffer's maximum offset against the stream's final offset (if known).
1039         //
1040         // This is more efficient than tracking `fin` using the range buffers
1041         // themselves, and lets us avoid queueing empty buffers just so we can
1042         // propagate the final size.
1043         out.fin = self.fin_off == Some(out.max_off());
1044 
1045         Ok(out)
1046     }
1047 
1048     /// Updates the max_data limit to the given value.
update_max_data(&mut self, max_data: u64)1049     pub fn update_max_data(&mut self, max_data: u64) {
1050         self.max_data = cmp::max(self.max_data, max_data);
1051     }
1052 
1053     /// Increments the acked data offset.
ack(&mut self, off: u64, len: usize)1054     pub fn ack(&mut self, off: u64, len: usize) {
1055         self.acked.insert(off..off + len as u64);
1056     }
1057 
1058     /// Shuts down sending data.
shutdown(&mut self) -> Result<()>1059     pub fn shutdown(&mut self) -> Result<()> {
1060         if self.shutdown {
1061             return Err(Error::Done);
1062         }
1063 
1064         self.shutdown = true;
1065 
1066         self.data.clear();
1067 
1068         Ok(())
1069     }
1070 
1071     /// Returns the largest offset of data buffered.
1072     #[allow(dead_code)]
off_back(&self) -> u641073     pub fn off_back(&self) -> u64 {
1074         self.off
1075     }
1076 
1077     /// Returns the lowest offset of data buffered.
off_front(&self) -> u641078     pub fn off_front(&self) -> u64 {
1079         match self.data.peek() {
1080             Some(v) => v.off(),
1081 
1082             None => self.off,
1083         }
1084     }
1085 
1086     /// The maximum offset we are allowed to send to the peer.
max_off(&self) -> u641087     pub fn max_off(&self) -> u64 {
1088         self.max_data
1089     }
1090 
1091     /// Returns true if all data in the stream has been sent.
1092     ///
1093     /// This happens when the stream's send final size is knwon, and the
1094     /// application has already written data up to that point.
is_fin(&self) -> bool1095     pub fn is_fin(&self) -> bool {
1096         if self.fin_off == Some(self.off) {
1097             return true;
1098         }
1099 
1100         false
1101     }
1102 
1103     /// Returns true if the send-side of the stream is complete.
1104     ///
1105     /// This happens when the stream's send final size is known, and the peer
1106     /// has already acked all stream data up to that point.
is_complete(&self) -> bool1107     pub fn is_complete(&self) -> bool {
1108         if let Some(fin_off) = self.fin_off {
1109             if self.acked == (0..fin_off) {
1110                 return true;
1111             }
1112         }
1113 
1114         false
1115     }
1116 
1117     /// Returns true if there is data to be written.
ready(&self) -> bool1118     fn ready(&self) -> bool {
1119         !self.data.is_empty()
1120     }
1121 
1122     /// Returns the highest contiguously acked offset.
ack_off(&self) -> u641123     fn ack_off(&self) -> u64 {
1124         match self.acked.iter().next() {
1125             // Only consider the initial range if it contiguously covers the
1126             // start of the stream (i.e. from offset 0).
1127             Some(std::ops::Range { start: 0, end }) => end,
1128 
1129             Some(_) | None => 0,
1130         }
1131     }
1132 
1133     /// Returns the outgoing flow control capacity.
cap(&self) -> usize1134     pub fn cap(&self) -> usize {
1135         (self.max_data - self.off) as usize
1136     }
1137 }
1138 
/// Buffer holding data at a specific offset.
///
/// The buffer can be partially consumed from the front without copying the
/// remaining data: `pos` tracks how many bytes of `data` were consumed, and
/// all accessors (`off()`, `len()`, slicing through `Deref`) only expose the
/// part that has not been consumed yet.
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
    /// The internal buffer holding the data.
    data: Vec<u8>,

    /// The starting offset within `data`. This allows partially consuming a
    /// buffer without duplicating the data.
    pos: usize,

    /// The starting offset within a stream.
    off: u64,

    /// Whether this contains the final byte in the stream.
    fin: bool,
}

impl RangeBuf {
    /// Creates a new `RangeBuf` from the given slice.
    ///
    /// The data is copied, `off` is the stream offset of its first byte and
    /// `fin` tells whether it ends the stream.
    pub(crate) fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
        RangeBuf {
            data: Vec::from(buf),
            pos: 0,
            off,
            fin,
        }
    }

    /// Returns whether `self` holds the final offset in the stream.
    pub fn fin(&self) -> bool {
        self.fin
    }

    /// Returns the starting offset of `self`.
    ///
    /// Bytes that were already consumed are skipped.
    pub fn off(&self) -> u64 {
        self.off + self.pos as u64
    }

    /// Returns the final offset of `self`.
    pub fn max_off(&self) -> u64 {
        self.off() + self.len() as u64
    }

    /// Returns the length of `self`, not counting consumed bytes.
    pub fn len(&self) -> usize {
        self.data.len() - self.pos
    }

    /// Returns true if `self` has a length of zero bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Consumes the starting `count` bytes of `self`.
    pub fn consume(&mut self, count: usize) {
        self.pos += count;
    }

    /// Splits the buffer into two at the given index.
    ///
    /// `at` is an index into the underlying data, counted from the buffer's
    /// base offset (that is, including any bytes already consumed). `self`
    /// keeps the data before `at`, while the returned buffer holds the data
    /// from `at` onwards together with the `fin` flag, which is cleared on
    /// `self`.
    ///
    /// The consumed position is carried over the split: when `at` falls
    /// inside the consumed part, `self` ends up empty and the returned buffer
    /// still skips the bytes that were already consumed, so neither half
    /// exposes consumed data again.
    ///
    /// # Panics
    ///
    /// Panics if `at` is larger than the amount of underlying data.
    pub fn split_off(&mut self, at: usize) -> RangeBuf {
        let tail_data = self.data.split_off(at);

        // Bytes consumed beyond the split point now belong to the tail, and
        // the head can't have consumed more than the `at` bytes it retains.
        let tail_pos = self.pos.saturating_sub(at);
        self.pos = cmp::min(self.pos, at);

        let tail = RangeBuf {
            data: tail_data,
            pos: tail_pos,
            off: self.off + at as u64,
            fin: self.fin,
        };

        // The end of the stream, if any, moved to the tail.
        self.fin = false;

        tail
    }
}

/// Exposes the data that was not consumed yet as a byte slice.
impl std::ops::Deref for RangeBuf {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.data[self.pos..]
    }
}

/// Exposes the data that was not consumed yet as a mutable byte slice.
impl std::ops::DerefMut for RangeBuf {
    fn deref_mut(&mut self) -> &mut [u8] {
        &mut self.data[self.pos..]
    }
}

/// Buffers are ordered by their base stream offset only.
impl Ord for RangeBuf {
    fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
        // The comparison is inverted so that `BinaryHeap` (a max-heap) yields
        // the buffer with the lowest offset first, i.e. acts as a min-heap.
        other.off.cmp(&self.off)
    }
}

impl PartialOrd for RangeBuf {
    fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Buffers compare equal when they share the same base stream offset; the
/// data they hold is not taken into account.
impl PartialEq for RangeBuf {
    fn eq(&self, other: &RangeBuf) -> bool {
        self.off == other.off
    }
}
1244 
1245 #[cfg(test)]
1246 mod tests {
1247     use super::*;
1248 
1249     #[test]
empty_read()1250     fn empty_read() {
1251         let mut recv = RecvBuf::new(std::u64::MAX);
1252         assert_eq!(recv.len, 0);
1253 
1254         let mut buf = [0; 32];
1255 
1256         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1257     }
1258 
1259     #[test]
empty_stream_frame()1260     fn empty_stream_frame() {
1261         let mut recv = RecvBuf::new(15);
1262         assert_eq!(recv.len, 0);
1263 
1264         let buf = RangeBuf::from(b"hello", 0, false);
1265         assert!(recv.push(buf).is_ok());
1266         assert_eq!(recv.len, 5);
1267         assert_eq!(recv.off, 0);
1268         assert_eq!(recv.data.len(), 1);
1269 
1270         let mut buf = [0; 32];
1271         assert_eq!(recv.pop(&mut buf), Ok((5, false)));
1272 
1273         // Don't store non-fin empty buffer.
1274         let buf = RangeBuf::from(b"", 10, false);
1275         assert!(recv.push(buf).is_ok());
1276         assert_eq!(recv.len, 5);
1277         assert_eq!(recv.off, 5);
1278         assert_eq!(recv.data.len(), 0);
1279 
1280         // Check flow control for empty buffer.
1281         let buf = RangeBuf::from(b"", 16, false);
1282         assert_eq!(recv.push(buf), Err(Error::FlowControl));
1283 
1284         // Store fin empty buffer.
1285         let buf = RangeBuf::from(b"", 5, true);
1286         assert!(recv.push(buf).is_ok());
1287         assert_eq!(recv.len, 5);
1288         assert_eq!(recv.off, 5);
1289         assert_eq!(recv.data.len(), 1);
1290 
1291         // Don't store additional fin empty buffers.
1292         let buf = RangeBuf::from(b"", 5, true);
1293         assert!(recv.push(buf).is_ok());
1294         assert_eq!(recv.len, 5);
1295         assert_eq!(recv.off, 5);
1296         assert_eq!(recv.data.len(), 1);
1297 
1298         // Don't store additional fin non-empty buffers.
1299         let buf = RangeBuf::from(b"aa", 3, true);
1300         assert!(recv.push(buf).is_ok());
1301         assert_eq!(recv.len, 5);
1302         assert_eq!(recv.off, 5);
1303         assert_eq!(recv.data.len(), 1);
1304 
1305         // Validate final size with fin empty buffers.
1306         let buf = RangeBuf::from(b"", 6, true);
1307         assert_eq!(recv.push(buf), Err(Error::FinalSize));
1308         let buf = RangeBuf::from(b"", 4, true);
1309         assert_eq!(recv.push(buf), Err(Error::FinalSize));
1310 
1311         let mut buf = [0; 32];
1312         assert_eq!(recv.pop(&mut buf), Ok((0, true)));
1313     }
1314 
1315     #[test]
ordered_read()1316     fn ordered_read() {
1317         let mut recv = RecvBuf::new(std::u64::MAX);
1318         assert_eq!(recv.len, 0);
1319 
1320         let mut buf = [0; 32];
1321 
1322         let first = RangeBuf::from(b"hello", 0, false);
1323         let second = RangeBuf::from(b"world", 5, false);
1324         let third = RangeBuf::from(b"something", 10, true);
1325 
1326         assert!(recv.push(second).is_ok());
1327         assert_eq!(recv.len, 10);
1328         assert_eq!(recv.off, 0);
1329 
1330         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1331 
1332         assert!(recv.push(third).is_ok());
1333         assert_eq!(recv.len, 19);
1334         assert_eq!(recv.off, 0);
1335 
1336         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1337 
1338         assert!(recv.push(first).is_ok());
1339         assert_eq!(recv.len, 19);
1340         assert_eq!(recv.off, 0);
1341 
1342         let (len, fin) = recv.pop(&mut buf).unwrap();
1343         assert_eq!(len, 19);
1344         assert_eq!(fin, true);
1345         assert_eq!(&buf[..len], b"helloworldsomething");
1346         assert_eq!(recv.len, 19);
1347         assert_eq!(recv.off, 19);
1348 
1349         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1350     }
1351 
1352     #[test]
split_read()1353     fn split_read() {
1354         let mut recv = RecvBuf::new(std::u64::MAX);
1355         assert_eq!(recv.len, 0);
1356 
1357         let mut buf = [0; 32];
1358 
1359         let first = RangeBuf::from(b"something", 0, false);
1360         let second = RangeBuf::from(b"helloworld", 9, true);
1361 
1362         assert!(recv.push(first).is_ok());
1363         assert_eq!(recv.len, 9);
1364         assert_eq!(recv.off, 0);
1365 
1366         assert!(recv.push(second).is_ok());
1367         assert_eq!(recv.len, 19);
1368         assert_eq!(recv.off, 0);
1369 
1370         let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
1371         assert_eq!(len, 10);
1372         assert_eq!(fin, false);
1373         assert_eq!(&buf[..len], b"somethingh");
1374         assert_eq!(recv.len, 19);
1375         assert_eq!(recv.off, 10);
1376 
1377         let (len, fin) = recv.pop(&mut buf[..5]).unwrap();
1378         assert_eq!(len, 5);
1379         assert_eq!(fin, false);
1380         assert_eq!(&buf[..len], b"ellow");
1381         assert_eq!(recv.len, 19);
1382         assert_eq!(recv.off, 15);
1383 
1384         let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
1385         assert_eq!(len, 4);
1386         assert_eq!(fin, true);
1387         assert_eq!(&buf[..len], b"orld");
1388         assert_eq!(recv.len, 19);
1389         assert_eq!(recv.off, 19);
1390     }
1391 
1392     #[test]
incomplete_read()1393     fn incomplete_read() {
1394         let mut recv = RecvBuf::new(std::u64::MAX);
1395         assert_eq!(recv.len, 0);
1396 
1397         let mut buf = [0; 32];
1398 
1399         let first = RangeBuf::from(b"something", 0, false);
1400         let second = RangeBuf::from(b"helloworld", 9, true);
1401 
1402         assert!(recv.push(second).is_ok());
1403         assert_eq!(recv.len, 19);
1404         assert_eq!(recv.off, 0);
1405 
1406         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1407 
1408         assert!(recv.push(first).is_ok());
1409         assert_eq!(recv.len, 19);
1410         assert_eq!(recv.off, 0);
1411 
1412         let (len, fin) = recv.pop(&mut buf).unwrap();
1413         assert_eq!(len, 19);
1414         assert_eq!(fin, true);
1415         assert_eq!(&buf[..len], b"somethinghelloworld");
1416         assert_eq!(recv.len, 19);
1417         assert_eq!(recv.off, 19);
1418     }
1419 
1420     #[test]
zero_len_read()1421     fn zero_len_read() {
1422         let mut recv = RecvBuf::new(std::u64::MAX);
1423         assert_eq!(recv.len, 0);
1424 
1425         let mut buf = [0; 32];
1426 
1427         let first = RangeBuf::from(b"something", 0, false);
1428         let second = RangeBuf::from(b"", 9, true);
1429 
1430         assert!(recv.push(first).is_ok());
1431         assert_eq!(recv.len, 9);
1432         assert_eq!(recv.off, 0);
1433         assert_eq!(recv.data.len(), 1);
1434 
1435         assert!(recv.push(second).is_ok());
1436         assert_eq!(recv.len, 9);
1437         assert_eq!(recv.off, 0);
1438         assert_eq!(recv.data.len(), 1);
1439 
1440         let (len, fin) = recv.pop(&mut buf).unwrap();
1441         assert_eq!(len, 9);
1442         assert_eq!(fin, true);
1443         assert_eq!(&buf[..len], b"something");
1444         assert_eq!(recv.len, 9);
1445         assert_eq!(recv.off, 9);
1446     }
1447 
1448     #[test]
past_read()1449     fn past_read() {
1450         let mut recv = RecvBuf::new(std::u64::MAX);
1451         assert_eq!(recv.len, 0);
1452 
1453         let mut buf = [0; 32];
1454 
1455         let first = RangeBuf::from(b"something", 0, false);
1456         let second = RangeBuf::from(b"hello", 3, false);
1457         let third = RangeBuf::from(b"ello", 4, true);
1458         let fourth = RangeBuf::from(b"ello", 5, true);
1459 
1460         assert!(recv.push(first).is_ok());
1461         assert_eq!(recv.len, 9);
1462         assert_eq!(recv.off, 0);
1463         assert_eq!(recv.data.len(), 1);
1464 
1465         let (len, fin) = recv.pop(&mut buf).unwrap();
1466         assert_eq!(len, 9);
1467         assert_eq!(fin, false);
1468         assert_eq!(&buf[..len], b"something");
1469         assert_eq!(recv.len, 9);
1470         assert_eq!(recv.off, 9);
1471 
1472         assert!(recv.push(second).is_ok());
1473         assert_eq!(recv.len, 9);
1474         assert_eq!(recv.off, 9);
1475         assert_eq!(recv.data.len(), 0);
1476 
1477         assert_eq!(recv.push(third), Err(Error::FinalSize));
1478 
1479         assert!(recv.push(fourth).is_ok());
1480         assert_eq!(recv.len, 9);
1481         assert_eq!(recv.off, 9);
1482         assert_eq!(recv.data.len(), 0);
1483 
1484         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1485     }
1486 
1487     #[test]
fully_overlapping_read()1488     fn fully_overlapping_read() {
1489         let mut recv = RecvBuf::new(std::u64::MAX);
1490         assert_eq!(recv.len, 0);
1491 
1492         let mut buf = [0; 32];
1493 
1494         let first = RangeBuf::from(b"something", 0, false);
1495         let second = RangeBuf::from(b"hello", 4, false);
1496 
1497         assert!(recv.push(first).is_ok());
1498         assert_eq!(recv.len, 9);
1499         assert_eq!(recv.off, 0);
1500         assert_eq!(recv.data.len(), 1);
1501 
1502         assert!(recv.push(second).is_ok());
1503         assert_eq!(recv.len, 9);
1504         assert_eq!(recv.off, 0);
1505         assert_eq!(recv.data.len(), 1);
1506 
1507         let (len, fin) = recv.pop(&mut buf).unwrap();
1508         assert_eq!(len, 9);
1509         assert_eq!(fin, false);
1510         assert_eq!(&buf[..len], b"something");
1511         assert_eq!(recv.len, 9);
1512         assert_eq!(recv.off, 9);
1513         assert_eq!(recv.data.len(), 0);
1514 
1515         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1516     }
1517 
1518     #[test]
fully_overlapping_read2()1519     fn fully_overlapping_read2() {
1520         let mut recv = RecvBuf::new(std::u64::MAX);
1521         assert_eq!(recv.len, 0);
1522 
1523         let mut buf = [0; 32];
1524 
1525         let first = RangeBuf::from(b"something", 0, false);
1526         let second = RangeBuf::from(b"hello", 4, false);
1527 
1528         assert!(recv.push(second).is_ok());
1529         assert_eq!(recv.len, 9);
1530         assert_eq!(recv.off, 0);
1531         assert_eq!(recv.data.len(), 1);
1532 
1533         assert!(recv.push(first).is_ok());
1534         assert_eq!(recv.len, 9);
1535         assert_eq!(recv.off, 0);
1536         assert_eq!(recv.data.len(), 2);
1537 
1538         let (len, fin) = recv.pop(&mut buf).unwrap();
1539         assert_eq!(len, 9);
1540         assert_eq!(fin, false);
1541         assert_eq!(&buf[..len], b"somehello");
1542         assert_eq!(recv.len, 9);
1543         assert_eq!(recv.off, 9);
1544         assert_eq!(recv.data.len(), 0);
1545 
1546         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1547     }
1548 
1549     #[test]
fully_overlapping_read3()1550     fn fully_overlapping_read3() {
1551         let mut recv = RecvBuf::new(std::u64::MAX);
1552         assert_eq!(recv.len, 0);
1553 
1554         let mut buf = [0; 32];
1555 
1556         let first = RangeBuf::from(b"something", 0, false);
1557         let second = RangeBuf::from(b"hello", 3, false);
1558 
1559         assert!(recv.push(second).is_ok());
1560         assert_eq!(recv.len, 8);
1561         assert_eq!(recv.off, 0);
1562         assert_eq!(recv.data.len(), 1);
1563 
1564         assert!(recv.push(first).is_ok());
1565         assert_eq!(recv.len, 9);
1566         assert_eq!(recv.off, 0);
1567         assert_eq!(recv.data.len(), 3);
1568 
1569         let (len, fin) = recv.pop(&mut buf).unwrap();
1570         assert_eq!(len, 9);
1571         assert_eq!(fin, false);
1572         assert_eq!(&buf[..len], b"somhellog");
1573         assert_eq!(recv.len, 9);
1574         assert_eq!(recv.off, 9);
1575         assert_eq!(recv.data.len(), 0);
1576 
1577         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1578     }
1579 
1580     #[test]
fully_overlapping_read_multi()1581     fn fully_overlapping_read_multi() {
1582         let mut recv = RecvBuf::new(std::u64::MAX);
1583         assert_eq!(recv.len, 0);
1584 
1585         let mut buf = [0; 32];
1586 
1587         let first = RangeBuf::from(b"somethingsomething", 0, false);
1588         let second = RangeBuf::from(b"hello", 3, false);
1589         let third = RangeBuf::from(b"hello", 12, false);
1590 
1591         assert!(recv.push(second).is_ok());
1592         assert_eq!(recv.len, 8);
1593         assert_eq!(recv.off, 0);
1594         assert_eq!(recv.data.len(), 1);
1595 
1596         assert!(recv.push(third).is_ok());
1597         assert_eq!(recv.len, 17);
1598         assert_eq!(recv.off, 0);
1599         assert_eq!(recv.data.len(), 2);
1600 
1601         assert!(recv.push(first).is_ok());
1602         assert_eq!(recv.len, 18);
1603         assert_eq!(recv.off, 0);
1604         assert_eq!(recv.data.len(), 5);
1605 
1606         let (len, fin) = recv.pop(&mut buf).unwrap();
1607         assert_eq!(len, 18);
1608         assert_eq!(fin, false);
1609         assert_eq!(&buf[..len], b"somhellogsomhellog");
1610         assert_eq!(recv.len, 18);
1611         assert_eq!(recv.off, 18);
1612         assert_eq!(recv.data.len(), 0);
1613 
1614         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1615     }
1616 
1617     #[test]
overlapping_start_read()1618     fn overlapping_start_read() {
1619         let mut recv = RecvBuf::new(std::u64::MAX);
1620         assert_eq!(recv.len, 0);
1621 
1622         let mut buf = [0; 32];
1623 
1624         let first = RangeBuf::from(b"something", 0, false);
1625         let second = RangeBuf::from(b"hello", 8, true);
1626 
1627         assert!(recv.push(first).is_ok());
1628         assert_eq!(recv.len, 9);
1629         assert_eq!(recv.off, 0);
1630         assert_eq!(recv.data.len(), 1);
1631 
1632         assert!(recv.push(second).is_ok());
1633         assert_eq!(recv.len, 13);
1634         assert_eq!(recv.off, 0);
1635         assert_eq!(recv.data.len(), 2);
1636 
1637         let (len, fin) = recv.pop(&mut buf).unwrap();
1638         assert_eq!(len, 13);
1639         assert_eq!(fin, true);
1640         assert_eq!(&buf[..len], b"somethingello");
1641         assert_eq!(recv.len, 13);
1642         assert_eq!(recv.off, 13);
1643 
1644         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1645     }
1646 
1647     #[test]
overlapping_end_read()1648     fn overlapping_end_read() {
1649         let mut recv = RecvBuf::new(std::u64::MAX);
1650         assert_eq!(recv.len, 0);
1651 
1652         let mut buf = [0; 32];
1653 
1654         let first = RangeBuf::from(b"hello", 0, false);
1655         let second = RangeBuf::from(b"something", 3, true);
1656 
1657         assert!(recv.push(second).is_ok());
1658         assert_eq!(recv.len, 12);
1659         assert_eq!(recv.off, 0);
1660         assert_eq!(recv.data.len(), 1);
1661 
1662         assert!(recv.push(first).is_ok());
1663         assert_eq!(recv.len, 12);
1664         assert_eq!(recv.off, 0);
1665         assert_eq!(recv.data.len(), 2);
1666 
1667         let (len, fin) = recv.pop(&mut buf).unwrap();
1668         assert_eq!(len, 12);
1669         assert_eq!(fin, true);
1670         assert_eq!(&buf[..len], b"helsomething");
1671         assert_eq!(recv.len, 12);
1672         assert_eq!(recv.off, 12);
1673 
1674         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1675     }
1676 
1677     #[test]
partially_multi_overlapping_reordered_read()1678     fn partially_multi_overlapping_reordered_read() {
1679         let mut recv = RecvBuf::new(std::u64::MAX);
1680         assert_eq!(recv.len, 0);
1681 
1682         let mut buf = [0; 32];
1683 
1684         let first = RangeBuf::from(b"hello", 8, false);
1685         let second = RangeBuf::from(b"something", 0, false);
1686         let third = RangeBuf::from(b"moar", 11, true);
1687 
1688         assert!(recv.push(first).is_ok());
1689         assert_eq!(recv.len, 13);
1690         assert_eq!(recv.off, 0);
1691         assert_eq!(recv.data.len(), 1);
1692 
1693         assert!(recv.push(second).is_ok());
1694         assert_eq!(recv.len, 13);
1695         assert_eq!(recv.off, 0);
1696         assert_eq!(recv.data.len(), 2);
1697 
1698         assert!(recv.push(third).is_ok());
1699         assert_eq!(recv.len, 15);
1700         assert_eq!(recv.off, 0);
1701         assert_eq!(recv.data.len(), 3);
1702 
1703         let (len, fin) = recv.pop(&mut buf).unwrap();
1704         assert_eq!(len, 15);
1705         assert_eq!(fin, true);
1706         assert_eq!(&buf[..len], b"somethinhelloar");
1707         assert_eq!(recv.len, 15);
1708         assert_eq!(recv.off, 15);
1709         assert_eq!(recv.data.len(), 0);
1710 
1711         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1712     }
1713 
1714     #[test]
partially_multi_overlapping_reordered_read2()1715     fn partially_multi_overlapping_reordered_read2() {
1716         let mut recv = RecvBuf::new(std::u64::MAX);
1717         assert_eq!(recv.len, 0);
1718 
1719         let mut buf = [0; 32];
1720 
1721         let first = RangeBuf::from(b"aaa", 0, false);
1722         let second = RangeBuf::from(b"bbb", 2, false);
1723         let third = RangeBuf::from(b"ccc", 4, false);
1724         let fourth = RangeBuf::from(b"ddd", 6, false);
1725         let fifth = RangeBuf::from(b"eee", 9, false);
1726         let sixth = RangeBuf::from(b"fff", 11, false);
1727 
1728         assert!(recv.push(second).is_ok());
1729         assert_eq!(recv.len, 5);
1730         assert_eq!(recv.off, 0);
1731         assert_eq!(recv.data.len(), 1);
1732 
1733         assert!(recv.push(fourth).is_ok());
1734         assert_eq!(recv.len, 9);
1735         assert_eq!(recv.off, 0);
1736         assert_eq!(recv.data.len(), 2);
1737 
1738         assert!(recv.push(third).is_ok());
1739         assert_eq!(recv.len, 9);
1740         assert_eq!(recv.off, 0);
1741         assert_eq!(recv.data.len(), 3);
1742 
1743         assert!(recv.push(first).is_ok());
1744         assert_eq!(recv.len, 9);
1745         assert_eq!(recv.off, 0);
1746         assert_eq!(recv.data.len(), 4);
1747 
1748         assert!(recv.push(sixth).is_ok());
1749         assert_eq!(recv.len, 14);
1750         assert_eq!(recv.off, 0);
1751         assert_eq!(recv.data.len(), 5);
1752 
1753         assert!(recv.push(fifth).is_ok());
1754         assert_eq!(recv.len, 14);
1755         assert_eq!(recv.off, 0);
1756         assert_eq!(recv.data.len(), 6);
1757 
1758         let (len, fin) = recv.pop(&mut buf).unwrap();
1759         assert_eq!(len, 14);
1760         assert_eq!(fin, false);
1761         assert_eq!(&buf[..len], b"aabbbcdddeefff");
1762         assert_eq!(recv.len, 14);
1763         assert_eq!(recv.off, 14);
1764         assert_eq!(recv.data.len(), 0);
1765 
1766         assert_eq!(recv.pop(&mut buf), Err(Error::Done));
1767     }
1768 
1769     #[test]
empty_write()1770     fn empty_write() {
1771         let mut send = SendBuf::new(std::u64::MAX);
1772         assert_eq!(send.len, 0);
1773 
1774         let write = send.pop(std::usize::MAX).unwrap();
1775         assert_eq!(write.len(), 0);
1776         assert_eq!(write.fin(), false);
1777     }
1778 
1779     #[test]
multi_write()1780     fn multi_write() {
1781         let mut send = SendBuf::new(std::u64::MAX);
1782         assert_eq!(send.len, 0);
1783 
1784         let first = b"something";
1785         let second = b"helloworld";
1786 
1787         assert!(send.push_slice(first, false).is_ok());
1788         assert_eq!(send.len, 9);
1789 
1790         assert!(send.push_slice(second, true).is_ok());
1791         assert_eq!(send.len, 19);
1792 
1793         let write = send.pop(128).unwrap();
1794         assert_eq!(write.len(), 19);
1795         assert_eq!(write.fin(), true);
1796         assert_eq!(&write[..], b"somethinghelloworld");
1797         assert_eq!(send.len, 0);
1798     }
1799 
1800     #[test]
split_write()1801     fn split_write() {
1802         let mut send = SendBuf::new(std::u64::MAX);
1803         assert_eq!(send.len, 0);
1804 
1805         let first = b"something";
1806         let second = b"helloworld";
1807 
1808         assert!(send.push_slice(first, false).is_ok());
1809         assert_eq!(send.len, 9);
1810 
1811         assert!(send.push_slice(second, true).is_ok());
1812         assert_eq!(send.len, 19);
1813 
1814         let write = send.pop(10).unwrap();
1815         assert_eq!(write.off(), 0);
1816         assert_eq!(write.len(), 10);
1817         assert_eq!(write.fin(), false);
1818         assert_eq!(&write[..], b"somethingh");
1819         assert_eq!(send.len, 9);
1820 
1821         let write = send.pop(5).unwrap();
1822         assert_eq!(write.off(), 10);
1823         assert_eq!(write.len(), 5);
1824         assert_eq!(write.fin(), false);
1825         assert_eq!(&write[..], b"ellow");
1826         assert_eq!(send.len, 4);
1827 
1828         let write = send.pop(10).unwrap();
1829         assert_eq!(write.off(), 15);
1830         assert_eq!(write.len(), 4);
1831         assert_eq!(write.fin(), true);
1832         assert_eq!(&write[..], b"orld");
1833         assert_eq!(send.len, 0);
1834     }
1835 
    #[test]
    fn resend() {
        // Checks that chunks previously popped from a SendBuf can be pushed
        // back (presumably modelling retransmission -- confirm against
        // callers) and are then handed out again ahead of unsent data.
        let mut send = SendBuf::new(std::u64::MAX);
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 0);

        let first = b"something";
        let second = b"helloworld";

        // Queue 19 bytes in total; the second write carries fin.
        assert!(send.push_slice(first, false).is_ok());
        assert_eq!(send.off_front(), 0);

        assert!(send.push_slice(second, true).is_ok());
        assert_eq!(send.off_front(), 0);

        // Pop three chunks covering offsets 0..4, 4..9 and 9..14; off_front()
        // advances with every pop.
        let write1 = send.pop(4).unwrap();
        assert_eq!(write1.off(), 0);
        assert_eq!(write1.len(), 4);
        assert_eq!(write1.fin(), false);
        assert_eq!(&write1[..], b"some");
        assert_eq!(send.len, 15);
        assert_eq!(send.off_front(), 4);

        let write2 = send.pop(5).unwrap();
        assert_eq!(write2.off(), 4);
        assert_eq!(write2.len(), 5);
        assert_eq!(write2.fin(), false);
        assert_eq!(&write2[..], b"thing");
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 9);

        let write3 = send.pop(5).unwrap();
        assert_eq!(write3.off(), 9);
        assert_eq!(write3.len(), 5);
        assert_eq!(write3.fin(), false);
        assert_eq!(&write3[..], b"hello");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        // Push the first two chunks back in reverse order; off_front() moves
        // back to the lowest re-queued offset. write3 is never pushed back.
        send.push(write2).unwrap();
        assert_eq!(send.len, 10);
        assert_eq!(send.off_front(), 4);

        send.push(write1).unwrap();
        assert_eq!(send.len, 14);
        assert_eq!(send.off_front(), 0);

        // Although 11 bytes are requested, only the contiguous re-queued
        // range 0..9 is returned; offsets 9..14 (write3) are skipped.
        let write4 = send.pop(11).unwrap();
        assert_eq!(write4.off(), 0);
        assert_eq!(write4.len(), 9);
        assert_eq!(write4.fin(), false);
        assert_eq!(&write4[..], b"something");
        assert_eq!(send.len, 5);
        assert_eq!(send.off_front(), 14);

        // The remaining unsent tail is delivered last and carries fin.
        let write5 = send.pop(11).unwrap();
        assert_eq!(write5.off(), 14);
        assert_eq!(write5.len(), 5);
        assert_eq!(write5.fin(), true);
        assert_eq!(&write5[..], b"world");
        assert_eq!(send.len, 0);
        assert_eq!(send.off_front(), 19);
    }
1899 
    #[test]
    fn write_blocked_by_off() {
        // Checks that push_slice() accepts only as many bytes as the current
        // max data limit allows, and that raising the limit with
        // update_max_data() lets the remainder be written.
        let mut send = SendBuf::default();
        assert_eq!(send.len, 0);

        let first = b"something";
        let second = b"helloworld";

        // A default SendBuf accepts nothing (presumably its limit is 0 --
        // confirm against SendBuf::default()).
        assert_eq!(send.push_slice(first, false), Ok(0));
        assert_eq!(send.len, 0);

        assert_eq!(send.push_slice(second, true), Ok(0));
        assert_eq!(send.len, 0);

        send.update_max_data(5);

        // With a limit of 5, only the first five bytes are accepted and any
        // further write is refused.
        assert_eq!(send.push_slice(first, false), Ok(5));
        assert_eq!(send.len, 5);

        assert_eq!(send.push_slice(second, true), Ok(0));
        assert_eq!(send.len, 5);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 0);
        assert_eq!(write.len(), 5);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"somet");
        assert_eq!(send.len, 0);

        // Nothing more is buffered: the next pop is empty at offset 5 and
        // does not report fin (the earlier fin write accepted zero bytes).
        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 5);
        assert_eq!(write.len(), 0);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"");
        assert_eq!(send.len, 0);

        send.update_max_data(15);

        // Ten more bytes fit now: the rest of `first` plus the first six
        // bytes of `second`.
        assert_eq!(send.push_slice(&first[5..], false), Ok(4));
        assert_eq!(send.len, 4);

        assert_eq!(send.push_slice(second, true), Ok(6));
        assert_eq!(send.len, 10);

        // The fin write was only partially accepted, so fin is still unset.
        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 5);
        assert_eq!(write.len(), 10);
        assert_eq!(write.fin(), false);
        assert_eq!(&write[..], b"hinghellow");
        assert_eq!(send.len, 0);

        send.update_max_data(25);

        // The remaining four bytes fit entirely, so this chunk carries fin.
        assert_eq!(send.push_slice(&second[6..], true), Ok(4));
        assert_eq!(send.len, 4);

        let write = send.pop(10).unwrap();
        assert_eq!(write.off(), 15);
        assert_eq!(write.len(), 4);
        assert_eq!(write.fin(), true);
        assert_eq!(&write[..], b"orld");
        assert_eq!(send.len, 0);
    }
1963 
1964     #[test]
zero_len_write()1965     fn zero_len_write() {
1966         let mut send = SendBuf::new(std::u64::MAX);
1967         assert_eq!(send.len, 0);
1968 
1969         let first = b"something";
1970 
1971         assert!(send.push_slice(first, false).is_ok());
1972         assert_eq!(send.len, 9);
1973 
1974         assert!(send.push_slice(&[], true).is_ok());
1975         assert_eq!(send.len, 9);
1976 
1977         let write = send.pop(10).unwrap();
1978         assert_eq!(write.off(), 0);
1979         assert_eq!(write.len(), 9);
1980         assert_eq!(write.fin(), true);
1981         assert_eq!(&write[..], b"something");
1982         assert_eq!(send.len, 0);
1983     }
1984 
1985     #[test]
recv_flow_control()1986     fn recv_flow_control() {
1987         let mut stream = Stream::new(15, 0, true, true);
1988         assert!(!stream.recv.almost_full());
1989 
1990         let mut buf = [0; 32];
1991 
1992         let first = RangeBuf::from(b"hello", 0, false);
1993         let second = RangeBuf::from(b"world", 5, false);
1994         let third = RangeBuf::from(b"something", 10, false);
1995 
1996         assert_eq!(stream.recv.push(second), Ok(()));
1997         assert_eq!(stream.recv.push(first), Ok(()));
1998         assert!(!stream.recv.almost_full());
1999 
2000         assert_eq!(stream.recv.push(third), Err(Error::FlowControl));
2001 
2002         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2003         assert_eq!(&buf[..len], b"helloworld");
2004         assert_eq!(fin, false);
2005 
2006         assert!(stream.recv.almost_full());
2007 
2008         stream.recv.update_max_data();
2009         assert_eq!(stream.recv.max_data_next(), 25);
2010         assert!(!stream.recv.almost_full());
2011 
2012         let third = RangeBuf::from(b"something", 10, false);
2013         assert_eq!(stream.recv.push(third), Ok(()));
2014     }
2015 
2016     #[test]
recv_past_fin()2017     fn recv_past_fin() {
2018         let mut stream = Stream::new(15, 0, true, true);
2019         assert!(!stream.recv.almost_full());
2020 
2021         let first = RangeBuf::from(b"hello", 0, true);
2022         let second = RangeBuf::from(b"world", 5, false);
2023 
2024         assert_eq!(stream.recv.push(first), Ok(()));
2025         assert_eq!(stream.recv.push(second), Err(Error::FinalSize));
2026     }
2027 
2028     #[test]
recv_fin_dup()2029     fn recv_fin_dup() {
2030         let mut stream = Stream::new(15, 0, true, true);
2031         assert!(!stream.recv.almost_full());
2032 
2033         let first = RangeBuf::from(b"hello", 0, true);
2034         let second = RangeBuf::from(b"hello", 0, true);
2035 
2036         assert_eq!(stream.recv.push(first), Ok(()));
2037         assert_eq!(stream.recv.push(second), Ok(()));
2038 
2039         let mut buf = [0; 32];
2040 
2041         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2042         assert_eq!(&buf[..len], b"hello");
2043         assert_eq!(fin, true);
2044     }
2045 
2046     #[test]
recv_fin_change()2047     fn recv_fin_change() {
2048         let mut stream = Stream::new(15, 0, true, true);
2049         assert!(!stream.recv.almost_full());
2050 
2051         let first = RangeBuf::from(b"hello", 0, true);
2052         let second = RangeBuf::from(b"world", 5, true);
2053 
2054         assert_eq!(stream.recv.push(second), Ok(()));
2055         assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
2056     }
2057 
2058     #[test]
recv_fin_lower_than_received()2059     fn recv_fin_lower_than_received() {
2060         let mut stream = Stream::new(15, 0, true, true);
2061         assert!(!stream.recv.almost_full());
2062 
2063         let first = RangeBuf::from(b"hello", 0, true);
2064         let second = RangeBuf::from(b"world", 5, false);
2065 
2066         assert_eq!(stream.recv.push(second), Ok(()));
2067         assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
2068     }
2069 
2070     #[test]
recv_fin_flow_control()2071     fn recv_fin_flow_control() {
2072         let mut stream = Stream::new(15, 0, true, true);
2073         assert!(!stream.recv.almost_full());
2074 
2075         let mut buf = [0; 32];
2076 
2077         let first = RangeBuf::from(b"hello", 0, false);
2078         let second = RangeBuf::from(b"world", 5, true);
2079 
2080         assert_eq!(stream.recv.push(first), Ok(()));
2081         assert_eq!(stream.recv.push(second), Ok(()));
2082 
2083         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2084         assert_eq!(&buf[..len], b"helloworld");
2085         assert_eq!(fin, true);
2086 
2087         assert!(!stream.recv.almost_full());
2088     }
2089 
2090     #[test]
recv_fin_reset_mismatch()2091     fn recv_fin_reset_mismatch() {
2092         let mut stream = Stream::new(15, 0, true, true);
2093         assert!(!stream.recv.almost_full());
2094 
2095         let first = RangeBuf::from(b"hello", 0, true);
2096 
2097         assert_eq!(stream.recv.push(first), Ok(()));
2098         assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
2099     }
2100 
2101     #[test]
recv_reset_dup()2102     fn recv_reset_dup() {
2103         let mut stream = Stream::new(15, 0, true, true);
2104         assert!(!stream.recv.almost_full());
2105 
2106         let first = RangeBuf::from(b"hello", 0, false);
2107 
2108         assert_eq!(stream.recv.push(first), Ok(()));
2109         assert_eq!(stream.recv.reset(5), Ok(0));
2110         assert_eq!(stream.recv.reset(5), Ok(0));
2111     }
2112 
2113     #[test]
recv_reset_change()2114     fn recv_reset_change() {
2115         let mut stream = Stream::new(15, 0, true, true);
2116         assert!(!stream.recv.almost_full());
2117 
2118         let first = RangeBuf::from(b"hello", 0, false);
2119 
2120         assert_eq!(stream.recv.push(first), Ok(()));
2121         assert_eq!(stream.recv.reset(5), Ok(0));
2122         assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
2123     }
2124 
2125     #[test]
recv_reset_lower_than_received()2126     fn recv_reset_lower_than_received() {
2127         let mut stream = Stream::new(15, 0, true, true);
2128         assert!(!stream.recv.almost_full());
2129 
2130         let first = RangeBuf::from(b"hello", 0, false);
2131 
2132         assert_eq!(stream.recv.push(first), Ok(()));
2133         assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
2134     }
2135 
2136     #[test]
send_flow_control()2137     fn send_flow_control() {
2138         let mut stream = Stream::new(0, 15, true, true);
2139 
2140         let first = b"hello";
2141         let second = b"world";
2142         let third = b"something";
2143 
2144         assert!(stream.send.push_slice(first, false).is_ok());
2145         assert!(stream.send.push_slice(second, false).is_ok());
2146         assert!(stream.send.push_slice(third, false).is_ok());
2147 
2148         let write = stream.send.pop(25).unwrap();
2149         assert_eq!(write.off(), 0);
2150         assert_eq!(write.len(), 15);
2151         assert_eq!(write.fin(), false);
2152         assert_eq!(write.data, b"helloworldsomet");
2153 
2154         let write = stream.send.pop(25).unwrap();
2155         assert_eq!(write.off(), 15);
2156         assert_eq!(write.len(), 0);
2157         assert_eq!(write.fin(), false);
2158         assert_eq!(write.data, b"");
2159 
2160         let first = RangeBuf::from(b"helloworldsomet", 0, false);
2161         assert_eq!(stream.send.push(first), Ok(()));
2162 
2163         let write = stream.send.pop(10).unwrap();
2164         assert_eq!(write.off(), 0);
2165         assert_eq!(write.len(), 10);
2166         assert_eq!(write.fin(), false);
2167         assert_eq!(write.data, b"helloworld");
2168 
2169         let write = stream.send.pop(10).unwrap();
2170         assert_eq!(write.off(), 10);
2171         assert_eq!(write.len(), 5);
2172         assert_eq!(write.fin(), false);
2173         assert_eq!(write.data, b"somet");
2174     }
2175 
2176     #[test]
send_past_fin()2177     fn send_past_fin() {
2178         let mut stream = Stream::new(0, 15, true, true);
2179 
2180         let first = b"hello";
2181         let second = b"world";
2182         let third = b"third";
2183 
2184         assert_eq!(stream.send.push_slice(first, false), Ok(5));
2185 
2186         assert_eq!(stream.send.push_slice(second, true), Ok(5));
2187         assert!(stream.send.is_fin());
2188 
2189         assert_eq!(stream.send.push_slice(third, false), Err(Error::FinalSize));
2190     }
2191 
2192     #[test]
send_fin_dup()2193     fn send_fin_dup() {
2194         let mut stream = Stream::new(0, 15, true, true);
2195 
2196         let first = RangeBuf::from(b"hello", 0, true);
2197         let second = RangeBuf::from(b"hello", 0, true);
2198 
2199         assert_eq!(stream.send.push(first), Ok(()));
2200         assert_eq!(stream.send.push(second), Ok(()));
2201     }
2202 
2203     #[test]
send_undo_fin()2204     fn send_undo_fin() {
2205         let mut stream = Stream::new(0, 15, true, true);
2206 
2207         let first = b"hello";
2208         let second = RangeBuf::from(b"hello", 0, false);
2209 
2210         assert_eq!(stream.send.push_slice(first, true), Ok(5));
2211         assert!(stream.send.is_fin());
2212 
2213         assert_eq!(stream.send.push(second), Err(Error::FinalSize));
2214     }
2215 
2216     #[test]
send_fin_max_data_match()2217     fn send_fin_max_data_match() {
2218         let mut stream = Stream::new(0, 15, true, true);
2219 
2220         let slice = b"hellohellohello";
2221 
2222         assert!(stream.send.push_slice(slice, true).is_ok());
2223 
2224         let write = stream.send.pop(15).unwrap();
2225         assert_eq!(write.off(), 0);
2226         assert_eq!(write.len(), 15);
2227         assert_eq!(write.fin(), true);
2228         assert_eq!(write.data, slice);
2229     }
2230 
2231     #[test]
send_fin_zero_length()2232     fn send_fin_zero_length() {
2233         let mut stream = Stream::new(0, 15, true, true);
2234 
2235         assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2236         assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2237         assert!(stream.send.is_fin());
2238 
2239         let write = stream.send.pop(5).unwrap();
2240         assert_eq!(write.off(), 0);
2241         assert_eq!(write.len(), 5);
2242         assert_eq!(write.fin(), true);
2243         assert_eq!(write.data, b"hello");
2244     }
2245 
2246     #[test]
send_ack()2247     fn send_ack() {
2248         let mut stream = Stream::new(0, 15, true, true);
2249 
2250         assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2251         assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
2252         assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2253         assert!(stream.send.is_fin());
2254 
2255         let write = stream.send.pop(5).unwrap();
2256         assert_eq!(write.off(), 0);
2257         assert_eq!(write.len(), 5);
2258         assert_eq!(write.fin(), false);
2259         assert_eq!(write.data, b"hello");
2260 
2261         stream.send.ack(write.off(), write.len());
2262 
2263         assert_eq!(stream.send.push(write), Ok(()));
2264 
2265         let write = stream.send.pop(5).unwrap();
2266         assert_eq!(write.off(), 5);
2267         assert_eq!(write.len(), 5);
2268         assert_eq!(write.fin(), true);
2269         assert_eq!(write.data, b"world");
2270     }
2271 
2272     #[test]
send_ack_reordering()2273     fn send_ack_reordering() {
2274         let mut stream = Stream::new(0, 15, true, true);
2275 
2276         assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2277         assert_eq!(stream.send.push_slice(b"world", false), Ok(5));
2278         assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2279         assert!(stream.send.is_fin());
2280 
2281         let write1 = stream.send.pop(5).unwrap();
2282         assert_eq!(write1.off(), 0);
2283         assert_eq!(write1.len(), 5);
2284         assert_eq!(write1.fin(), false);
2285         assert_eq!(write1.data, b"hello");
2286 
2287         let write2 = stream.send.pop(1).unwrap();
2288         assert_eq!(write2.off(), 5);
2289         assert_eq!(write2.len(), 1);
2290         assert_eq!(write2.fin(), false);
2291         assert_eq!(write2.data, b"w");
2292 
2293         stream.send.ack(write2.off(), write2.len());
2294         stream.send.ack(write1.off(), write1.len());
2295 
2296         assert_eq!(stream.send.push(write1), Ok(()));
2297         assert_eq!(stream.send.push(write2), Ok(()));
2298 
2299         let write = stream.send.pop(5).unwrap();
2300         assert_eq!(write.off(), 6);
2301         assert_eq!(write.len(), 4);
2302         assert_eq!(write.fin(), true);
2303         assert_eq!(write.data, b"orld");
2304     }
2305 
2306     #[test]
recv_data_below_off()2307     fn recv_data_below_off() {
2308         let mut stream = Stream::new(15, 0, true, true);
2309 
2310         let first = RangeBuf::from(b"hello", 0, false);
2311 
2312         assert_eq!(stream.recv.push(first), Ok(()));
2313 
2314         let mut buf = [0; 10];
2315 
2316         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2317         assert_eq!(&buf[..len], b"hello");
2318         assert_eq!(fin, false);
2319 
2320         let first = RangeBuf::from(b"elloworld", 1, true);
2321         assert_eq!(stream.recv.push(first), Ok(()));
2322 
2323         let (len, fin) = stream.recv.pop(&mut buf).unwrap();
2324         assert_eq!(&buf[..len], b"world");
2325         assert_eq!(fin, true);
2326     }
2327 
    #[test]
    fn stream_complete() {
        // Walks a bidirectional stream through to completion: the send side
        // becomes complete once every written byte has been acked, the recv
        // side reports fin once all data up to the fin has been read, and
        // Stream::is_complete() is true only after both have happened.
        let mut stream = Stream::new(30, 30, true, true);

        assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
        assert_eq!(stream.send.push_slice(b"world", false), Ok(5));

        assert!(!stream.send.is_complete());
        assert!(!stream.send.is_fin());

        // An empty write with fin marks the send side as finished, but it is
        // not complete yet.
        assert_eq!(stream.send.push_slice(b"", true), Ok(0));

        assert!(!stream.send.is_complete());
        assert!(stream.send.is_fin());

        // Receiving a fin chunk is not enough for recv.is_fin(); the data
        // has not been read yet.
        let buf = RangeBuf::from(b"hello", 0, true);
        assert!(stream.recv.push(buf).is_ok());
        assert!(!stream.recv.is_fin());

        // ack(off, len): acks arrive out of order, starting with the tail
        // range 6..10.
        stream.send.ack(6, 4);
        assert!(!stream.send.is_complete());

        // A partial read (2 of 5 bytes) leaves the recv side unfinished.
        let mut buf = [0; 2];
        assert_eq!(stream.recv.pop(&mut buf), Ok((2, false)));
        assert!(!stream.recv.is_fin());

        // Range 1..6 is acked; offset 0 is still outstanding.
        stream.send.ack(1, 5);
        assert!(!stream.send.is_complete());

        // Acking the first byte completes the send side.
        stream.send.ack(0, 1);
        assert!(stream.send.is_complete());

        // The stream as a whole is not complete while recv data is unread.
        assert!(!stream.is_complete());

        // Reading the remaining 3 bytes returns fin and finishes the recv
        // side.
        let mut buf = [0; 3];
        assert_eq!(stream.recv.pop(&mut buf), Ok((3, true)));
        assert!(stream.recv.is_fin());

        assert!(stream.is_complete());
    }
2368 
2369     #[test]
send_fin_zero_length_output()2370     fn send_fin_zero_length_output() {
2371         let mut stream = Stream::new(0, 15, true, true);
2372 
2373         assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));
2374         assert!(!stream.send.is_fin());
2375 
2376         let write = stream.send.pop(5).unwrap();
2377         assert_eq!(write.off(), 0);
2378         assert_eq!(write.len(), 5);
2379         assert_eq!(write.fin(), false);
2380         assert_eq!(write.data, b"hello");
2381 
2382         assert_eq!(stream.send.push_slice(b"", true), Ok(0));
2383         assert!(stream.send.is_fin());
2384 
2385         let write = stream.send.pop(5).unwrap();
2386         assert_eq!(write.off(), 5);
2387         assert_eq!(write.len(), 0);
2388         assert_eq!(write.fin(), true);
2389         assert_eq!(write.data, b"");
2390     }
2391 }
2392