1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
//! 🥧 Savoury implementation of the QUIC transport protocol and HTTP/3.
28 //!
29 //! [quiche] is an implementation of the QUIC transport protocol and HTTP/3 as
30 //! specified by the [IETF]. It provides a low level API for processing QUIC
31 //! packets and handling connection state. The application is responsible for
32 //! providing I/O (e.g. sockets handling) as well as an event loop with support
33 //! for timers.
34 //!
35 //! [quiche]: https://github.com/cloudflare/quiche/
36 //! [ietf]: https://quicwg.org/
37 //!
38 //! ## Connection setup
39 //!
40 //! The first step in establishing a QUIC connection using quiche is creating a
41 //! configuration object:
42 //!
43 //! ```
44 //! let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
45 //! # Ok::<(), quiche::Error>(())
46 //! ```
47 //!
48 //! This is shared among multiple connections and can be used to configure a
49 //! QUIC endpoint.
50 //!
51 //! On the client-side the [`connect()`] utility function can be used to create
52 //! a new connection, while [`accept()`] is for servers:
53 //!
54 //! ```
55 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
56 //! # let server_name = "quic.tech";
57 //! # let scid = [0xba; 16];
58 //! // Client connection.
59 //! let conn = quiche::connect(Some(&server_name), &scid, &mut config)?;
60 //!
61 //! // Server connection.
62 //! let conn = quiche::accept(&scid, None, &mut config)?;
63 //! # Ok::<(), quiche::Error>(())
64 //! ```
65 //!
66 //! ## Handling incoming packets
67 //!
68 //! Using the connection's [`recv()`] method the application can process
69 //! incoming packets that belong to that connection from the network:
70 //!
71 //! ```no_run
72 //! # let mut buf = [0; 512];
73 //! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
74 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
75 //! # let scid = [0xba; 16];
76 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
77 //! loop {
78 //!     let read = socket.recv(&mut buf).unwrap();
79 //!
80 //!     let read = match conn.recv(&mut buf[..read]) {
81 //!         Ok(v) => v,
82 //!
83 //!         Err(quiche::Error::Done) => {
84 //!             // Done reading.
85 //!             break;
86 //!         },
87 //!
88 //!         Err(e) => {
89 //!             // An error occurred, handle it.
90 //!             break;
91 //!         },
92 //!     };
93 //! }
94 //! # Ok::<(), quiche::Error>(())
95 //! ```
96 //!
97 //! ## Generating outgoing packets
98 //!
//! Outgoing packets are generated using the connection's [`send()`] method
//! instead:
101 //!
102 //! ```no_run
103 //! # let mut out = [0; 512];
104 //! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
105 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
106 //! # let scid = [0xba; 16];
107 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
108 //! loop {
109 //!     let write = match conn.send(&mut out) {
110 //!         Ok(v) => v,
111 //!
112 //!         Err(quiche::Error::Done) => {
113 //!             // Done writing.
114 //!             break;
115 //!         },
116 //!
117 //!         Err(e) => {
118 //!             // An error occurred, handle it.
119 //!             break;
120 //!         },
121 //!     };
122 //!
123 //!     socket.send(&out[..write]).unwrap();
124 //! }
125 //! # Ok::<(), quiche::Error>(())
126 //! ```
127 //!
128 //! When packets are sent, the application is responsible for maintaining a
129 //! timer to react to time-based connection events. The timer expiration can be
130 //! obtained using the connection's [`timeout()`] method.
131 //!
132 //! ```
133 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
134 //! # let scid = [0xba; 16];
135 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
136 //! let timeout = conn.timeout();
137 //! # Ok::<(), quiche::Error>(())
138 //! ```
139 //!
140 //! The application is responsible for providing a timer implementation, which
141 //! can be specific to the operating system or networking framework used. When
142 //! a timer expires, the connection's [`on_timeout()`] method should be called,
143 //! after which additional packets might need to be sent on the network:
144 //!
145 //! ```no_run
146 //! # let mut out = [0; 512];
147 //! # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
148 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
149 //! # let scid = [0xba; 16];
150 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
151 //! // Timeout expired, handle it.
152 //! conn.on_timeout();
153 //!
154 //! // Send more packets as needed after timeout.
155 //! loop {
156 //!     let write = match conn.send(&mut out) {
157 //!         Ok(v) => v,
158 //!
159 //!         Err(quiche::Error::Done) => {
160 //!             // Done writing.
161 //!             break;
162 //!         },
163 //!
164 //!         Err(e) => {
165 //!             // An error occurred, handle it.
166 //!             break;
167 //!         },
168 //!     };
169 //!
170 //!     socket.send(&out[..write]).unwrap();
171 //! }
172 //! # Ok::<(), quiche::Error>(())
173 //! ```
174 //!
175 //! ## Sending and receiving stream data
176 //!
177 //! After some back and forth, the connection will complete its handshake and
178 //! will be ready for sending or receiving application data.
179 //!
180 //! Data can be sent on a stream by using the [`stream_send()`] method:
181 //!
182 //! ```no_run
183 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
184 //! # let scid = [0xba; 16];
185 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
186 //! if conn.is_established() {
187 //!     // Handshake completed, send some data on stream 0.
188 //!     conn.stream_send(0, b"hello", true)?;
189 //! }
190 //! # Ok::<(), quiche::Error>(())
191 //! ```
192 //!
193 //! The application can check whether there are any readable streams by using
194 //! the connection's [`readable()`] method, which returns an iterator over all
195 //! the streams that have outstanding data to read.
196 //!
197 //! The [`stream_recv()`] method can then be used to retrieve the application
198 //! data from the readable stream:
199 //!
200 //! ```no_run
201 //! # let mut buf = [0; 512];
202 //! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
203 //! # let scid = [0xba; 16];
204 //! # let mut conn = quiche::accept(&scid, None, &mut config)?;
205 //! if conn.is_established() {
206 //!     // Iterate over readable streams.
207 //!     for stream_id in conn.readable() {
208 //!         // Stream is readable, read until there's no more data.
209 //!         while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
210 //!             println!("Got {} bytes on stream {}", read, stream_id);
211 //!         }
212 //!     }
213 //! }
214 //! # Ok::<(), quiche::Error>(())
215 //! ```
216 //!
217 //! ## HTTP/3
218 //!
219 //! The quiche [HTTP/3 module] provides a high level API for sending and
220 //! receiving HTTP requests and responses on top of the QUIC transport protocol.
221 //!
222 //! [`connect()`]: fn.connect.html
223 //! [`accept()`]: fn.accept.html
224 //! [`recv()`]: struct.Connection.html#method.recv
225 //! [`send()`]: struct.Connection.html#method.send
226 //! [`timeout()`]: struct.Connection.html#method.timeout
227 //! [`on_timeout()`]: struct.Connection.html#method.on_timeout
228 //! [`stream_send()`]: struct.Connection.html#method.stream_send
229 //! [`readable()`]: struct.Connection.html#method.readable
230 //! [`stream_recv()`]: struct.Connection.html#method.stream_recv
231 //! [HTTP/3 module]: h3/index.html
232 //!
233 //! ## Congestion Control
234 //!
235 //! The quiche library provides a high-level API for configuring which
236 //! congestion control algorithm to use throughout the QUIC connection.
237 //!
238 //! When a QUIC connection is created, the application can optionally choose
239 //! which CC algorithm to use. See [`CongestionControlAlgorithm`] for currently
240 //! available congestion control algorithms.
241 //!
242 //! For example:
243 //!
244 //! ```
245 //! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
246 //! config.set_cc_algorithm(quiche::CongestionControlAlgorithm::Reno);
247 //! ```
248 //!
249 //! Alternatively, you can configure the congestion control algorithm to use
250 //! by its name.
251 //!
252 //! ```
253 //! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
254 //! config.set_cc_algorithm_name("reno").unwrap();
255 //! ```
256 //!
257 //! Note that the CC algorithm should be configured before calling [`connect()`]
258 //! or [`accept()`]. Otherwise the connection will use a default CC algorithm.
259 //!
260 //! [`CongestionControlAlgorithm`]: enum.CongestionControlAlgorithm.html
261 
262 #![allow(improper_ctypes)]
263 #![warn(missing_docs)]
264 
265 #[macro_use]
266 extern crate log;
267 
268 use std::cmp;
269 use std::time;
270 
271 use std::pin::Pin;
272 use std::str::FromStr;
273 
/// Wire version number of QUIC draft-27.
///
/// Older drafts such as this one might not be fully supported.
const PROTOCOL_VERSION_DRAFT27: u32 = 0xff00_001b;

/// Wire version number of QUIC draft-28.
///
/// Older drafts such as this one might not be fully supported.
const PROTOCOL_VERSION_DRAFT28: u32 = 0xff00_001c;

/// Wire version number of QUIC draft-29.
const PROTOCOL_VERSION_DRAFT29: u32 = 0xff00_001d;

/// The current QUIC wire version (presently draft-29).
pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_DRAFT29;
283 
284 /// The maximum length of a connection ID.
285 pub const MAX_CONN_ID_LEN: usize = crate::packet::MAX_CID_LEN as usize;
286 
/// The minimum length of Initial packets sent by a client.
pub const MIN_CLIENT_INITIAL_LEN: usize = 1_200;

// Minimum payload length in regular (non-fuzzing) builds.
#[cfg(not(feature = "fuzzing"))]
const PAYLOAD_MIN_LEN: usize = 4;

// Fuzzing builds use a zero-length AEAD tag instead of the usual 16-byte
// one, so the minimum payload length is raised to make up for the missing
// tag bytes.
#[cfg(feature = "fuzzing")]
const PAYLOAD_MIN_LEN: usize = 20;

// NOTE(review): presumably the multiple of received bytes that may be sent
// before the peer's address is verified -- confirm at the use site.
const MAX_AMPLIFICATION_FACTOR: usize = 3;

// Cap on how many packet number ranges awaiting acknowledgement are tracked.
//
// Roughly the number of ack blocks that fit in a typical packet.
const MAX_ACK_RANGES: usize = 68;

// Largest stream ID that is allowed.
const MAX_STREAM_ID: u64 = 1_u64 << 60;

// Length DATAGRAM queues get unless the application configures one.
const DEFAULT_MAX_DGRAM_QUEUE_LEN: usize = 0;

// The DATAGRAM standard recommends either no limit or 65536 as the maximum
// DATAGRAM frame size. The recommended value is enforced for forward
// compatibility.
const MAX_DGRAM_FRAME_SIZE: u64 = 65_536;
315 
316 /// A specialized [`Result`] type for quiche operations.
317 ///
318 /// This type is used throughout quiche's public API for any operation that
319 /// can produce an error.
320 ///
321 /// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
322 pub type Result<T> = std::result::Result<T, Error>;
323 
/// A QUIC error.
///
/// The enum is `#[repr(C)]` and every variant has an explicit negative
/// discriminant. `Error::to_c()` returns that discriminant unchanged, so the
/// numeric values must stay stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub enum Error {
    /// There is no more work to do.
    Done               = -1,

    /// The provided buffer is too short.
    BufferTooShort     = -2,

    /// The provided packet cannot be parsed because its version is unknown.
    UnknownVersion     = -3,

    /// The provided packet cannot be parsed because it contains an invalid
    /// frame.
    InvalidFrame       = -4,

    /// The provided packet cannot be parsed.
    InvalidPacket      = -5,

    /// The operation cannot be completed because the connection is in an
    /// invalid state.
    InvalidState       = -6,

    /// The operation cannot be completed because the stream is in an
    /// invalid state.
    InvalidStreamState = -7,

    /// The peer's transport params cannot be parsed.
    InvalidTransportParam = -8,

    /// A cryptographic operation failed.
    CryptoFail         = -9,

    /// The TLS handshake failed.
    TlsFail            = -10,

    /// The peer violated the local flow control limits.
    FlowControl        = -11,

    /// The peer violated the local stream limits.
    StreamLimit        = -12,

    /// The received data exceeds the stream's final size.
    FinalSize          = -13,

    /// Error in congestion control.
    CongestionControl  = -14,
}
373 
374 impl Error {
to_wire(self) -> u64375     fn to_wire(self) -> u64 {
376         match self {
377             Error::Done => 0x0,
378             Error::InvalidFrame => 0x7,
379             Error::InvalidStreamState => 0x5,
380             Error::InvalidTransportParam => 0x8,
381             Error::FlowControl => 0x3,
382             Error::StreamLimit => 0x4,
383             Error::FinalSize => 0x6,
384             _ => 0xa,
385         }
386     }
387 
to_c(self) -> libc::ssize_t388     fn to_c(self) -> libc::ssize_t {
389         self as _
390     }
391 }
392 
393 impl std::fmt::Display for Error {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result394     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
395         write!(f, "{:?}", self)
396     }
397 }
398 
399 impl std::error::Error for Error {
source(&self) -> Option<&(dyn std::error::Error + 'static)>400     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
401         None
402     }
403 }
404 
405 impl std::convert::From<octets::BufferTooShortError> for Error {
from(_err: octets::BufferTooShortError) -> Self406     fn from(_err: octets::BufferTooShortError) -> Self {
407         Error::BufferTooShort
408     }
409 }
410 
/// The stream's side to shutdown.
///
/// This should be used when calling [`stream_shutdown()`].
///
/// The enum is `#[repr(C)]` with explicit discriminants, so the numeric
/// values must stay stable.
///
/// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Shutdown {
    /// Stop receiving stream data.
    Read  = 0,

    /// Stop sending stream data.
    Write = 1,
}
424 
425 /// Stores configuration shared between multiple connections.
426 pub struct Config {
427     local_transport_params: TransportParams,
428 
429     version: u32,
430 
431     tls_ctx: tls::Context,
432 
433     application_protos: Vec<Vec<u8>>,
434 
435     grease: bool,
436 
437     cc_algorithm: CongestionControlAlgorithm,
438 
439     hystart: bool,
440 
441     dgram_recv_max_queue_len: usize,
442     dgram_send_max_queue_len: usize,
443 }
444 
445 impl Config {
446     /// Creates a config object with the given version.
447     ///
448     /// ## Examples:
449     ///
450     /// ```
451     /// let config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
452     /// # Ok::<(), quiche::Error>(())
453     /// ```
new(version: u32) -> Result<Config>454     pub fn new(version: u32) -> Result<Config> {
455         let tls_ctx = tls::Context::new()?;
456 
457         Ok(Config {
458             local_transport_params: TransportParams::default(),
459             version,
460             tls_ctx,
461             application_protos: Vec::new(),
462             grease: true,
463             cc_algorithm: CongestionControlAlgorithm::CUBIC,
464             hystart: true,
465 
466             dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
467             dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
468         })
469     }
470 
471     /// Configures the given certificate chain.
472     ///
473     /// The content of `file` is parsed as a PEM-encoded leaf certificate,
474     /// followed by optional intermediate certificates.
475     ///
476     /// ## Examples:
477     ///
478     /// ```no_run
479     /// # let mut config = quiche::Config::new(0xbabababa)?;
480     /// config.load_cert_chain_from_pem_file("/path/to/cert.pem")?;
481     /// # Ok::<(), quiche::Error>(())
482     /// ```
load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()>483     pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
484         self.tls_ctx.use_certificate_chain_file(file)
485     }
486 
487     /// Configures the given private key.
488     ///
489     /// The content of `file` is parsed as a PEM-encoded private key.
490     ///
491     /// ## Examples:
492     ///
493     /// ```no_run
494     /// # let mut config = quiche::Config::new(0xbabababa)?;
495     /// config.load_priv_key_from_pem_file("/path/to/key.pem")?;
496     /// # Ok::<(), quiche::Error>(())
497     /// ```
load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()>498     pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
499         self.tls_ctx.use_privkey_file(file)
500     }
501 
502     /// Specifies a file where trusted CA certificates are stored for the
503     /// purposes of certificate verification.
504     ///
505     /// The content of `file` is parsed as a PEM-encoded certificate chain.
506     ///
507     /// ## Examples:
508     ///
509     /// ```no_run
510     /// # let mut config = quiche::Config::new(0xbabababa)?;
511     /// config.load_verify_locations_from_file("/path/to/cert.pem")?;
512     /// # Ok::<(), quiche::Error>(())
513     /// ```
load_verify_locations_from_file(&mut self, file: &str) -> Result<()>514     pub fn load_verify_locations_from_file(&mut self, file: &str) -> Result<()> {
515         self.tls_ctx.load_verify_locations_from_file(file)
516     }
517 
518     /// Specifies a directory where trusted CA certificates are stored for the
519     /// purposes of certificate verification.
520     ///
521     /// The content of `dir` a set of PEM-encoded certificate chains.
522     ///
523     /// ## Examples:
524     ///
525     /// ```no_run
526     /// # let mut config = quiche::Config::new(0xbabababa)?;
527     /// config.load_verify_locations_from_directory("/path/to/certs")?;
528     /// # Ok::<(), quiche::Error>(())
529     /// ```
load_verify_locations_from_directory( &mut self, dir: &str, ) -> Result<()>530     pub fn load_verify_locations_from_directory(
531         &mut self, dir: &str,
532     ) -> Result<()> {
533         self.tls_ctx.load_verify_locations_from_directory(dir)
534     }
535 
536     /// Configures whether to verify the peer's certificate.
537     ///
538     /// The default value is `true` for client connections, and `false` for
539     /// server ones.
verify_peer(&mut self, verify: bool)540     pub fn verify_peer(&mut self, verify: bool) {
541         self.tls_ctx.set_verify(verify);
542     }
543 
544     /// Configures whether to send GREASE values.
545     ///
546     /// The default value is `true`.
grease(&mut self, grease: bool)547     pub fn grease(&mut self, grease: bool) {
548         self.grease = grease;
549     }
550 
551     /// Enables logging of secrets.
552     ///
553     /// When logging is enabled, the [`set_keylog()`] method must be called on
554     /// the connection for its cryptographic secrets to be logged in the
555     /// [keylog] format to the specified writer.
556     ///
557     /// [`set_keylog()`]: struct.Connection.html#method.set_keylog
558     /// [keylog]: https://developer.mozilla.org/en-US/docs/Mozilla/Projects/NSS/Key_Log_Format
log_keys(&mut self)559     pub fn log_keys(&mut self) {
560         self.tls_ctx.enable_keylog();
561     }
562 
563     /// Enables sending or receiving early data.
enable_early_data(&mut self)564     pub fn enable_early_data(&mut self) {
565         self.tls_ctx.set_early_data_enabled(true);
566     }
567 
568     /// Configures the list of supported application protocols.
569     ///
570     /// The list of protocols `protos` must be in wire-format (i.e. a series
571     /// of non-empty, 8-bit length-prefixed strings).
572     ///
573     /// On the client this configures the list of protocols to send to the
574     /// server as part of the ALPN extension.
575     ///
576     /// On the server this configures the list of supported protocols to match
577     /// against the client-supplied list.
578     ///
579     /// Applications must set a value, but no default is provided.
580     ///
581     /// ## Examples:
582     ///
583     /// ```
584     /// # let mut config = quiche::Config::new(0xbabababa)?;
585     /// config.set_application_protos(b"\x08http/1.1\x08http/0.9")?;
586     /// # Ok::<(), quiche::Error>(())
587     /// ```
set_application_protos(&mut self, protos: &[u8]) -> Result<()>588     pub fn set_application_protos(&mut self, protos: &[u8]) -> Result<()> {
589         let mut b = octets::Octets::with_slice(&protos);
590 
591         let mut protos_list = Vec::new();
592 
593         while let Ok(proto) = b.get_bytes_with_u8_length() {
594             protos_list.push(proto.to_vec());
595         }
596 
597         self.application_protos = protos_list;
598 
599         self.tls_ctx.set_alpn(&self.application_protos)
600     }
601 
602     /// Sets the `max_idle_timeout` transport parameter.
603     ///
604     /// The default value is infinite, that is, no timeout is used.
set_max_idle_timeout(&mut self, v: u64)605     pub fn set_max_idle_timeout(&mut self, v: u64) {
606         self.local_transport_params.max_idle_timeout = v;
607     }
608 
609     /// Sets the `max_udp_payload_size transport` parameter.
610     ///
611     /// The default value is `65527`.
set_max_udp_payload_size(&mut self, v: u64)612     pub fn set_max_udp_payload_size(&mut self, v: u64) {
613         self.local_transport_params.max_udp_payload_size = v;
614     }
615 
616     /// Sets the `initial_max_data` transport parameter.
617     ///
618     /// When set to a non-zero value quiche will only allow at most `v` bytes
619     /// of incoming stream data to be buffered for the whole connection (that
620     /// is, data that is not yet read by the application) and will allow more
621     /// data to be received as the buffer is consumed by the application.
622     ///
623     /// The default value is `0`.
set_initial_max_data(&mut self, v: u64)624     pub fn set_initial_max_data(&mut self, v: u64) {
625         self.local_transport_params.initial_max_data = v;
626     }
627 
628     /// Sets the `initial_max_stream_data_bidi_local` transport parameter.
629     ///
630     /// When set to a non-zero value quiche will only allow at most `v` bytes
631     /// of incoming stream data to be buffered for each locally-initiated
632     /// bidirectional stream (that is, data that is not yet read by the
633     /// application) and will allow more data to be received as the buffer is
634     /// consumed by the application.
635     ///
636     /// The default value is `0`.
set_initial_max_stream_data_bidi_local(&mut self, v: u64)637     pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
638         self.local_transport_params
639             .initial_max_stream_data_bidi_local = v;
640     }
641 
642     /// Sets the `initial_max_stream_data_bidi_remote` transport parameter.
643     ///
644     /// When set to a non-zero value quiche will only allow at most `v` bytes
645     /// of incoming stream data to be buffered for each remotely-initiated
646     /// bidirectional stream (that is, data that is not yet read by the
647     /// application) and will allow more data to be received as the buffer is
648     /// consumed by the application.
649     ///
650     /// The default value is `0`.
set_initial_max_stream_data_bidi_remote(&mut self, v: u64)651     pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
652         self.local_transport_params
653             .initial_max_stream_data_bidi_remote = v;
654     }
655 
656     /// Sets the `initial_max_stream_data_uni` transport parameter.
657     ///
658     /// When set to a non-zero value quiche will only allow at most `v` bytes
659     /// of incoming stream data to be buffered for each unidirectional stream
660     /// (that is, data that is not yet read by the application) and will allow
661     /// more data to be received as the buffer is consumed by the application.
662     ///
663     /// The default value is `0`.
set_initial_max_stream_data_uni(&mut self, v: u64)664     pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
665         self.local_transport_params.initial_max_stream_data_uni = v;
666     }
667 
668     /// Sets the `initial_max_streams_bidi` transport parameter.
669     ///
670     /// When set to a non-zero value quiche will only allow `v` number of
671     /// concurrent remotely-initiated bidirectional streams to be open at any
672     /// given time and will increase the limit automatically as streams are
673     /// completed.
674     ///
675     /// A bidirectional stream is considered completed when all incoming data
676     /// has been read by the application (up to the `fin` offset) or the
677     /// stream's read direction has been shutdown, and all outgoing data has
678     /// been acked by the peer (up to the `fin` offset) or the stream's write
679     /// direction has been shutdown.
680     ///
681     /// The default value is `0`.
set_initial_max_streams_bidi(&mut self, v: u64)682     pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
683         self.local_transport_params.initial_max_streams_bidi = v;
684     }
685 
686     /// Sets the `initial_max_streams_uni` transport parameter.
687     ///
688     /// When set to a non-zero value quiche will only allow `v` number of
689     /// concurrent remotely-initiated unidirectional streams to be open at any
690     /// given time and will increase the limit automatically as streams are
691     /// completed.
692     ///
693     /// A unidirectional stream is considered completed when all incoming data
694     /// has been read by the application (up to the `fin` offset) or the
695     /// stream's read direction has been shutdown.
696     ///
697     /// The default value is `0`.
set_initial_max_streams_uni(&mut self, v: u64)698     pub fn set_initial_max_streams_uni(&mut self, v: u64) {
699         self.local_transport_params.initial_max_streams_uni = v;
700     }
701 
702     /// Sets the `ack_delay_exponent` transport parameter.
703     ///
704     /// The default value is `3`.
set_ack_delay_exponent(&mut self, v: u64)705     pub fn set_ack_delay_exponent(&mut self, v: u64) {
706         self.local_transport_params.ack_delay_exponent = v;
707     }
708 
709     /// Sets the `max_ack_delay` transport parameter.
710     ///
711     /// The default value is `25`.
set_max_ack_delay(&mut self, v: u64)712     pub fn set_max_ack_delay(&mut self, v: u64) {
713         self.local_transport_params.max_ack_delay = v;
714     }
715 
716     /// Sets the `disable_active_migration` transport parameter.
717     ///
718     /// The default value is `false`.
set_disable_active_migration(&mut self, v: bool)719     pub fn set_disable_active_migration(&mut self, v: bool) {
720         self.local_transport_params.disable_active_migration = v;
721     }
722 
723     /// Sets the congestion control algorithm used by string.
724     ///
725     /// The default value is `reno`. On error `Error::CongestionControl`
726     /// will be returned.
727     ///
728     /// ## Examples:
729     ///
730     /// ```
731     /// # let mut config = quiche::Config::new(0xbabababa)?;
732     /// config.set_cc_algorithm_name("reno");
733     /// # Ok::<(), quiche::Error>(())
734     /// ```
set_cc_algorithm_name(&mut self, name: &str) -> Result<()>735     pub fn set_cc_algorithm_name(&mut self, name: &str) -> Result<()> {
736         self.cc_algorithm = CongestionControlAlgorithm::from_str(name)?;
737 
738         Ok(())
739     }
740 
741     /// Sets the congestion control algorithm used.
742     ///
743     /// The default value is `CongestionControlAlgorithm::CUBIC`.
set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm)744     pub fn set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm) {
745         self.cc_algorithm = algo;
746     }
747 
748     /// Configures whether to enable HyStart++.
749     ///
750     /// The default value is `true`.
enable_hystart(&mut self, v: bool)751     pub fn enable_hystart(&mut self, v: bool) {
752         self.hystart = v;
753     }
754 
755     /// Configures whether to enable receiving DATAGRAM frames.
756     ///
757     /// When enabled, the `max_datagram_frame_size` transport parameter is set
758     /// to 65536 as recommended by draft-ietf-quic-datagram-01.
759     ///
760     /// The default is `false`.
enable_dgram( &mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize, )761     pub fn enable_dgram(
762         &mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize,
763     ) {
764         self.local_transport_params.max_datagram_frame_size = if enabled {
765             Some(MAX_DGRAM_FRAME_SIZE)
766         } else {
767             None
768         };
769         self.dgram_recv_max_queue_len = recv_queue_len;
770         self.dgram_send_max_queue_len = send_queue_len;
771     }
772 }
773 
/// A QUIC connection.
///
/// Holds all the per-connection state: connection IDs, packet number spaces,
/// transport parameters, TLS handshake and loss recovery state, flow control
/// accounting, streams, timers, assorted status flags and DATAGRAM queues.
pub struct Connection {
    /// QUIC wire version used for the connection.
    version: u32,

    /// Peer's connection ID.
    dcid: Vec<u8>,

    /// Local connection ID.
    scid: Vec<u8>,

    /// Unique opaque ID for the connection that can be used for logging.
    /// It is initialized to the hex encoding of the local connection ID.
    trace_id: String,

    /// Packet number spaces, one entry per epoch.
    pkt_num_spaces: [packet::PktNumSpace; packet::EPOCH_COUNT],

    /// Peer's transport parameters.
    peer_transport_params: TransportParams,

    /// Local transport parameters.
    local_transport_params: TransportParams,

    /// TLS handshake state.
    handshake: tls::Handshake,

    /// Loss recovery and congestion control state.
    recovery: recovery::Recovery,

    /// List of supported application protocols.
    application_protos: Vec<Vec<u8>>,

    /// Total number of received packets.
    recv_count: usize,

    /// Total number of sent packets.
    sent_count: usize,

    /// Total number of bytes received from the peer.
    rx_data: u64,

    /// Local flow control limit for the connection.
    max_rx_data: u64,

    /// Updated local flow control limit for the connection. This is used to
    /// trigger sending MAX_DATA frames after a certain threshold.
    max_rx_data_next: u64,

    /// Whether we should send a MAX_DATA frame.
    almost_full: bool,

    /// Total number of bytes sent to the peer.
    tx_data: u64,

    /// Peer's flow control limit for the connection.
    max_tx_data: u64,

    /// Total number of bytes the server can send before the peer's address
    /// is verified. It grows with every buffer passed to `recv()` while the
    /// peer's address is still unverified.
    max_send_bytes: usize,

    /// Streams map, indexed by stream ID.
    streams: stream::StreamMap,

    /// Peer's original destination connection ID. Used by the client to
    /// validate the server's transport parameter.
    odcid: Option<Vec<u8>>,

    /// Peer's retry source connection ID. Used by the client during stateless
    /// retry to validate the server's transport parameter.
    rscid: Option<Vec<u8>>,

    /// Received address verification token.
    token: Option<Vec<u8>>,

    /// Error code to be sent to the peer in CONNECTION_CLOSE.
    error: Option<u64>,

    /// Error code to be sent to the peer in APPLICATION_CLOSE.
    app_error: Option<u64>,

    /// Error reason to be sent to the peer in APPLICATION_CLOSE.
    app_reason: Vec<u8>,

    /// Received path challenge.
    challenge: Option<Vec<u8>>,

    /// The connection-level limit at which send blocking occurred.
    blocked_limit: Option<u64>,

    /// Idle timeout expiration time.
    idle_timer: Option<time::Instant>,

    /// Draining timeout expiration time.
    draining_timer: Option<time::Instant>,

    /// Whether this is a server-side connection.
    is_server: bool,

    /// Whether the initial secrets have been derived.
    derived_initial_secrets: bool,

    /// Whether a version negotiation packet has already been received. Only
    /// relevant for client connections.
    did_version_negotiation: bool,

    /// Whether stateless retry has been performed.
    did_retry: bool,

    /// Whether the peer already updated its connection ID.
    got_peer_conn_id: bool,

    /// Whether the peer's address has been verified.
    verified_peer_address: bool,

    /// Whether the peer has verified our address.
    peer_verified_address: bool,

    /// Whether the peer's transport parameters were parsed.
    parsed_peer_transport_params: bool,

    /// Whether the HANDSHAKE_DONE has been sent.
    handshake_done_sent: bool,

    /// Whether the connection handshake has been confirmed.
    handshake_confirmed: bool,

    /// Whether an ack-eliciting packet has been sent since last receiving a
    /// packet.
    ack_eliciting_sent: bool,

    /// Whether the connection is closed.
    closed: bool,

    /// Whether to send GREASE.
    grease: bool,

    /// TLS keylog writer.
    keylog: Option<Box<dyn std::io::Write + Send>>,

    /// Qlog streaming output.
    #[cfg(feature = "qlog")]
    qlog_streamer: Option<qlog::QlogStreamer>,

    /// Whether peer transport parameters were qlogged.
    #[cfg(feature = "qlog")]
    qlogged_peer_params: bool,

    /// Queue of received DATAGRAMs.
    dgram_recv_queue: dgram::DatagramQueue,

    /// Queue of DATAGRAMs waiting to be sent.
    dgram_send_queue: dgram::DatagramQueue,
}
926 
927 /// Creates a new server-side connection.
928 ///
929 /// The `scid` parameter represents the server's source connection ID, while
930 /// the optional `odcid` parameter represents the original destination ID the
931 /// client sent before a stateless retry (this is only required when using
932 /// the [`retry()`] function).
933 ///
934 /// [`retry()`]: fn.retry.html
935 ///
936 /// ## Examples:
937 ///
938 /// ```no_run
939 /// # let mut config = quiche::Config::new(0xbabababa)?;
940 /// # let scid = [0xba; 16];
941 /// let conn = quiche::accept(&scid, None, &mut config)?;
942 /// # Ok::<(), quiche::Error>(())
943 /// ```
accept( scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, ) -> Result<Pin<Box<Connection>>>944 pub fn accept(
945     scid: &[u8], odcid: Option<&[u8]>, config: &mut Config,
946 ) -> Result<Pin<Box<Connection>>> {
947     let conn = Connection::new(scid, odcid, config, true)?;
948 
949     Ok(conn)
950 }
951 
952 /// Creates a new client-side connection.
953 ///
954 /// The `scid` parameter is used as the connection's source connection ID,
955 /// while the optional `server_name` parameter is used to verify the peer's
956 /// certificate.
957 ///
958 /// ## Examples:
959 ///
960 /// ```no_run
961 /// # let mut config = quiche::Config::new(0xbabababa)?;
962 /// # let server_name = "quic.tech";
963 /// # let scid = [0xba; 16];
964 /// let conn = quiche::connect(Some(&server_name), &scid, &mut config)?;
965 /// # Ok::<(), quiche::Error>(())
966 /// ```
connect( server_name: Option<&str>, scid: &[u8], config: &mut Config, ) -> Result<Pin<Box<Connection>>>967 pub fn connect(
968     server_name: Option<&str>, scid: &[u8], config: &mut Config,
969 ) -> Result<Pin<Box<Connection>>> {
970     let conn = Connection::new(scid, None, config, false)?;
971 
972     if let Some(server_name) = server_name {
973         conn.handshake.set_host_name(server_name)?;
974     }
975 
976     Ok(conn)
977 }
978 
/// Writes a version negotiation packet.
///
/// The `scid` and `dcid` parameters are the source connection ID and the
/// destination connection ID extracted from the received client's Initial
/// packet that advertises an unsupported version.
///
/// The packet is written into `out`. On success the returned value is the
/// length of the prefix of `out` that should be sent to the client, as shown
/// in the example below.
///
/// ## Examples:
///
/// ```no_run
/// # let mut buf = [0; 512];
/// # let mut out = [0; 512];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// let (len, src) = socket.recv_from(&mut buf).unwrap();
///
/// let hdr =
///     quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
///
/// if hdr.version != quiche::PROTOCOL_VERSION {
///     let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out)?;
///     socket.send_to(&out[..len], &src).unwrap();
/// }
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn negotiate_version(
    scid: &[u8], dcid: &[u8], out: &mut [u8],
) -> Result<usize> {
    // Thin public wrapper around the packet module's implementation.
    packet::negotiate_version(scid, dcid, out)
}
1007 
/// Writes a stateless retry packet.
///
/// The `scid` and `dcid` parameters are the source connection ID and the
/// destination connection ID extracted from the received client's Initial
/// packet, while `new_scid` is the server's new source connection ID and
/// `token` is the address validation token the client needs to echo back.
///
/// The application is responsible for generating the address validation
/// token to be sent to the client, and verifying tokens sent back by the
/// client. The generated token should include the `dcid` parameter, such
/// that it can be later extracted from the token and passed to the
/// [`accept()`] function as its `odcid` parameter.
///
/// The packet is written into `out`. On success the returned value is the
/// length of the prefix of `out` that should be sent to the client, as shown
/// in the example below.
///
/// [`accept()`]: fn.accept.html
///
/// ## Examples:
///
/// ```no_run
/// # let mut config = quiche::Config::new(0xbabababa)?;
/// # let mut buf = [0; 512];
/// # let mut out = [0; 512];
/// # let scid = [0xba; 16];
/// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
/// # fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
/// #     vec![]
/// # }
/// # fn validate_token<'a>(src: &std::net::SocketAddr, token: &'a [u8]) -> Option<&'a [u8]> {
/// #     None
/// # }
/// let (len, src) = socket.recv_from(&mut buf).unwrap();
///
/// let hdr = quiche::Header::from_slice(&mut buf[..len], quiche::MAX_CONN_ID_LEN)?;
///
/// let token = hdr.token.as_ref().unwrap();
///
/// // No token sent by client, create a new one.
/// if token.is_empty() {
///     let new_token = mint_token(&hdr, &src);
///
///     let len = quiche::retry(
///         &hdr.scid, &hdr.dcid, &scid, &new_token, hdr.version, &mut out,
///     )?;
///
///     socket.send_to(&out[..len], &src).unwrap();
///     return Ok(());
/// }
///
/// // Client sent token, validate it.
/// let odcid = validate_token(&src, token);
///
/// if odcid == None {
///     // Invalid address validation token.
///     return Ok(());
/// }
///
/// let conn = quiche::accept(&scid, odcid, &mut config)?;
/// # Ok::<(), quiche::Error>(())
/// ```
pub fn retry(
    scid: &[u8], dcid: &[u8], new_scid: &[u8], token: &[u8], version: u32,
    out: &mut [u8],
) -> Result<usize> {
    // Thin public wrapper around the packet module's implementation.
    packet::retry(scid, dcid, new_scid, token, version, out)
}
1072 
1073 /// Returns true if the given protocol version is supported.
version_is_supported(version: u32) -> bool1074 pub fn version_is_supported(version: u32) -> bool {
1075     matches!(
1076         version,
1077         PROTOCOL_VERSION_DRAFT27 |
1078             PROTOCOL_VERSION_DRAFT28 |
1079             PROTOCOL_VERSION_DRAFT29
1080     )
1081 }
1082 
/// Pushes a frame to the output packet if there is enough space.
///
/// Arguments, in order:
///
/// * `$frames`: collection the frame is pushed onto (via `.push()`).
/// * `$frame`: the frame to add; it must provide a `wire_len()` method and
///   is moved into `$frames` on success.
/// * `$payload_len`: running payload length, increased by the frame's wire
///   length on success.
/// * `$left`: remaining space in the packet, decreased by the frame's wire
///   length on success.
///
/// Returns `true` on success, `false` otherwise. In case of failure it means
/// there is no room to add the frame in the packet, and neither
/// `$payload_len` nor `$left` is modified. You may retry to add the frame
/// later.
macro_rules! push_frame_to_pkt {
    ($frames:expr, $frame:expr, $payload_len: expr, $left:expr) => {{
        // Compute the encoded length once and reuse it, rather than calling
        // `wire_len()` again for the comparison and each counter update.
        let frame_len = $frame.wire_len();

        if frame_len <= $left {
            $payload_len += frame_len;
            $left -= frame_len;

            $frames.push($frame);

            true
        } else {
            false
        }
    }};
}
1102 
/// Conditional qlog action.
///
/// Executes the provided body if the qlog feature is enabled and quiche
/// has been configured with a log writer.
///
/// Arguments, in order:
///
/// * `$qlog_streamer`: the optional qlog streamer; the body only runs when
///   it is `Some`.
/// * `$qlog_streamer_ref`: identifier bound, inside the body, to a mutable
///   reference to the streamer.
/// * `$body`: block to execute.
///
/// When the `qlog` feature is disabled the macro expands to an empty block.
macro_rules! qlog_with {
    ($qlog_streamer:expr, $qlog_streamer_ref:ident, $body:block) => {{
        #[cfg(feature = "qlog")]
        {
            if let Some($qlog_streamer_ref) = &mut $qlog_streamer {
                $body
            }
        }
    }};
}
1117 
1118 impl Connection {
new( scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, is_server: bool, ) -> Result<Pin<Box<Connection>>>1119     fn new(
1120         scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, is_server: bool,
1121     ) -> Result<Pin<Box<Connection>>> {
1122         let tls = config.tls_ctx.new_handshake()?;
1123         Connection::with_tls(scid, odcid, config, tls, is_server)
1124     }
1125 
    /// Creates a connection that uses the provided TLS handshake object.
    ///
    /// `scid` is the local source connection ID, `odcid` is the optional
    /// original destination connection ID (when present, a stateless retry
    /// is considered to have happened and the peer's address is assumed to
    /// be verified), and `is_server` selects the role of the connection.
    ///
    /// For client connections a random 16-byte destination connection ID is
    /// generated and the Initial secrets are derived from it immediately.
    ///
    /// Fails if initializing the handshake, encoding the transport
    /// parameters or deriving the Initial key material fails.
    fn with_tls(
        scid: &[u8], odcid: Option<&[u8]>, config: &mut Config,
        tls: tls::Handshake, is_server: bool,
    ) -> Result<Pin<Box<Connection>>> {
        let max_rx_data = config.local_transport_params.initial_max_data;

        // Hex-encode the source connection ID, one string per byte; the
        // joined result is used as the connection's trace ID.
        let scid_as_hex: Vec<String> =
            scid.iter().map(|b| format!("{:02x}", b)).collect();

        let mut conn = Box::pin(Connection {
            version: config.version,

            // The peer's connection ID is not known yet.
            dcid: Vec::new(),
            scid: scid.to_vec(),

            trace_id: scid_as_hex.join(""),

            pkt_num_spaces: [
                packet::PktNumSpace::new(),
                packet::PktNumSpace::new(),
                packet::PktNumSpace::new(),
            ],

            peer_transport_params: TransportParams::default(),

            local_transport_params: config.local_transport_params.clone(),

            handshake: tls,

            recovery: recovery::Recovery::new(&config),

            application_protos: config.application_protos.clone(),

            recv_count: 0,
            sent_count: 0,

            // Both the current and the next local flow control limits start
            // from the configured initial maximum.
            rx_data: 0,
            max_rx_data,
            max_rx_data_next: max_rx_data,
            almost_full: false,

            tx_data: 0,
            max_tx_data: 0,

            max_send_bytes: 0,

            streams: stream::StreamMap::new(
                config.local_transport_params.initial_max_streams_bidi,
                config.local_transport_params.initial_max_streams_uni,
            ),

            odcid: None,

            rscid: None,

            token: None,

            error: None,

            app_error: None,
            app_reason: Vec::new(),

            challenge: None,

            blocked_limit: None,

            idle_timer: None,

            draining_timer: None,

            is_server,

            derived_initial_secrets: false,

            did_version_negotiation: false,

            did_retry: false,

            got_peer_conn_id: false,

            // If we did stateless retry assume the peer's address is verified.
            verified_peer_address: odcid.is_some(),

            // Assume clients validate the server's address implicitly.
            peer_verified_address: is_server,

            parsed_peer_transport_params: false,

            handshake_done_sent: false,

            handshake_confirmed: false,

            ack_eliciting_sent: false,

            closed: false,

            grease: config.grease,

            keylog: None,

            #[cfg(feature = "qlog")]
            qlog_streamer: None,

            #[cfg(feature = "qlog")]
            qlogged_peer_params: false,

            dgram_recv_queue: dgram::DatagramQueue::new(
                config.dgram_recv_max_queue_len,
            ),

            dgram_send_queue: dgram::DatagramQueue::new(
                config.dgram_send_max_queue_len,
            ),
        });

        // An original destination connection ID means a stateless retry took
        // place: record it, together with our own connection ID as the retry
        // source connection ID, in the local transport parameters.
        if let Some(odcid) = odcid {
            conn.local_transport_params
                .original_destination_connection_id = Some(odcid.to_vec());

            conn.local_transport_params.retry_source_connection_id =
                Some(scid.to_vec());

            conn.did_retry = true;
        }

        conn.local_transport_params.initial_source_connection_id =
            Some(scid.to_vec());

        // The transport parameters are encoded only after all the connection
        // ID parameters above have been filled in.
        conn.handshake.init(&conn)?;

        conn.encode_transport_params()?;

        // Derive initial secrets for the client. We can do this here because
        // the random destination connection ID is generated right below.
        if !is_server {
            let mut dcid = [0; 16];
            rand::rand_bytes(&mut dcid[..]);

            let (aead_open, aead_seal) = crypto::derive_initial_key_material(
                &dcid,
                conn.version,
                conn.is_server,
            )?;

            conn.dcid.extend_from_slice(&dcid);

            conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
                Some(aead_open);
            conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
                Some(aead_seal);

            conn.derived_initial_secrets = true;
        }

        Ok(conn)
    }
1282 
    /// Sets keylog output to the designated [`Writer`].
    ///
    /// This needs to be called as soon as the connection is created, to avoid
    /// missing some early logs.
    ///
    /// Any writer set by a previous call is replaced.
    ///
    /// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
    pub fn set_keylog(&mut self, writer: Box<dyn std::io::Write + Send>) {
        self.keylog = Some(writer);
    }
1292 
1293     /// Sets qlog output to the designated [`Writer`].
1294     ///
1295     /// This needs to be called as soon as the connection is created, to avoid
1296     /// missing some early logs.
1297     ///
1298     /// [`Writer`]: https://doc.rust-lang.org/std/io/trait.Write.html
1299     #[cfg(feature = "qlog")]
set_qlog( &mut self, writer: Box<dyn std::io::Write + Send>, title: String, description: String, )1300     pub fn set_qlog(
1301         &mut self, writer: Box<dyn std::io::Write + Send>, title: String,
1302         description: String,
1303     ) {
1304         let vp = if self.is_server {
1305             qlog::VantagePointType::Server
1306         } else {
1307             qlog::VantagePointType::Client
1308         };
1309 
1310         let trace = qlog::Trace::new(
1311             qlog::VantagePoint {
1312                 name: None,
1313                 ty: vp,
1314                 flow: None,
1315             },
1316             Some(title.to_string()),
1317             Some(description.to_string()),
1318             Some(qlog::Configuration {
1319                 time_offset: Some("0".to_string()),
1320                 time_units: Some(qlog::TimeUnits::Ms),
1321                 original_uris: None,
1322             }),
1323             None,
1324         );
1325 
1326         let mut streamer = qlog::QlogStreamer::new(
1327             qlog::QLOG_VERSION.to_string(),
1328             Some(title),
1329             Some(description),
1330             None,
1331             std::time::Instant::now(),
1332             trace,
1333             writer,
1334         );
1335 
1336         streamer.start_log().ok();
1337 
1338         let ev = self.local_transport_params.to_qlog(
1339             qlog::TransportOwner::Local,
1340             self.version,
1341             self.handshake.alpn_protocol(),
1342             self.handshake.cipher(),
1343         );
1344 
1345         streamer.add_event(ev).ok();
1346 
1347         self.qlog_streamer = Some(streamer);
1348     }
1349 
1350     /// Processes QUIC packets received from the peer.
1351     ///
1352     /// On success the number of bytes processed from the input buffer is
1353     /// returned. On error the connection will be closed by calling [`close()`]
1354     /// with the appropriate error code.
1355     ///
1356     /// Coalesced packets will be processed as necessary.
1357     ///
1358     /// Note that the contents of the input buffer `buf` might be modified by
1359     /// this function due to, for example, in-place decryption.
1360     ///
1361     /// [`close()`]: struct.Connection.html#method.close
1362     ///
1363     /// ## Examples:
1364     ///
1365     /// ```no_run
1366     /// # let mut buf = [0; 512];
1367     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
1368     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
1369     /// # let scid = [0xba; 16];
1370     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
1371     /// loop {
1372     ///     let read = socket.recv(&mut buf).unwrap();
1373     ///
1374     ///     let read = match conn.recv(&mut buf[..read]) {
1375     ///         Ok(v) => v,
1376     ///
1377     ///         Err(e) => {
1378     ///             // An error occurred, handle it.
1379     ///             break;
1380     ///         },
1381     ///     };
1382     /// }
1383     /// # Ok::<(), quiche::Error>(())
1384     /// ```
recv(&mut self, buf: &mut [u8]) -> Result<usize>1385     pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
1386         let len = buf.len();
1387 
1388         // Keep track of how many bytes we received from the client, so we
1389         // can limit bytes sent back before address validation, to a multiple
1390         // of this. The limit needs to be increased early on, so that if there
1391         // is an error there is enough credit to send a CONNECTION_CLOSE.
1392         //
1393         // It doesn't matter if the packets received were valid or not, we only
1394         // need to track the total amount of bytes received.
1395         if !self.verified_peer_address {
1396             self.max_send_bytes += len * MAX_AMPLIFICATION_FACTOR;
1397         }
1398 
1399         let mut done = 0;
1400         let mut left = len;
1401 
1402         // Process coalesced packets.
1403         while left > 0 {
1404             let read = match self.recv_single(&mut buf[len - left..len]) {
1405                 Ok(v) => v,
1406 
1407                 Err(Error::Done) => left,
1408 
1409                 Err(e) => {
1410                     // In case of error processing the incoming packet, close
1411                     // the connection.
1412                     self.close(false, e.to_wire(), b"").ok();
1413                     return Err(e);
1414                 },
1415             };
1416 
1417             done += read;
1418             left -= read;
1419         }
1420 
1421         Ok(done)
1422     }
1423 
1424     /// Processes a single QUIC packet received from the peer.
1425     ///
1426     /// On success the number of bytes processed from the input buffer is
1427     /// returned. When the [`Done`] error is returned, processing of the
1428     /// remainder of the incoming UDP datagram should be interrupted.
1429     ///
1430     /// On error, an error other than [`Done`] is returned.
1431     ///
1432     /// [`Done`]: enum.Error.html#variant.Done
recv_single(&mut self, buf: &mut [u8]) -> Result<usize>1433     fn recv_single(&mut self, buf: &mut [u8]) -> Result<usize> {
1434         let now = time::Instant::now();
1435 
1436         if buf.is_empty() {
1437             return Err(Error::Done);
1438         }
1439 
1440         if self.is_closed() || self.draining_timer.is_some() {
1441             return Err(Error::Done);
1442         }
1443 
1444         let is_closing = self.error.is_some() || self.app_error.is_some();
1445 
1446         if is_closing {
1447             return Err(Error::Done);
1448         }
1449 
1450         let mut b = octets::OctetsMut::with_slice(buf);
1451 
1452         let mut hdr =
1453             Header::from_bytes(&mut b, self.scid.len()).map_err(|e| {
1454                 drop_pkt_on_err(
1455                     e,
1456                     self.recv_count,
1457                     self.is_server,
1458                     &self.trace_id,
1459                 )
1460             })?;
1461 
1462         if hdr.ty == packet::Type::VersionNegotiation {
1463             // Version negotiation packets can only be sent by the server.
1464             if self.is_server {
1465                 return Err(Error::Done);
1466             }
1467 
1468             // Ignore duplicate version negotiation.
1469             if self.did_version_negotiation {
1470                 return Err(Error::Done);
1471             }
1472 
1473             // Ignore version negotiation if any other packet has already been
1474             // successfully processed.
1475             if self.recv_count > 0 {
1476                 return Err(Error::Done);
1477             }
1478 
1479             if hdr.dcid != self.scid {
1480                 return Err(Error::Done);
1481             }
1482 
1483             if hdr.scid != self.dcid {
1484                 return Err(Error::Done);
1485             }
1486 
1487             trace!("{} rx pkt {:?}", self.trace_id, hdr);
1488 
1489             let versions = hdr.versions.ok_or(Error::Done)?;
1490 
1491             // Ignore version negotiation if the version already selected is
1492             // listed.
1493             if versions.iter().any(|&v| v == self.version) {
1494                 return Err(Error::Done);
1495             }
1496 
1497             match versions.iter().filter(|&&v| version_is_supported(v)).max() {
1498                 Some(v) => self.version = *v,
1499 
1500                 None => {
1501                     // We don't support any of the versions offered.
1502                     //
1503                     // While a man-in-the-middle attacker might be able to
1504                     // inject a version negotiation packet that triggers this
1505                     // failure, the window of opportunity is very small and
1506                     // this error is quite useful for debugging, so don't just
1507                     // ignore the packet.
1508                     return Err(Error::UnknownVersion);
1509                 },
1510             };
1511 
1512             self.did_version_negotiation = true;
1513 
1514             // Derive Initial secrets based on the new version.
1515             let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1516                 &self.dcid,
1517                 self.version,
1518                 self.is_server,
1519             )?;
1520 
1521             // Reset connection state to force sending another Initial packet.
1522             self.drop_epoch_state(packet::EPOCH_INITIAL, now);
1523             self.got_peer_conn_id = false;
1524             self.handshake.clear()?;
1525 
1526             self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1527                 Some(aead_open);
1528             self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1529                 Some(aead_seal);
1530 
1531             // Encode transport parameters again, as the new version might be
1532             // using a different format.
1533             self.encode_transport_params()?;
1534 
1535             return Err(Error::Done);
1536         }
1537 
1538         if hdr.ty == packet::Type::Retry {
1539             // Retry packets can only be sent by the server.
1540             if self.is_server {
1541                 return Err(Error::Done);
1542             }
1543 
1544             // Ignore duplicate retry.
1545             if self.did_retry {
1546                 return Err(Error::Done);
1547             }
1548 
1549             // Check if Retry packet is valid.
1550             if packet::verify_retry_integrity(&b, &self.dcid, self.version)
1551                 .is_err()
1552             {
1553                 return Err(Error::Done);
1554             }
1555 
1556             trace!("{} rx pkt {:?}", self.trace_id, hdr);
1557 
1558             self.token = hdr.token;
1559             self.did_retry = true;
1560 
1561             // Remember peer's new connection ID.
1562             self.odcid = Some(self.dcid.clone());
1563 
1564             self.dcid.resize(hdr.scid.len(), 0);
1565             self.dcid.copy_from_slice(&hdr.scid);
1566 
1567             self.rscid = Some(self.dcid.clone());
1568 
1569             // Derive Initial secrets using the new connection ID.
1570             let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1571                 &hdr.scid,
1572                 self.version,
1573                 self.is_server,
1574             )?;
1575 
1576             // Reset connection state to force sending another Initial packet.
1577             self.drop_epoch_state(packet::EPOCH_INITIAL, now);
1578             self.got_peer_conn_id = false;
1579             self.handshake.clear()?;
1580 
1581             self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1582                 Some(aead_open);
1583             self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1584                 Some(aead_seal);
1585 
1586             return Err(Error::Done);
1587         }
1588 
1589         if self.is_server && !self.did_version_negotiation {
1590             if !version_is_supported(hdr.version) {
1591                 return Err(Error::UnknownVersion);
1592             }
1593 
1594             self.version = hdr.version;
1595             self.did_version_negotiation = true;
1596 
1597             // Encode transport parameters again, as the new version might be
1598             // using a different format.
1599             self.encode_transport_params()?;
1600         }
1601 
1602         if hdr.ty != packet::Type::Short && hdr.version != self.version {
1603             // At this point version negotiation was already performed, so
1604             // ignore packets that don't match the connection's version.
1605             return Err(Error::Done);
1606         }
1607 
1608         // Long header packets have an explicit payload length, but short
1609         // packets don't so just use the remaining capacity in the buffer.
1610         let payload_len = if hdr.ty == packet::Type::Short {
1611             b.cap()
1612         } else {
1613             b.get_varint().map_err(|e| {
1614                 drop_pkt_on_err(
1615                     e.into(),
1616                     self.recv_count,
1617                     self.is_server,
1618                     &self.trace_id,
1619                 )
1620             })? as usize
1621         };
1622 
1623         // Derive initial secrets on the server.
1624         if !self.derived_initial_secrets {
1625             let (aead_open, aead_seal) = crypto::derive_initial_key_material(
1626                 &hdr.dcid,
1627                 self.version,
1628                 self.is_server,
1629             )?;
1630 
1631             self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
1632                 Some(aead_open);
1633             self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
1634                 Some(aead_seal);
1635 
1636             self.derived_initial_secrets = true;
1637         }
1638 
1639         // Select packet number space epoch based on the received packet's type.
1640         let epoch = hdr.ty.to_epoch()?;
1641 
1642         // Select AEAD context used to open incoming packet.
1643         #[allow(clippy::or_fun_call)]
1644         let aead = (self.pkt_num_spaces[epoch].crypto_0rtt_open.as_ref())
1645             // Only use 0-RTT key if incoming packet is 0-RTT.
1646             .filter(|_| hdr.ty == packet::Type::ZeroRTT)
1647             // Otherwise use the packet number space's main key.
1648             .or(self.pkt_num_spaces[epoch].crypto_open.as_ref())
1649             // Finally, discard packet if no usable key is available.
1650             //
1651             // TODO: buffer 0-RTT/1-RTT packets instead of discarding when the
1652             // required key is not available yet, as an optimization.
1653             .ok_or_else(|| {
1654                 drop_pkt_on_err(
1655                     Error::CryptoFail,
1656                     self.recv_count,
1657                     self.is_server,
1658                     &self.trace_id,
1659                 )
1660             })?;
1661 
1662         let aead_tag_len = aead.alg().tag_len();
1663 
1664         packet::decrypt_hdr(&mut b, &mut hdr, &aead).map_err(|e| {
1665             drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
1666         })?;
1667 
1668         let pn = packet::decode_pkt_num(
1669             self.pkt_num_spaces[epoch].largest_rx_pkt_num,
1670             hdr.pkt_num,
1671             hdr.pkt_num_len,
1672         );
1673 
1674         let pn_len = hdr.pkt_num_len;
1675 
1676         trace!(
1677             "{} rx pkt {:?} len={} pn={}",
1678             self.trace_id,
1679             hdr,
1680             payload_len,
1681             pn
1682         );
1683 
1684         qlog_with!(self.qlog_streamer, q, {
1685             let packet_size = b.len();
1686 
1687             let qlog_pkt_hdr = qlog::PacketHeader::with_type(
1688                 hdr.ty.to_qlog(),
1689                 pn,
1690                 Some(packet_size as u64),
1691                 Some(payload_len as u64),
1692                 Some(hdr.version),
1693                 Some(&hdr.scid),
1694                 Some(&hdr.dcid),
1695             );
1696 
1697             q.add_event(qlog::event::Event::packet_received(
1698                 hdr.ty.to_qlog(),
1699                 qlog_pkt_hdr,
1700                 Some(Vec::new()),
1701                 None,
1702                 None,
1703                 None,
1704             ))
1705             .ok();
1706         });
1707 
1708         let mut payload = packet::decrypt_pkt(
1709             &mut b,
1710             pn,
1711             pn_len,
1712             payload_len,
1713             &aead,
1714         )
1715         .map_err(|e| {
1716             drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
1717         })?;
1718 
1719         if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
1720             trace!("{} ignored duplicate packet {}", self.trace_id, pn);
1721             return Err(Error::Done);
1722         }
1723 
1724         if !self.is_server && !self.got_peer_conn_id {
1725             if self.odcid.is_none() {
1726                 self.odcid = Some(self.dcid.clone());
1727             }
1728 
1729             // Replace the randomly generated destination connection ID with
1730             // the one supplied by the server.
1731             self.dcid.resize(hdr.scid.len(), 0);
1732             self.dcid.copy_from_slice(&hdr.scid);
1733 
1734             self.got_peer_conn_id = true;
1735         }
1736 
1737         if self.is_server && !self.got_peer_conn_id {
1738             self.dcid.extend_from_slice(&hdr.scid);
1739 
1740             if !self.did_retry && self.version >= PROTOCOL_VERSION_DRAFT28 {
1741                 self.local_transport_params
1742                     .original_destination_connection_id = Some(hdr.dcid.to_vec());
1743 
1744                 self.encode_transport_params()?;
1745             }
1746 
1747             self.got_peer_conn_id = true;
1748         }
1749 
1750         // To avoid sending an ACK in response to an ACK-only packet, we need
1751         // to keep track of whether this packet contains any frame other than
1752         // ACK and PADDING.
1753         let mut ack_elicited = false;
1754 
1755         // Process packet payload.
1756         while payload.cap() > 0 {
1757             let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
1758 
1759             qlog_with!(self.qlog_streamer, q, {
1760                 q.add_frame(frame.to_qlog(), false).ok();
1761             });
1762 
1763             if frame.ack_eliciting() {
1764                 ack_elicited = true;
1765             }
1766 
1767             if let Err(e) = self.process_frame(frame, epoch, now) {
1768                 qlog_with!(self.qlog_streamer, q, {
1769                     // Always conclude frame writing on error.
1770                     q.finish_frames().ok();
1771                 });
1772 
1773                 return Err(e);
1774             }
1775         }
1776 
1777         qlog_with!(self.qlog_streamer, q, {
1778             // Always conclude frame writing.
1779             q.finish_frames().ok();
1780         });
1781 
1782         qlog_with!(self.qlog_streamer, q, {
1783             let ev = self.recovery.to_qlog();
1784             q.add_event(ev).ok();
1785         });
1786 
1787         // Only log the remote transport parameters once the connection is
1788         // established (i.e. after frames have been fully parsed) and only
1789         // once per connection.
1790         if self.is_established() {
1791             qlog_with!(self.qlog_streamer, q, {
1792                 if !self.qlogged_peer_params {
1793                     let ev = self.peer_transport_params.to_qlog(
1794                         qlog::TransportOwner::Remote,
1795                         self.version,
1796                         self.handshake.alpn_protocol(),
1797                         self.handshake.cipher(),
1798                     );
1799 
1800                     q.add_event(ev).ok();
1801 
1802                     self.qlogged_peer_params = true;
1803                 }
1804             });
1805         }
1806 
1807         // Process acked frames.
1808         for acked in self.recovery.acked[epoch].drain(..) {
1809             match acked {
1810                 frame::Frame::ACK { ranges, .. } => {
1811                     // Stop acknowledging packets less than or equal to the
1812                     // largest acknowledged in the sent ACK frame that, in
1813                     // turn, got acked.
1814                     if let Some(largest_acked) = ranges.last() {
1815                         self.pkt_num_spaces[epoch]
1816                             .recv_pkt_need_ack
1817                             .remove_until(largest_acked);
1818                     }
1819                 },
1820 
1821                 frame::Frame::Crypto { data } => {
1822                     self.pkt_num_spaces[epoch]
1823                         .crypto_stream
1824                         .send
1825                         .ack(data.off(), data.len());
1826                 },
1827 
1828                 frame::Frame::Stream { stream_id, data } => {
1829                     let stream = match self.streams.get_mut(stream_id) {
1830                         Some(v) => v,
1831 
1832                         None => continue,
1833                     };
1834 
1835                     stream.send.ack(data.off(), data.len());
1836 
1837                     if stream.is_complete() {
1838                         let local = stream.local;
1839                         self.streams.collect(stream_id, local);
1840                     }
1841                 },
1842 
1843                 _ => (),
1844             }
1845         }
1846 
1847         // We only record the time of arrival of the largest packet number
1848         // that still needs to be acked, to be used for ACK delay calculation.
1849         if self.pkt_num_spaces[epoch].recv_pkt_need_ack.last() < Some(pn) {
1850             self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
1851         }
1852 
1853         self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);
1854 
1855         self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);
1856 
1857         self.pkt_num_spaces[epoch].ack_elicited =
1858             cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);
1859 
1860         self.pkt_num_spaces[epoch].largest_rx_pkt_num =
1861             cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);
1862 
1863         if let Some(idle_timeout) = self.idle_timeout() {
1864             self.idle_timer = Some(now + idle_timeout);
1865         }
1866 
1867         self.recv_count += 1;
1868 
1869         let read = b.off() + aead_tag_len;
1870 
1871         // A Handshake packet has been received from the client and has been
1872         // successfully processed, so we can drop the initial state and consider
1873         // the client's address to be verified.
1874         if self.is_server && hdr.ty == packet::Type::Handshake {
1875             self.drop_epoch_state(packet::EPOCH_INITIAL, now);
1876 
1877             self.verified_peer_address = true;
1878         }
1879 
1880         self.ack_eliciting_sent = false;
1881 
1882         Ok(read)
1883     }
1884 
1885     /// Writes a single QUIC packet to be sent to the peer.
1886     ///
1887     /// On success the number of bytes written to the output buffer is
1888     /// returned, or [`Done`] if there was nothing to write.
1889     ///
1890     /// The application should call `send()` multiple times until [`Done`] is
1891     /// returned, indicating that there are no more packets to send. It is
1892     /// recommended that `send()` be called in the following cases:
1893     ///
1894     ///  * When the application receives QUIC packets from the peer (that is,
1895     ///    any time [`recv()`] is also called).
1896     ///
1897     ///  * When the connection timer expires (that is, any time [`on_timeout()`]
1898     ///    is also called).
1899     ///
1900     ///  * When the application sends data to the peer (for example, any time
1901     ///    [`stream_send()`] or [`stream_shutdown()`] are called).
1902     ///
1903     /// [`Done`]: enum.Error.html#variant.Done
1904     /// [`recv()`]: struct.Connection.html#method.recv
1905     /// [`on_timeout()`]: struct.Connection.html#method.on_timeout
1906     /// [`stream_send()`]: struct.Connection.html#method.stream_send
1907     /// [`stream_shutdown()`]: struct.Connection.html#method.stream_shutdown
1908     ///
1909     /// ## Examples:
1910     ///
1911     /// ```no_run
1912     /// # let mut out = [0; 512];
1913     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
1914     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
1915     /// # let scid = [0xba; 16];
1916     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
1917     /// loop {
1918     ///     let write = match conn.send(&mut out) {
1919     ///         Ok(v) => v,
1920     ///
1921     ///         Err(quiche::Error::Done) => {
1922     ///             // Done writing.
1923     ///             break;
1924     ///         },
1925     ///
1926     ///         Err(e) => {
1927     ///             // An error occurred, handle it.
1928     ///             break;
1929     ///         },
1930     ///     };
1931     ///
1932     ///     socket.send(&out[..write]).unwrap();
1933     /// }
1934     /// # Ok::<(), quiche::Error>(())
1935     /// ```
send(&mut self, out: &mut [u8]) -> Result<usize>1936     pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
1937         let now = time::Instant::now();
1938 
1939         if out.is_empty() {
1940             return Err(Error::BufferTooShort);
1941         }
1942 
1943         if self.is_closed() || self.draining_timer.is_some() {
1944             return Err(Error::Done);
1945         }
1946 
1947         // If the Initial secrets have not been derived yet, there's no point
1948         // in trying to send a packet, so return early.
1949         if !self.derived_initial_secrets {
1950             return Err(Error::Done);
1951         }
1952 
1953         let is_closing = self.error.is_some() || self.app_error.is_some();
1954 
1955         if !is_closing {
1956             self.do_handshake()?;
1957         }
1958 
1959         let mut b = octets::OctetsMut::with_slice(out);
1960 
1961         let epoch = self.write_epoch()?;
1962 
1963         let pkt_type = packet::Type::from_epoch(epoch);
1964 
1965         // Process lost frames.
1966         for lost in self.recovery.lost[epoch].drain(..) {
1967             match lost {
1968                 frame::Frame::Crypto { data } => {
1969                     self.pkt_num_spaces[epoch].crypto_stream.send.push(data)?;
1970                 },
1971 
1972                 frame::Frame::Stream { stream_id, data } => {
1973                     let stream = match self.streams.get_mut(stream_id) {
1974                         Some(v) => v,
1975 
1976                         None => continue,
1977                     };
1978 
1979                     let was_flushable = stream.is_flushable();
1980 
1981                     let empty_fin = data.is_empty() && data.fin();
1982 
1983                     stream.send.push(data)?;
1984 
1985                     // If the stream is now flushable push it to the flushable
1986                     // queue, but only if it wasn't already queued.
1987                     //
1988                     // Consider the stream flushable also when we are sending a
1989                     // zero-length frame that has the fin flag set.
1990                     if (stream.is_flushable() || empty_fin) && !was_flushable {
1991                         let urgency = stream.urgency;
1992                         let incremental = stream.incremental;
1993                         self.streams.push_flushable(
1994                             stream_id,
1995                             urgency,
1996                             incremental,
1997                         );
1998                     }
1999                 },
2000 
2001                 frame::Frame::ACK { .. } => {
2002                     self.pkt_num_spaces[epoch].ack_elicited = true;
2003                 },
2004 
2005                 frame::Frame::HandshakeDone => {
2006                     self.handshake_done_sent = false;
2007                 },
2008 
2009                 frame::Frame::MaxStreamData { stream_id, .. } => {
2010                     if self.streams.get(stream_id).is_some() {
2011                         self.streams.mark_almost_full(stream_id, true);
2012                     }
2013                 },
2014 
2015                 frame::Frame::MaxData { .. } => {
2016                     self.almost_full = true;
2017                 },
2018 
2019                 _ => (),
2020             }
2021         }
2022 
2023         let mut left = b.cap();
2024 
2025         // Limit output packet size to respect peer's max_packet_size limit.
2026         left = cmp::min(left, self.max_send_udp_payload_len());
2027 
2028         // Limit output packet size by congestion window size.
2029         left = cmp::min(left, self.recovery.cwnd_available());
2030 
2031         // Limit data sent by the server based on the amount of data received
2032         // from the client before its address is validated.
2033         if !self.verified_peer_address && self.is_server {
2034             left = cmp::min(left, self.max_send_bytes);
2035         }
2036 
2037         let pn = self.pkt_num_spaces[epoch].next_pkt_num;
2038         let pn_len = packet::pkt_num_len(pn)?;
2039 
2040         // The AEAD overhead at the current encryption level.
2041         let crypto_overhead = self.pkt_num_spaces[epoch]
2042             .crypto_overhead()
2043             .ok_or(Error::Done)?;
2044 
2045         let hdr = Header {
2046             ty: pkt_type,
2047             version: self.version,
2048             dcid: self.dcid.clone(),
2049 
2050             // Don't needlessly clone the source connection ID for 1-RTT packets
2051             // as it is not used.
2052             scid: if pkt_type != packet::Type::Short {
2053                 self.scid.clone()
2054             } else {
2055                 Vec::new()
2056             },
2057 
2058             pkt_num: 0,
2059             pkt_num_len: pn_len,
2060 
2061             // Only clone token for Initial packets, as other packets don't have
2062             // this field (Retry doesn't count, as it's not encoded as part of
2063             // this code path).
2064             token: if pkt_type == packet::Type::Initial {
2065                 self.token.clone()
2066             } else {
2067                 None
2068             },
2069 
2070             versions: None,
2071             key_phase: false,
2072         };
2073 
2074         hdr.to_bytes(&mut b)?;
2075 
2076         // Calculate the space required for the packet, including the header,
2077         // the payload length, the packet number and the AEAD overhead.
2078         let mut overhead = b.off() + pn_len + crypto_overhead;
2079 
2080         // We assume that the payload length, which is only present in long
2081         // header packets, can always be encoded with a 2-byte varint.
2082         if pkt_type != packet::Type::Short {
2083             overhead += 2;
2084         }
2085 
2086         // Make sure we have enough space left for the packet.
2087         match left.checked_sub(overhead) {
2088             Some(v) => left = v,
2089 
2090             None => {
2091                 // We can't send more because there isn't enough space available
2092                 // in the output buffer.
2093                 //
2094                 // This usually happens when we try to send a new packet but
2095                 // fail because cwnd is almost full. In such a case app_limited
2096                 // is set to false here to make cwnd grow when ACK is received.
2097                 self.recovery.update_app_limited(false);
2098                 return Err(Error::Done);
2099             },
2100         }
2101 
2102         let mut frames: Vec<frame::Frame> = Vec::new();
2103 
2104         let mut ack_eliciting = false;
2105         let mut in_flight = false;
2106         let mut has_data = false;
2107 
2108         let mut payload_len = 0;
2109 
2110         // Create ACK frame.
2111         if self.pkt_num_spaces[epoch].recv_pkt_need_ack.len() > 0 &&
2112             (self.pkt_num_spaces[epoch].ack_elicited ||
2113                 self.recovery.loss_probes[epoch] > 0) &&
2114             !is_closing
2115         {
2116             let ack_delay =
2117                 self.pkt_num_spaces[epoch].largest_rx_pkt_time.elapsed();
2118 
2119             let ack_delay = ack_delay.as_micros() as u64 /
2120                 2_u64
2121                     .pow(self.local_transport_params.ack_delay_exponent as u32);
2122 
2123             let frame = frame::Frame::ACK {
2124                 ack_delay,
2125                 ranges: self.pkt_num_spaces[epoch].recv_pkt_need_ack.clone(),
2126             };
2127 
2128             if push_frame_to_pkt!(frames, frame, payload_len, left) {
2129                 self.pkt_num_spaces[epoch].ack_elicited = false;
2130             }
2131         }
2132 
2133         if pkt_type == packet::Type::Short && !is_closing {
2134             // Create HANDSHAKE_DONE frame.
2135             if self.is_established() &&
2136                 !self.handshake_done_sent &&
2137                 self.is_server
2138             {
2139                 let frame = frame::Frame::HandshakeDone;
2140 
2141                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2142                     self.handshake_done_sent = true;
2143 
2144                     ack_eliciting = true;
2145                     in_flight = true;
2146                 }
2147             }
2148 
2149             // Create MAX_STREAMS_BIDI frame.
2150             if self.streams.should_update_max_streams_bidi() {
2151                 let frame = frame::Frame::MaxStreamsBidi {
2152                     max: self.streams.max_streams_bidi_next(),
2153                 };
2154 
2155                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2156                     self.streams.update_max_streams_bidi();
2157 
2158                     ack_eliciting = true;
2159                     in_flight = true;
2160                 }
2161             }
2162 
2163             // Create MAX_STREAMS_UNI frame.
2164             if self.streams.should_update_max_streams_uni() {
2165                 let frame = frame::Frame::MaxStreamsUni {
2166                     max: self.streams.max_streams_uni_next(),
2167                 };
2168 
2169                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2170                     self.streams.update_max_streams_uni();
2171 
2172                     ack_eliciting = true;
2173                     in_flight = true;
2174                 }
2175             }
2176 
2177             // Create MAX_DATA frame as needed.
2178             if self.almost_full {
2179                 let frame = frame::Frame::MaxData {
2180                     max: self.max_rx_data_next,
2181                 };
2182 
2183                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2184                     self.almost_full = false;
2185 
2186                     // Commits the new max_rx_data limit.
2187                     self.max_rx_data = self.max_rx_data_next;
2188 
2189                     ack_eliciting = true;
2190                     in_flight = true;
2191                 }
2192             }
2193 
2194             // Create DATA_BLOCKED frame.
2195             if let Some(limit) = self.blocked_limit {
2196                 let frame = frame::Frame::DataBlocked { limit };
2197 
2198                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2199                     self.blocked_limit = None;
2200 
2201                     ack_eliciting = true;
2202                     in_flight = true;
2203                 }
2204             }
2205 
2206             // Create MAX_STREAM_DATA frames as needed.
2207             for stream_id in self.streams.almost_full() {
2208                 let stream = match self.streams.get_mut(stream_id) {
2209                     Some(v) => v,
2210 
2211                     None => {
2212                         // The stream doesn't exist anymore, so remove it from
2213                         // the almost full set.
2214                         self.streams.mark_almost_full(stream_id, false);
2215                         continue;
2216                     },
2217                 };
2218 
2219                 let frame = frame::Frame::MaxStreamData {
2220                     stream_id,
2221                     max: stream.recv.max_data_next(),
2222                 };
2223 
2224                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2225                     stream.recv.update_max_data();
2226 
2227                     self.streams.mark_almost_full(stream_id, false);
2228 
2229                     ack_eliciting = true;
2230                     in_flight = true;
2231                 }
2232             }
2233 
2234             // Create STREAM_DATA_BLOCKED frames as needed.
2235             for (stream_id, limit) in self
2236                 .streams
2237                 .blocked()
2238                 .map(|(&k, &v)| (k, v))
2239                 .collect::<Vec<(u64, u64)>>()
2240             {
2241                 let frame = frame::Frame::StreamDataBlocked { stream_id, limit };
2242 
2243                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2244                     self.streams.mark_blocked(stream_id, false, 0);
2245 
2246                     ack_eliciting = true;
2247                     in_flight = true;
2248                 }
2249             }
2250         }
2251 
2252         // Create CONNECTION_CLOSE frame.
2253         if let Some(err) = self.error {
2254             let frame = frame::Frame::ConnectionClose {
2255                 error_code: err,
2256                 frame_type: 0,
2257                 reason: Vec::new(),
2258             };
2259 
2260             if push_frame_to_pkt!(frames, frame, payload_len, left) {
2261                 self.draining_timer = Some(now + (self.recovery.pto() * 3));
2262 
2263                 ack_eliciting = true;
2264                 in_flight = true;
2265             }
2266         }
2267 
2268         // Create APPLICATION_CLOSE frame.
2269         if let Some(err) = self.app_error {
2270             if pkt_type == packet::Type::Short {
2271                 let frame = frame::Frame::ApplicationClose {
2272                     error_code: err,
2273                     reason: self.app_reason.clone(),
2274                 };
2275 
2276                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2277                     self.draining_timer = Some(now + (self.recovery.pto() * 3));
2278 
2279                     ack_eliciting = true;
2280                     in_flight = true;
2281                 }
2282             }
2283         }
2284 
2285         // Create PATH_RESPONSE frame.
2286         if let Some(ref challenge) = self.challenge {
2287             let frame = frame::Frame::PathResponse {
2288                 data: challenge.clone(),
2289             };
2290 
2291             if push_frame_to_pkt!(frames, frame, payload_len, left) {
2292                 self.challenge = None;
2293 
2294                 ack_eliciting = true;
2295                 in_flight = true;
2296             }
2297         }
2298 
2299         // Create CRYPTO frame.
2300         if self.pkt_num_spaces[epoch].crypto_stream.is_flushable() &&
2301             left > frame::MAX_CRYPTO_OVERHEAD &&
2302             !is_closing
2303         {
2304             let crypto_len = left - frame::MAX_CRYPTO_OVERHEAD;
2305             let crypto_buf = self.pkt_num_spaces[epoch]
2306                 .crypto_stream
2307                 .send
2308                 .pop(crypto_len)?;
2309 
2310             let frame = frame::Frame::Crypto { data: crypto_buf };
2311 
2312             if push_frame_to_pkt!(frames, frame, payload_len, left) {
2313                 ack_eliciting = true;
2314                 in_flight = true;
2315                 has_data = true;
2316             }
2317         }
2318 
2319         // Create DATAGRAM frame.
2320         if pkt_type == packet::Type::Short &&
2321             left > frame::MAX_DGRAM_OVERHEAD &&
2322             !is_closing
2323         {
2324             if let Some(max_dgram_payload) = self.dgram_max_writable_len() {
2325                 while let Some(len) = self.dgram_send_queue.peek_front_len() {
2326                     if (len + frame::MAX_DGRAM_OVERHEAD) <= left {
2327                         // Front of the queue fits this packet, send it
2328                         match self.dgram_send_queue.pop() {
2329                             Some(data) => {
2330                                 let frame = frame::Frame::Datagram { data };
2331 
2332                                 if push_frame_to_pkt!(
2333                                     frames,
2334                                     frame,
2335                                     payload_len,
2336                                     left
2337                                 ) {
2338                                     ack_eliciting = true;
2339                                     in_flight = true;
2340                                 }
2341                             },
2342 
2343                             None => continue,
2344                         };
2345                     } else if len > max_dgram_payload {
2346                         // This dgram frame will never fit. Let's purge it.
2347                         self.dgram_send_queue.pop();
2348                     } else {
2349                         break;
2350                     }
2351                 }
2352             }
2353         }
2354 
2355         // Create a single STREAM frame for the first stream that is flushable.
2356         if pkt_type == packet::Type::Short &&
2357             left > frame::MAX_STREAM_OVERHEAD &&
2358             !is_closing
2359         {
2360             while let Some(stream_id) = self.streams.pop_flushable() {
2361                 let stream = match self.streams.get_mut(stream_id) {
2362                     Some(v) => v,
2363 
2364                     None => continue,
2365                 };
2366 
2367                 let off = stream.send.off_front();
2368 
2369                 // Try to accurately account for the STREAM frame's overhead,
2370                 // such that we can fill as much of the packet buffer as
2371                 // possible.
2372                 let overhead = 1 +
2373                     octets::varint_len(stream_id) +
2374                     octets::varint_len(off) +
2375                     octets::varint_len(left as u64);
2376 
2377                 let max_len = match left.checked_sub(overhead) {
2378                     Some(v) => v,
2379 
2380                     None => continue,
2381                 };
2382 
2383                 let stream_buf = stream.send.pop(max_len)?;
2384 
2385                 if stream_buf.is_empty() && !stream_buf.fin() {
2386                     continue;
2387                 }
2388 
2389                 let frame = frame::Frame::Stream {
2390                     stream_id,
2391                     data: stream_buf,
2392                 };
2393 
2394                 if push_frame_to_pkt!(frames, frame, payload_len, left) {
2395                     ack_eliciting = true;
2396                     in_flight = true;
2397                     has_data = true;
2398                 }
2399 
2400                 // If the stream is still flushable, push it to the back of the
2401                 // queue again.
2402                 if stream.is_flushable() {
2403                     let urgency = stream.urgency;
2404                     let incremental = stream.incremental;
2405                     self.streams.push_flushable(stream_id, urgency, incremental);
2406                 }
2407 
2408                 // When fuzzing, try to coalesce multiple STREAM frames in the
2409                 // same packet, so it's easier to generate fuzz corpora.
2410                 if cfg!(feature = "fuzzing") && left > frame::MAX_STREAM_OVERHEAD
2411                 {
2412                     continue;
2413                 }
2414 
2415                 break;
2416             }
2417         }
2418 
2419         // Create PING for PTO probe if no other ack-eliciting frame is sent.
2420         if self.recovery.loss_probes[epoch] > 0 &&
2421             !ack_eliciting &&
2422             left >= 1 &&
2423             !is_closing
2424         {
2425             let frame = frame::Frame::Ping;
2426 
2427             if push_frame_to_pkt!(frames, frame, payload_len, left) {
2428                 ack_eliciting = true;
2429                 in_flight = true;
2430             }
2431         }
2432 
2433         if ack_eliciting {
2434             self.recovery.loss_probes[epoch] =
2435                 self.recovery.loss_probes[epoch].saturating_sub(1);
2436         }
2437 
2438         if frames.is_empty() {
2439             // When we reach this point we are not able to write more, so set
2440             // app_limited to false.
2441             self.recovery.update_app_limited(false);
2442             return Err(Error::Done);
2443         }
2444 
2445         // Pad the client's initial packet.
2446         if !self.is_server && pkt_type == packet::Type::Initial {
2447             let pkt_len = pn_len + payload_len + crypto_overhead;
2448 
2449             let frame = frame::Frame::Padding {
2450                 len: cmp::min(MIN_CLIENT_INITIAL_LEN - pkt_len, left),
2451             };
2452 
2453             payload_len += frame.wire_len();
2454 
2455             frames.push(frame);
2456 
2457             in_flight = true;
2458         }
2459 
2460         // Pad payload so that it's always at least 4 bytes.
2461         if payload_len < PAYLOAD_MIN_LEN {
2462             let frame = frame::Frame::Padding {
2463                 len: PAYLOAD_MIN_LEN - payload_len,
2464             };
2465 
2466             payload_len += frame.wire_len();
2467 
2468             frames.push(frame);
2469 
2470             in_flight = true;
2471         }
2472 
2473         payload_len += crypto_overhead;
2474 
2475         // Only long header packets have an explicit length field.
2476         if pkt_type != packet::Type::Short {
2477             let len = pn_len + payload_len;
2478             b.put_varint(len as u64)?;
2479         }
2480 
2481         packet::encode_pkt_num(pn, &mut b)?;
2482 
2483         let payload_offset = b.off();
2484 
2485         trace!(
2486             "{} tx pkt {:?} len={} pn={}",
2487             self.trace_id,
2488             hdr,
2489             payload_len,
2490             pn
2491         );
2492 
2493         qlog_with!(self.qlog_streamer, q, {
2494             let qlog_pkt_hdr = qlog::PacketHeader::with_type(
2495                 hdr.ty.to_qlog(),
2496                 pn,
2497                 Some(payload_len as u64 + payload_offset as u64),
2498                 Some(payload_len as u64),
2499                 Some(hdr.version),
2500                 Some(&hdr.scid),
2501                 Some(&hdr.dcid),
2502             );
2503 
2504             let packet_sent_ev = qlog::event::Event::packet_sent_min(
2505                 hdr.ty.to_qlog(),
2506                 qlog_pkt_hdr,
2507                 Some(Vec::new()),
2508             );
2509 
2510             q.add_event(packet_sent_ev).ok();
2511         });
2512 
2513         // Encode frames into the output packet.
2514         for frame in &mut frames {
2515             trace!("{} tx frm {:?}", self.trace_id, frame);
2516 
2517             frame.to_bytes(&mut b)?;
2518 
2519             qlog_with!(self.qlog_streamer, q, {
2520                 q.add_frame(frame.to_qlog(), false).ok();
2521             });
2522 
2523             // Once frames have been serialized they are passed to the Recovery
2524             // module which manages retransmission. However, some frames do not
2525             // contain retransmittable data, so drop it here.
2526             frame.shrink_for_retransmission();
2527         }
2528 
2529         qlog_with!(self.qlog_streamer, q, {
2530             q.finish_frames().ok();
2531         });
2532 
2533         let aead = match self.pkt_num_spaces[epoch].crypto_seal {
2534             Some(ref v) => v,
2535             None => return Err(Error::InvalidState),
2536         };
2537 
2538         let written = packet::encrypt_pkt(
2539             &mut b,
2540             pn,
2541             pn_len,
2542             payload_len,
2543             payload_offset,
2544             aead,
2545         )?;
2546 
2547         let sent_pkt = recovery::Sent {
2548             pkt_num: pn,
2549             frames,
2550             time_sent: now,
2551             time_acked: None,
2552             time_lost: None,
2553             size: if ack_eliciting { written } else { 0 },
2554             ack_eliciting,
2555             in_flight,
2556             delivered: 0,
2557             delivered_time: now,
2558             recent_delivered_packet_sent_time: now,
2559             is_app_limited: false,
2560             has_data,
2561         };
2562 
2563         self.recovery.on_packet_sent(
2564             sent_pkt,
2565             epoch,
2566             self.handshake_status(),
2567             now,
2568             &self.trace_id,
2569         );
2570 
2571         qlog_with!(self.qlog_streamer, q, {
2572             let ev = self.recovery.to_qlog();
2573             q.add_event(ev).ok();
2574         });
2575 
2576         self.pkt_num_spaces[epoch].next_pkt_num += 1;
2577 
2578         self.sent_count += 1;
2579 
2580         if self.dgram_send_queue.byte_size() > self.recovery.cwnd_available() {
2581             self.recovery.update_app_limited(false);
2582         }
2583 
2584         // On the client, drop initial state after sending an Handshake packet.
2585         if !self.is_server && hdr.ty == packet::Type::Handshake {
2586             self.drop_epoch_state(packet::EPOCH_INITIAL, now);
2587         }
2588 
2589         self.max_send_bytes = self.max_send_bytes.saturating_sub(written);
2590 
2591         // (Re)start the idle timer if we are sending the first ack-eliciting
2592         // packet since last receiving a packet.
2593         if ack_eliciting && !self.ack_eliciting_sent {
2594             if let Some(idle_timeout) = self.idle_timeout() {
2595                 self.idle_timer = Some(now + idle_timeout);
2596             }
2597         }
2598 
2599         if ack_eliciting {
2600             self.ack_eliciting_sent = true;
2601         }
2602 
2603         Ok(written)
2604     }
2605 
2606     // Returns the maximum len of a packet to be sent. This is max_packet_size
2607     // as sent by the peer, except during the handshake when we haven't parsed
2608     // transport parameters yet, so use a default value then.
max_send_udp_payload_len(&self) -> usize2609     fn max_send_udp_payload_len(&self) -> usize {
2610         if self.is_established() {
2611             // We cap the maximum packet size to 16KB or so, so that it can be
2612             // always encoded with a 2-byte varint.
2613             cmp::min(16383, self.peer_transport_params.max_udp_payload_size)
2614                 as usize
2615         } else {
2616             // Allow for 1200 bytes (minimum QUIC packet size) during the
2617             // handshake.
2618             MIN_CLIENT_INITIAL_LEN
2619         }
2620     }
2621 
2622     /// Reads contiguous data from a stream into the provided slice.
2623     ///
2624     /// The slice must be sized by the caller and will be populated up to its
2625     /// capacity.
2626     ///
2627     /// On success the amount of bytes read and a flag indicating the fin state
2628     /// is returned as a tuple, or [`Done`] if there is no data to read.
2629     ///
2630     /// [`Done`]: enum.Error.html#variant.Done
2631     ///
2632     /// ## Examples:
2633     ///
2634     /// ```no_run
2635     /// # let mut buf = [0; 512];
2636     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
2637     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
2638     /// # let scid = [0xba; 16];
2639     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
2640     /// # let stream_id = 0;
2641     /// while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
2642     ///     println!("Got {} bytes on stream {}", read, stream_id);
2643     /// }
2644     /// # Ok::<(), quiche::Error>(())
2645     /// ```
stream_recv( &mut self, stream_id: u64, out: &mut [u8], ) -> Result<(usize, bool)>2646     pub fn stream_recv(
2647         &mut self, stream_id: u64, out: &mut [u8],
2648     ) -> Result<(usize, bool)> {
2649         // We can't read on our own unidirectional streams.
2650         if !stream::is_bidi(stream_id) &&
2651             stream::is_local(stream_id, self.is_server)
2652         {
2653             return Err(Error::InvalidStreamState);
2654         }
2655 
2656         let stream = self
2657             .streams
2658             .get_mut(stream_id)
2659             .ok_or(Error::InvalidStreamState)?;
2660 
2661         if !stream.is_readable() {
2662             return Err(Error::Done);
2663         }
2664 
2665         #[cfg(feature = "qlog")]
2666         let offset = stream.recv.off_front();
2667 
2668         let (read, fin) = stream.recv.pop(out)?;
2669 
2670         self.max_rx_data_next = self.max_rx_data_next.saturating_add(read as u64);
2671 
2672         let readable = stream.is_readable();
2673 
2674         let complete = stream.is_complete();
2675 
2676         let local = stream.local;
2677 
2678         if stream.recv.almost_full() {
2679             self.streams.mark_almost_full(stream_id, true);
2680         }
2681 
2682         if !readable {
2683             self.streams.mark_readable(stream_id, false);
2684         }
2685 
2686         if complete {
2687             self.streams.collect(stream_id, local);
2688         }
2689 
2690         qlog_with!(self.qlog_streamer, q, {
2691             let ev = qlog::event::Event::h3_data_moved(
2692                 stream_id.to_string(),
2693                 Some(offset.to_string()),
2694                 Some(read as u64),
2695                 Some(qlog::H3DataRecipient::Transport),
2696                 None,
2697                 None,
2698             );
2699             q.add_event(ev).ok();
2700         });
2701 
2702         if self.should_update_max_data() {
2703             self.almost_full = true;
2704         }
2705 
2706         Ok((read, fin))
2707     }
2708 
2709     /// Writes data to a stream.
2710     ///
2711     /// On success the number of bytes written is returned, or [`Done`] if no
2712     /// data was written (e.g. because the stream has no capacity).
2713     ///
2714     /// Note that in order to avoid buffering an infinite amount of data in the
2715     /// stream's send buffer, streams are only allowed to buffer outgoing data
2716     /// up to the amount that the peer allows it to send (that is, up to the
2717     /// stream's outgoing flow control capacity).
2718     ///
2719     /// This means that the number of written bytes returned can be lower than
2720     /// the length of the input buffer when the stream doesn't have enough
2721     /// capacity for the operation to complete. The application should retry the
2722     /// operation once the stream is reported as writable again.
2723     ///
2724     /// Applications should call this method only after the handshake is
2725     /// completed (whenever [`is_established()`] returns `true`) or during
2726     /// early data if enabled (whenever [`is_in_early_data()`] returns `true`).
2727     ///
2728     /// [`Done`]: enum.Error.html#variant.Done
2729     /// [`is_established()`]: struct.Connection.html#method.is_established
2730     /// [`is_in_early_data()`]: struct.Connection.html#method.is_in_early_data
2731     ///
2732     /// ## Examples:
2733     ///
2734     /// ```no_run
2735     /// # let mut buf = [0; 512];
2736     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
2737     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
2738     /// # let scid = [0xba; 16];
2739     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
2740     /// # let stream_id = 0;
2741     /// conn.stream_send(stream_id, b"hello", true)?;
2742     /// # Ok::<(), quiche::Error>(())
2743     /// ```
stream_send( &mut self, stream_id: u64, buf: &[u8], fin: bool, ) -> Result<usize>2744     pub fn stream_send(
2745         &mut self, stream_id: u64, buf: &[u8], fin: bool,
2746     ) -> Result<usize> {
2747         // We can't write on the peer's unidirectional streams.
2748         if !stream::is_bidi(stream_id) &&
2749             !stream::is_local(stream_id, self.is_server)
2750         {
2751             return Err(Error::InvalidStreamState);
2752         }
2753 
2754         // Mark the connection as blocked if the connection-level flow control
2755         // limit doesn't let us buffer all the data.
2756         //
2757         // Note that this is separate from "send capacity" as that also takes
2758         // congestion control into consideration.
2759         if self.max_tx_data - self.tx_data < buf.len() as u64 {
2760             self.blocked_limit = Some(self.max_tx_data);
2761         }
2762 
2763         // Truncate the input buffer based on the connection's send capacity if
2764         // necessary.
2765         let cap = self.send_capacity();
2766 
2767         let (buf, fin) = if cap < buf.len() {
2768             (&buf[..cap], false)
2769         } else {
2770             (buf, fin)
2771         };
2772 
2773         // Get existing stream or create a new one.
2774         let stream = self.get_or_create_stream(stream_id, true)?;
2775 
2776         #[cfg(feature = "qlog")]
2777         let offset = stream.send.off_back();
2778 
2779         let was_flushable = stream.is_flushable();
2780 
2781         let sent = stream.send.push_slice(buf, fin)?;
2782 
2783         let urgency = stream.urgency;
2784         let incremental = stream.incremental;
2785 
2786         let flushable = stream.is_flushable();
2787 
2788         let writable = stream.is_writable();
2789 
2790         let empty_fin = buf.is_empty() && fin;
2791 
2792         if sent < buf.len() {
2793             let max_off = stream.send.max_off();
2794 
2795             self.streams.mark_blocked(stream_id, true, max_off);
2796         } else {
2797             self.streams.mark_blocked(stream_id, false, 0);
2798         }
2799 
2800         // If the stream is now flushable push it to the flushable queue, but
2801         // only if it wasn't already queued.
2802         //
2803         // Consider the stream flushable also when we are sending a zero-length
2804         // frame that has the fin flag set.
2805         if (flushable || empty_fin) && !was_flushable {
2806             self.streams.push_flushable(stream_id, urgency, incremental);
2807         }
2808 
2809         if !writable {
2810             self.streams.mark_writable(stream_id, false);
2811         }
2812 
2813         self.tx_data += sent as u64;
2814 
2815         self.recovery.rate_check_app_limited();
2816 
2817         qlog_with!(self.qlog_streamer, q, {
2818             let ev = qlog::event::Event::h3_data_moved(
2819                 stream_id.to_string(),
2820                 Some(offset.to_string()),
2821                 Some(sent as u64),
2822                 None,
2823                 Some(qlog::H3DataRecipient::Transport),
2824                 None,
2825             );
2826             q.add_event(ev).ok();
2827         });
2828 
2829         Ok(sent)
2830     }
2831 
2832     /// Sets the priority for a stream.
2833     ///
2834     /// A stream's priority determines the order in which stream data is sent
2835     /// on the wire (streams with lower priority are sent first). Streams are
2836     /// created with a default priority of `127`.
2837     ///
2838     /// The target stream is created if it did not exist before calling this
2839     /// method.
stream_priority( &mut self, stream_id: u64, urgency: u8, incremental: bool, ) -> Result<()>2840     pub fn stream_priority(
2841         &mut self, stream_id: u64, urgency: u8, incremental: bool,
2842     ) -> Result<()> {
2843         // Get existing stream or create a new one, but if the stream
2844         // has already been closed and collected, ignore the prioritization.
2845         let stream = match self.get_or_create_stream(stream_id, true) {
2846             Ok(v) => v,
2847 
2848             Err(Error::Done) => return Ok(()),
2849 
2850             Err(e) => return Err(e),
2851         };
2852 
2853         if stream.urgency == urgency && stream.incremental == incremental {
2854             return Ok(());
2855         }
2856 
2857         stream.urgency = urgency;
2858         stream.incremental = incremental;
2859 
2860         // TODO: reprioritization
2861 
2862         Ok(())
2863     }
2864 
2865     /// Shuts down reading or writing from/to the specified stream.
2866     ///
2867     /// When the `direction` argument is set to [`Shutdown::Read`], outstanding
2868     /// data in the stream's receive buffer is dropped, and no additional data
2869     /// is added to it. Data received after calling this method is still
2870     /// validated and acked but not stored, and [`stream_recv()`] will not
2871     /// return it to the application.
2872     ///
2873     /// When the `direction` argument is set to [`Shutdown::Write`], outstanding
2874     /// data in the stream's send buffer is dropped, and no additional data
2875     /// is added to it. Data passed to [`stream_send()`] after calling this
2876     /// method will be ignored.
2877     ///
2878     /// [`Shutdown::Read`]: enum.Shutdown.html#variant.Read
2879     /// [`Shutdown::Write`]: enum.Shutdown.html#variant.Write
2880     /// [`stream_recv()`]: struct.Connection.html#method.stream_recv
2881     /// [`stream_send()`]: struct.Connection.html#method.stream_send
stream_shutdown( &mut self, stream_id: u64, direction: Shutdown, _err: u64, ) -> Result<()>2882     pub fn stream_shutdown(
2883         &mut self, stream_id: u64, direction: Shutdown, _err: u64,
2884     ) -> Result<()> {
2885         // Get existing stream.
2886         let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
2887 
2888         match direction {
2889             // TODO: send STOP_SENDING
2890             Shutdown::Read => {
2891                 stream.recv.shutdown()?;
2892 
2893                 // Once shutdown, the stream is guaranteed to be non-readable.
2894                 self.streams.mark_readable(stream_id, false);
2895             },
2896 
2897             // TODO: send RESET_STREAM
2898             Shutdown::Write => {
2899                 stream.send.shutdown()?;
2900 
2901                 // Once shutdown, the stream is guaranteed to be non-writable.
2902                 self.streams.mark_writable(stream_id, false);
2903             },
2904         }
2905 
2906         Ok(())
2907     }
2908 
2909     /// Returns the stream's send capacity in bytes.
stream_capacity(&self, stream_id: u64) -> Result<usize>2910     pub fn stream_capacity(&self, stream_id: u64) -> Result<usize> {
2911         if let Some(stream) = self.streams.get(stream_id) {
2912             let cap = cmp::min(self.send_capacity(), stream.send.cap());
2913             return Ok(cap);
2914         };
2915 
2916         Err(Error::InvalidStreamState)
2917     }
2918 
2919     /// Returns true if all the data has been read from the specified stream.
2920     ///
2921     /// This instructs the application that all the data received from the
2922     /// peer on the stream has been read, and there won't be anymore in the
2923     /// future.
2924     ///
2925     /// Basically this returns true when the peer either set the `fin` flag
2926     /// for the stream, or sent `RESET_STREAM`.
stream_finished(&self, stream_id: u64) -> bool2927     pub fn stream_finished(&self, stream_id: u64) -> bool {
2928         let stream = match self.streams.get(stream_id) {
2929             Some(v) => v,
2930 
2931             None => return true,
2932         };
2933 
2934         stream.recv.is_fin()
2935     }
2936 
2937     /// Initializes the stream's application data.
2938     ///
2939     /// This can be used by applications to store per-stream information without
2940     /// having to maintain their own stream map.
2941     ///
2942     /// Stream data can only be initialized once. Additional calls to this
2943     /// method will return [`Done`].
2944     ///
2945     /// [`Done`]: enum.Error.html#variant.Done
stream_init_application_data<T>( &mut self, stream_id: u64, data: T, ) -> Result<()> where T: std::any::Any + Send,2946     pub fn stream_init_application_data<T>(
2947         &mut self, stream_id: u64, data: T,
2948     ) -> Result<()>
2949     where
2950         T: std::any::Any + Send,
2951     {
2952         // Get existing stream.
2953         let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
2954 
2955         if stream.data.is_some() {
2956             return Err(Error::Done);
2957         }
2958 
2959         stream.data = Some(Box::new(data));
2960 
2961         Ok(())
2962     }
2963 
2964     /// Returns the stream's application data, if any was initialized.
2965     ///
2966     /// This returns a reference to the application data that was initialized
2967     /// by calling [`stream_init_application_data()`].
2968     ///
2969     /// [`stream_init_application_data()`]:
2970     /// struct.Connection.html#method.stream_init_application_data
stream_application_data( &mut self, stream_id: u64, ) -> Option<&mut dyn std::any::Any>2971     pub fn stream_application_data(
2972         &mut self, stream_id: u64,
2973     ) -> Option<&mut dyn std::any::Any> {
2974         // Get existing stream.
2975         let stream = self.streams.get_mut(stream_id)?;
2976 
2977         if let Some(ref mut stream_data) = stream.data {
2978             return Some(stream_data.as_mut());
2979         }
2980 
2981         None
2982     }
2983 
2984     /// Returns an iterator over streams that have outstanding data to read.
2985     ///
2986     /// Note that the iterator will only include streams that were readable at
2987     /// the time the iterator itself was created (i.e. when `readable()` was
2988     /// called). To account for newly readable streams, the iterator needs to
2989     /// be created again.
2990     ///
2991     /// ## Examples:
2992     ///
2993     /// ```no_run
2994     /// # let mut buf = [0; 512];
2995     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
2996     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
2997     /// # let scid = [0xba; 16];
2998     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
2999     /// // Iterate over readable streams.
3000     /// for stream_id in conn.readable() {
3001     ///     // Stream is readable, read until there's no more data.
3002     ///     while let Ok((read, fin)) = conn.stream_recv(stream_id, &mut buf) {
3003     ///         println!("Got {} bytes on stream {}", read, stream_id);
3004     ///     }
3005     /// }
3006     /// # Ok::<(), quiche::Error>(())
3007     /// ```
readable(&self) -> StreamIter3008     pub fn readable(&self) -> StreamIter {
3009         self.streams.readable()
3010     }
3011 
3012     /// Returns an iterator over streams that can be written to.
3013     ///
3014     /// A "writable" stream is a stream that has enough flow control capacity to
3015     /// send data to the peer. To avoid buffering an infinite amount of data,
3016     /// streams are only allowed to buffer outgoing data up to the amount that
3017     /// the peer allows to send.
3018     ///
3019     /// Note that the iterator will only include streams that were writable at
3020     /// the time the iterator itself was created (i.e. when `writable()` was
3021     /// called). To account for newly writable streams, the iterator needs to
3022     /// be created again.
3023     ///
3024     /// ## Examples:
3025     ///
3026     /// ```no_run
3027     /// # let mut buf = [0; 512];
3028     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3029     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3030     /// # let scid = [0xba; 16];
3031     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3032     /// // Iterate over writable streams.
3033     /// for stream_id in conn.writable() {
3034     ///     // Stream is writable, write some data.
3035     ///     if let Ok(written) = conn.stream_send(stream_id, &buf, false) {
3036     ///         println!("Written {} bytes on stream {}", written, stream_id);
3037     ///     }
3038     /// }
3039     /// # Ok::<(), quiche::Error>(())
3040     /// ```
writable(&self) -> StreamIter3041     pub fn writable(&self) -> StreamIter {
3042         // If there is not enough connection-level send capacity, none of the
3043         // streams are writable, so return an empty iterator.
3044         if self.send_capacity() == 0 {
3045             return StreamIter::default();
3046         }
3047 
3048         self.streams.writable()
3049     }
3050 
3051     /// Reads the first received DATAGRAM.
3052     ///
3053     /// On success the DATAGRAM's data is returned along with its size.
3054     ///
3055     /// [`Done`] is returned if there is no data to read.
3056     ///
3057     /// [`BufferTooShort`] is returned if the provided buffer is too small for
3058     /// the DATAGRAM.
3059     ///
3060     /// [`Done`]: enum.Error.html#variant.Done
3061     /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
3062     ///
3063     /// ## Examples:
3064     ///
3065     /// ```no_run
3066     /// # let mut buf = [0; 512];
3067     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3068     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3069     /// # let scid = [0xba; 16];
3070     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3071     /// let mut dgram_buf = [0; 512];
3072     /// while let Ok((len)) = conn.dgram_recv(&mut dgram_buf) {
3073     ///     println!("Got {} bytes of DATAGRAM", len);
3074     /// }
3075     /// # Ok::<(), quiche::Error>(())
3076     /// ```
dgram_recv(&mut self, buf: &mut [u8]) -> Result<usize>3077     pub fn dgram_recv(&mut self, buf: &mut [u8]) -> Result<usize> {
3078         match self.dgram_recv_queue.pop() {
3079             Some(d) => {
3080                 if d.len() > buf.len() {
3081                     return Err(Error::BufferTooShort);
3082                 }
3083 
3084                 buf[..d.len()].copy_from_slice(&d);
3085                 Ok(d.len())
3086             },
3087 
3088             None => Err(Error::Done),
3089         }
3090     }
3091 
3092     /// Reads the first received DATAGRAM without removing it from the queue.
3093     ///
3094     /// On success the DATAGRAM's data is returned along with the actual number
3095     /// of bytes peeked. The requested length cannot exceed the DATAGRAM's
3096     /// actual length.
3097     ///
3098     /// [`Done`] is returned if there is no data to read.
3099     ///
3100     /// [`BufferTooShort`] is returned if the provided buffer is smaller the
3101     /// number of bytes to peek.
3102     ///
3103     /// [`Done`]: enum.Error.html#variant.Done
3104     /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
dgram_recv_peek(&self, buf: &mut [u8], len: usize) -> Result<usize>3105     pub fn dgram_recv_peek(&self, buf: &mut [u8], len: usize) -> Result<usize> {
3106         self.dgram_recv_queue.peek_front_bytes(buf, len)
3107     }
3108 
3109     /// Returns the length of the first stored DATAGRAM.
dgram_recv_front_len(&self) -> Option<usize>3110     pub fn dgram_recv_front_len(&self) -> Option<usize> {
3111         self.dgram_recv_queue.peek_front_len()
3112     }
3113 
3114     /// Sends data in a DATAGRAM frame.
3115     ///
3116     /// [`Done`] is returned if no data was written.
3117     /// [`InvalidState`] is returned if the peer does not support DATAGRAM.
3118     /// [`BufferTooShort`] is returned if the DATAGRAM frame length is larger
3119     /// than peer's supported DATAGRAM frame length. Use
3120     /// [`dgram_max_writable_len()`] to get the largest supported DATAGRAM
3121     /// frame length.
3122     ///
3123     /// Note that there is no flow control of DATAGRAM frames, so in order to
3124     /// avoid buffering an infinite amount of frames we apply an internal
3125     /// limit.
3126     ///
3127     /// [`Done`]: enum.Error.html#variant.Done
3128     /// [`InvalidState`]: enum.Error.html#variant.InvalidState
3129     /// [`BufferTooShort`]: enum.Error.html#variant.BufferTooShort
3130     /// [`dgram_max_writable_len()`]:
3131     /// struct.Connection.html#method.dgram_max_writable_len
3132     ///
3133     /// ## Examples:
3134     ///
3135     /// ```no_run
3136     /// # let mut buf = [0; 512];
3137     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3138     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3139     /// # let scid = [0xba; 16];
3140     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3141     /// conn.dgram_send(b"hello")?;
3142     /// # Ok::<(), quiche::Error>(())
3143     /// ```
dgram_send(&mut self, buf: &[u8]) -> Result<()>3144     pub fn dgram_send(&mut self, buf: &[u8]) -> Result<()> {
3145         let max_payload_len = match self.dgram_max_writable_len() {
3146             Some(v) => v as usize,
3147             None => {
3148                 return Err(Error::InvalidState);
3149             },
3150         };
3151 
3152         if buf.len() > max_payload_len {
3153             return Err(Error::BufferTooShort);
3154         }
3155 
3156         self.dgram_send_queue.push(buf)?;
3157 
3158         if self.dgram_send_queue.byte_size() > self.recovery.cwnd_available() {
3159             self.recovery.update_app_limited(false);
3160         }
3161 
3162         Ok(())
3163     }
3164 
3165     /// Purges queued outgoing DATAGRAMs matching the predicate.
3166     ///
3167     /// In other words, remove all elements `e` such that `f(&e)` returns true.
3168     ///
3169     /// ## Examples:
3170     /// ```no_run
3171     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3172     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3173     /// # let scid = [0xba; 16];
3174     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3175     /// conn.dgram_send(b"hello")?;
3176     /// conn.dgram_purge_outgoing(&|d: &[u8]| -> bool { d[0] == 0 });
3177     /// # Ok::<(), quiche::Error>(())
3178     /// ```
dgram_purge_outgoing<F: Fn(&[u8]) -> bool>(&mut self, f: F)3179     pub fn dgram_purge_outgoing<F: Fn(&[u8]) -> bool>(&mut self, f: F) {
3180         self.dgram_send_queue.purge(f);
3181     }
3182 
3183     /// Returns the maximum DATAGRAM payload that can be sent.
3184     ///
3185     /// [`None`] is returned if the peer hasn't advertised a maximum DATAGRAM
3186     /// frame size.
3187     ///
3188     /// ## Examples:
3189     ///
3190     /// ```no_run
3191     /// # let mut buf = [0; 512];
3192     /// # let socket = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
3193     /// # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
3194     /// # let scid = [0xba; 16];
3195     /// # let mut conn = quiche::accept(&scid, None, &mut config)?;
3196     /// if let Some(payload_size) = conn.dgram_max_writable_len() {
3197     ///     if payload_size > 5 {
3198     ///         conn.dgram_send(b"hello")?;
3199     ///     }
3200     /// }
3201     /// # Ok::<(), quiche::Error>(())
3202     /// ```
dgram_max_writable_len(&self) -> Option<usize>3203     pub fn dgram_max_writable_len(&self) -> Option<usize> {
3204         match self.peer_transport_params.max_datagram_frame_size {
3205             None => None,
3206             Some(peer_frame_len) => {
3207                 // Start from the maximum packet size...
3208                 let mut max_len = self.max_send_udp_payload_len();
3209                 // ...subtract the Short packet header overhead...
3210                 // (1 byte of pkt_len + len of dcid)
3211                 max_len = max_len.saturating_sub(1 + self.dcid.len());
3212                 // ...subtract the packet number (max len)...
3213                 max_len = max_len.saturating_sub(packet::MAX_PKT_NUM_LEN);
3214                 // ...subtract the crypto overhead...
3215                 max_len = max_len.saturating_sub(
3216                     self.pkt_num_spaces[packet::EPOCH_APPLICATION]
3217                         .crypto_overhead()?,
3218                 );
3219                 // ...clamp to what peer can support...
3220                 max_len = cmp::min(peer_frame_len as usize, max_len);
3221                 // ...subtract frame overhead, checked for underflow.
3222                 max_len.checked_sub(frame::MAX_DGRAM_OVERHEAD)
3223             },
3224         }
3225     }
3226 
dgram_enabled(&self) -> bool3227     fn dgram_enabled(&self) -> bool {
3228         self.local_transport_params
3229             .max_datagram_frame_size
3230             .is_none()
3231     }
3232 
3233     /// Returns the amount of time until the next timeout event.
3234     ///
3235     /// Once the given duration has elapsed, the [`on_timeout()`] method should
3236     /// be called. A timeout of `None` means that the timer should be disarmed.
3237     ///
3238     /// [`on_timeout()`]: struct.Connection.html#method.on_timeout
timeout(&self) -> Option<time::Duration>3239     pub fn timeout(&self) -> Option<time::Duration> {
3240         if self.is_closed() {
3241             return None;
3242         }
3243 
3244         let timeout = if self.draining_timer.is_some() {
3245             // Draining timer takes precedence over all other timers. If it is
3246             // set it means the connection is closing so there's no point in
3247             // processing the other timers.
3248             self.draining_timer
3249         } else {
3250             // Use the lowest timer value (i.e. "sooner") among idle and loss
3251             // detection timers. If they are both unset (i.e. `None`) then the
3252             // result is `None`, but if at least one of them is set then a
3253             // `Some(...)` value is returned.
3254             let timers = [self.idle_timer, self.recovery.loss_detection_timer()];
3255 
3256             timers.iter().filter_map(|&x| x).min()
3257         };
3258 
3259         if let Some(timeout) = timeout {
3260             let now = time::Instant::now();
3261 
3262             if timeout <= now {
3263                 return Some(time::Duration::new(0, 0));
3264             }
3265 
3266             return Some(timeout.duration_since(now));
3267         }
3268 
3269         None
3270     }
3271 
3272     /// Processes a timeout event.
3273     ///
3274     /// If no timeout has occurred it does nothing.
on_timeout(&mut self)3275     pub fn on_timeout(&mut self) {
3276         let now = time::Instant::now();
3277 
3278         if let Some(draining_timer) = self.draining_timer {
3279             if draining_timer <= now {
3280                 trace!("{} draining timeout expired", self.trace_id);
3281 
3282                 qlog_with!(self.qlog_streamer, q, {
3283                     q.finish_log().ok();
3284                 });
3285 
3286                 self.closed = true;
3287             }
3288 
3289             // Draining timer takes precedence over all other timers. If it is
3290             // set it means the connection is closing so there's no point in
3291             // processing the other timers.
3292             return;
3293         }
3294 
3295         if let Some(timer) = self.idle_timer {
3296             if timer <= now {
3297                 trace!("{} idle timeout expired", self.trace_id);
3298 
3299                 qlog_with!(self.qlog_streamer, q, {
3300                     q.finish_log().ok();
3301                 });
3302 
3303                 self.closed = true;
3304                 return;
3305             }
3306         }
3307 
3308         if let Some(timer) = self.recovery.loss_detection_timer() {
3309             if timer <= now {
3310                 trace!("{} loss detection timeout expired", self.trace_id);
3311 
3312                 self.recovery.on_loss_detection_timeout(
3313                     self.handshake_status(),
3314                     now,
3315                     &self.trace_id,
3316                 );
3317 
3318                 qlog_with!(self.qlog_streamer, q, {
3319                     let ev = self.recovery.to_qlog();
3320                     q.add_event(ev).ok();
3321                 });
3322 
3323                 return;
3324             }
3325         }
3326     }
3327 
3328     /// Closes the connection with the given error and reason.
3329     ///
3330     /// The `app` parameter specifies whether an application close should be
3331     /// sent to the peer. Otherwise a normal connection close is sent.
3332     ///
3333     /// Returns [`Done`] if the connection had already been closed.
3334     ///
3335     /// Note that the connection will not be closed immediately. An application
3336     /// should continue calling [`recv()`], [`send()`] and [`timeout()`] as
3337     /// normal, until the [`is_closed()`] method returns `true`.
3338     ///
3339     /// [`Done`]: enum.Error.html#variant.Done
3340     /// [`recv()`]: struct.Connection.html#method.recv
3341     /// [`send()`]: struct.Connection.html#method.send
3342     /// [`timeout()`]: struct.Connection.html#method.timeout
3343     /// [`is_closed()`]: struct.Connection.html#method.is_closed
close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()>3344     pub fn close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()> {
3345         if self.is_closed() || self.draining_timer.is_some() {
3346             return Err(Error::Done);
3347         }
3348 
3349         if self.error.is_some() || self.app_error.is_some() {
3350             return Err(Error::Done);
3351         }
3352 
3353         if app {
3354             self.app_error = Some(err);
3355             self.app_reason.extend_from_slice(reason);
3356         } else {
3357             self.error = Some(err);
3358         }
3359 
3360         // When no packet was successfully processed close connection immediately.
3361         if self.recv_count == 0 {
3362             self.closed = true;
3363         }
3364 
3365         Ok(())
3366     }
3367 
    /// Returns a string uniquely representing the connection.
    ///
    /// This can be used for logging purposes to differentiate between multiple
    /// connections. The returned slice borrows the connection's stored trace
    /// ID, which is the same value used as the prefix of this connection's
    /// own `trace!` log lines.
    pub fn trace_id(&self) -> &str {
        &self.trace_id
    }
3375 
    /// Returns the negotiated ALPN protocol.
    ///
    /// If no protocol has been negotiated, the returned value is empty. The
    /// value is read from the TLS handshake state on every call.
    pub fn application_proto(&self) -> &[u8] {
        self.handshake.alpn_protocol()
    }
3382 
    /// Returns the peer's leaf certificate (if any) as a DER-encoded buffer.
    ///
    /// The certificate is obtained from the TLS handshake state and returned
    /// as an owned buffer; `None` means no certificate is available.
    pub fn peer_cert(&self) -> Option<Vec<u8>> {
        self.handshake.peer_cert()
    }
3387 
    /// Returns true if the connection handshake is complete.
    ///
    /// This reflects the completion state reported by the TLS handshake.
    pub fn is_established(&self) -> bool {
        self.handshake.is_completed()
    }
3392 
    /// Returns true if the connection is resumed.
    ///
    /// This reflects the resumption state reported by the TLS handshake.
    pub fn is_resumed(&self) -> bool {
        self.handshake.is_resumed()
    }
3397 
    /// Returns true if the connection has a pending handshake that has
    /// progressed enough to send or receive early data.
    ///
    /// This reflects the early-data state reported by the TLS handshake.
    pub fn is_in_early_data(&self) -> bool {
        self.handshake.is_in_early_data()
    }
3403 
    /// Returns true if the connection is closed.
    ///
    /// If this returns true, the connection object can be dropped.
    ///
    /// The underlying flag is set, for example, by `on_timeout()` when the
    /// draining or idle timer expires, and by `close()` when no packet has
    /// been processed yet.
    pub fn is_closed(&self) -> bool {
        self.closed
    }
3410 
3411     /// Collects and returns statistics about the connection.
stats(&self) -> Stats3412     pub fn stats(&self) -> Stats {
3413         Stats {
3414             recv: self.recv_count,
3415             sent: self.sent_count,
3416             lost: self.recovery.lost_count,
3417             cwnd: self.recovery.cwnd(),
3418             rtt: self.recovery.rtt(),
3419             delivery_rate: self.recovery.delivery_rate(),
3420         }
3421     }
3422 
encode_transport_params(&mut self) -> Result<()>3423     fn encode_transport_params(&mut self) -> Result<()> {
3424         let mut raw_params = [0; 128];
3425 
3426         let raw_params = TransportParams::encode(
3427             &self.local_transport_params,
3428             self.is_server,
3429             &mut raw_params,
3430         )?;
3431 
3432         self.handshake.set_quic_transport_params(raw_params)?;
3433 
3434         Ok(())
3435     }
3436 
3437     /// Continues the handshake.
3438     ///
3439     /// If the connection is already established, it does nothing.
do_handshake(&mut self) -> Result<()>3440     fn do_handshake(&mut self) -> Result<()> {
3441         // Handshake is already complete, there's nothing to do.
3442         if self.is_established() {
3443             return Ok(());
3444         }
3445 
3446         match self.handshake.do_handshake() {
3447             Ok(_) => (),
3448 
3449             Err(Error::Done) => return Ok(()),
3450 
3451             Err(e) => return Err(e),
3452         };
3453 
3454         if self.application_proto().is_empty() {
3455             // Send no_application_proto TLS alert when no protocol
3456             // can be negotiated.
3457             self.error = Some(0x178);
3458             return Err(Error::TlsFail);
3459         }
3460 
3461         trace!("{} connection established: proto={:?} cipher={:?} curve={:?} sigalg={:?} resumed={} {:?}",
3462                &self.trace_id,
3463                std::str::from_utf8(self.application_proto()),
3464                self.handshake.cipher(),
3465                self.handshake.curve(),
3466                self.handshake.sigalg(),
3467                self.is_resumed(),
3468                self.peer_transport_params);
3469 
3470         Ok(())
3471     }
3472 
3473     /// Selects the packet number space for outgoing packets.
write_epoch(&self) -> Result<packet::Epoch>3474     fn write_epoch(&self) -> Result<packet::Epoch> {
3475         // On error send packet in the latest epoch available, but only send
3476         // 1-RTT ones when the handshake is completed.
3477         if self.error.is_some() {
3478             let epoch = match self.handshake.write_level() {
3479                 crypto::Level::Initial => packet::EPOCH_INITIAL,
3480                 crypto::Level::ZeroRTT => unreachable!(),
3481                 crypto::Level::Handshake => packet::EPOCH_HANDSHAKE,
3482                 crypto::Level::OneRTT => packet::EPOCH_APPLICATION,
3483             };
3484 
3485             if epoch == packet::EPOCH_APPLICATION && !self.is_established() {
3486                 // Downgrade the epoch to handshake as the handshake is not
3487                 // completed yet.
3488                 return Ok(packet::EPOCH_HANDSHAKE);
3489             }
3490 
3491             return Ok(epoch);
3492         }
3493 
3494         for epoch in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
3495             // Only send packets in a space when we have the send keys for it.
3496             if self.pkt_num_spaces[epoch].crypto_seal.is_none() {
3497                 continue;
3498             }
3499 
3500             // We are ready to send data for this packet number space.
3501             if self.pkt_num_spaces[epoch].ready() {
3502                 return Ok(epoch);
3503             }
3504 
3505             // There are lost frames in this packet number space.
3506             if !self.recovery.lost[epoch].is_empty() {
3507                 return Ok(epoch);
3508             }
3509 
3510             // We need to send PTO probe packets.
3511             if self.recovery.loss_probes[epoch] > 0 {
3512                 return Ok(epoch);
3513             }
3514         }
3515 
3516         // If there are flushable, almost full or blocked streams, use the
3517         // Application epoch.
3518         if (self.is_established() || self.is_in_early_data()) &&
3519             (self.almost_full ||
3520                 self.blocked_limit.is_some() ||
3521                 self.dgram_send_queue.has_pending() ||
3522                 self.streams.should_update_max_streams_bidi() ||
3523                 self.streams.should_update_max_streams_uni() ||
3524                 self.streams.has_flushable() ||
3525                 self.streams.has_almost_full() ||
3526                 self.streams.has_blocked())
3527         {
3528             return Ok(packet::EPOCH_APPLICATION);
3529         }
3530 
3531         Err(Error::Done)
3532     }
3533 
    /// Returns the mutable stream with the given ID if it exists, or creates
    /// a new one otherwise.
    ///
    /// This forwards to the stream map, supplying the local and peer
    /// transport parameters and the endpoint role. `local` is passed through
    /// unchanged; the frame handlers in `process_frame()` pass `false` for
    /// streams referenced by the peer.
    ///
    /// Callers in `process_frame()` treat an `Error::Done` result as "the
    /// stream has already been closed and collected" and ignore the frame.
    fn get_or_create_stream(
        &mut self, id: u64, local: bool,
    ) -> Result<&mut stream::Stream> {
        self.streams.get_or_create(
            id,
            &self.local_transport_params,
            &self.peer_transport_params,
            local,
            self.is_server,
        )
    }
3547 
    /// Processes an incoming frame.
    ///
    /// `frame` is the already-parsed frame, `epoch` is the packet number space
    /// of the packet that carried it, and `now` is the timestamp used for
    /// recovery updates and for arming the draining timer.
    ///
    /// Frames that need no handling here (e.g. PADDING, PING, DATA_BLOCKED)
    /// are accepted and ignored. An error is returned when the frame is
    /// rejected (`InvalidFrame`, `InvalidStreamState`, `FlowControl`,
    /// `InvalidTransportParam`, `InvalidState`, `InvalidPacket`) or when one
    /// of the called components fails.
    fn process_frame(
        &mut self, frame: frame::Frame, epoch: packet::Epoch, now: time::Instant,
    ) -> Result<()> {
        trace!("{} rx frm {:?}", self.trace_id, frame);

        match frame {
            frame::Frame::Padding { .. } => (),

            frame::Frame::Ping => (),

            frame::Frame::ACK { ranges, ack_delay } => {
                // Scale the received delay by 2^ack_delay_exponent (taken from
                // the peer's transport parameters); a value that overflows
                // when scaled makes the frame invalid.
                let ack_delay = ack_delay
                    .checked_mul(2_u64.pow(
                        self.peer_transport_params.ack_delay_exponent as u32,
                    ))
                    .ok_or(Error::InvalidFrame)?;

                // An ACK in the Handshake space marks the peer's address as
                // verified.
                if epoch == packet::EPOCH_HANDSHAKE {
                    self.peer_verified_address = true;
                }

                // When we receive an ACK for a 1-RTT packet after handshake
                // completion, it means the handshake has been confirmed.
                if epoch == packet::EPOCH_APPLICATION && self.is_established() {
                    self.peer_verified_address = true;

                    self.handshake_confirmed = true;
                }

                self.recovery.on_ack_received(
                    &ranges,
                    ack_delay,
                    epoch,
                    self.handshake_status(),
                    now,
                    &self.trace_id,
                )?;

                // Once the handshake is confirmed, we can drop Handshake keys.
                if self.handshake_confirmed {
                    self.drop_epoch_state(packet::EPOCH_HANDSHAKE, now);
                }
            },

            frame::Frame::ResetStream {
                stream_id,
                final_size,
                ..
            } => {
                // Peer can't send on our unidirectional streams.
                if !stream::is_bidi(stream_id) &&
                    stream::is_local(stream_id, self.is_server)
                {
                    return Err(Error::InvalidStreamState);
                }

                // Get existing stream or create a new one, but if the stream
                // has already been closed and collected, ignore the frame.
                //
                // This can happen if e.g. an ACK frame is lost, and the peer
                // retransmits another frame before it realizes that the stream
                // is gone.
                //
                // Note that it makes it impossible to check if the frame is
                // illegal, since we have no state, but since we ignore the
                // frame, it should be fine.
                let stream = match self.get_or_create_stream(stream_id, false) {
                    Ok(v) => v,

                    Err(Error::Done) => return Ok(()),

                    Err(e) => return Err(e),
                };

                // The value returned by the reset is added to the
                // connection-level received data count, which must stay within
                // the connection-level limit.
                self.rx_data += stream.recv.reset(final_size)? as u64;

                if self.rx_data > self.max_rx_data {
                    return Err(Error::FlowControl);
                }
            },

            frame::Frame::StopSending { stream_id, .. } => {
                // STOP_SENDING on a receive-only stream is a fatal error.
                if !stream::is_local(stream_id, self.is_server) &&
                    !stream::is_bidi(stream_id)
                {
                    return Err(Error::InvalidStreamState);
                }
            },

            frame::Frame::Crypto { data } => {
                // Push the data to the stream so it can be re-ordered.
                self.pkt_num_spaces[epoch].crypto_stream.recv.push(data)?;

                // Feed crypto data to the TLS state, if there's data
                // available at the expected offset.
                let mut crypto_buf = [0; 512];

                let level = crypto::Level::from_epoch(epoch);

                let stream = &mut self.pkt_num_spaces[epoch].crypto_stream;

                // Drain the crypto stream in chunks of up to 512 bytes until
                // `pop()` stops returning data.
                while let Ok((read, _)) = stream.recv.pop(&mut crypto_buf) {
                    let recv_buf = &crypto_buf[..read];
                    self.handshake.provide_data(level, &recv_buf)?;
                }

                self.do_handshake()?;

                // Try to parse transport parameters as soon as the first flight
                // of handshake data is processed.
                //
                // This is potentially dangerous as the handshake hasn't been
                // completed yet, though it's required to be able to send data
                // in 0.5 RTT.
                let raw_params = self.handshake.quic_transport_params();

                // The peer's parameters are decoded and applied only once.
                if !self.parsed_peer_transport_params && !raw_params.is_empty() {
                    let peer_params =
                        TransportParams::decode(&raw_params, self.is_server)?;

                    if self.version >= PROTOCOL_VERSION_DRAFT28 {
                        // Validate initial_source_connection_id.
                        match &peer_params.initial_source_connection_id {
                            Some(v) if v != &self.dcid =>
                                return Err(Error::InvalidTransportParam),

                            Some(_) => (),

                            // initial_source_connection_id must be sent by
                            // both endpoints.
                            None => return Err(Error::InvalidTransportParam),
                        }

                        // Validate original_destination_connection_id.
                        if let Some(odcid) = &self.odcid {
                            match &peer_params.original_destination_connection_id
                            {
                                Some(v) if v != odcid =>
                                    return Err(Error::InvalidTransportParam),

                                Some(_) => (),

                                // original_destination_connection_id must be
                                // sent by the server.
                                None if !self.is_server =>
                                    return Err(Error::InvalidTransportParam),

                                None => (),
                            }
                        }

                        // Validate retry_source_connection_id.
                        if let Some(rscid) = &self.rscid {
                            match &peer_params.retry_source_connection_id {
                                Some(v) if v != rscid =>
                                    return Err(Error::InvalidTransportParam),

                                Some(_) => (),

                                // retry_source_connection_id must be sent by
                                // the server.
                                None => return Err(Error::InvalidTransportParam),
                            }
                        }
                    } else {
                        // Legacy validation of the original connection ID when
                        // stateless retry is performed, for drafts < 28.
                        if self.did_retry &&
                            peer_params.original_destination_connection_id !=
                                self.odcid
                        {
                            return Err(Error::InvalidTransportParam);
                        }
                    }

                    // Update flow control limits.
                    self.max_tx_data = peer_params.initial_max_data;

                    self.streams.update_peer_max_streams_bidi(
                        peer_params.initial_max_streams_bidi,
                    );
                    self.streams.update_peer_max_streams_uni(
                        peer_params.initial_max_streams_uni,
                    );

                    // The peer's max_ack_delay is interpreted as milliseconds.
                    self.recovery.max_ack_delay =
                        time::Duration::from_millis(peer_params.max_ack_delay);

                    self.peer_transport_params = peer_params;

                    self.parsed_peer_transport_params = true;
                }
            },

            // TODO: implement stateless retry
            frame::Frame::NewToken { .. } => (),

            frame::Frame::Stream { stream_id, data } => {
                // Peer can't send on our unidirectional streams.
                if !stream::is_bidi(stream_id) &&
                    stream::is_local(stream_id, self.is_server)
                {
                    return Err(Error::InvalidStreamState);
                }

                // Remaining connection-level receive credit, computed before
                // `stream` below takes a mutable borrow of `self`.
                let max_rx_data_left = self.max_rx_data - self.rx_data;

                // Get existing stream or create a new one, but if the stream
                // has already been closed and collected, ignore the frame.
                //
                // This can happen if e.g. an ACK frame is lost, and the peer
                // retransmits another frame before it realizes that the stream
                // is gone.
                //
                // Note that it makes it impossible to check if the frame is
                // illegal, since we have no state, but since we ignore the
                // frame, it should be fine.
                let stream = match self.get_or_create_stream(stream_id, false) {
                    Ok(v) => v,

                    Err(Error::Done) => return Ok(()),

                    Err(e) => return Err(e),
                };

                // Check for the connection-level flow control limit.
                //
                // Only the amount by which this frame advances the stream's
                // highest received offset counts; data at or below that offset
                // yields a delta of zero.
                let max_off_delta =
                    data.max_off().saturating_sub(stream.recv.max_off());

                if max_off_delta > max_rx_data_left {
                    return Err(Error::FlowControl);
                }

                stream.recv.push(data)?;

                if stream.is_readable() {
                    self.streams.mark_readable(stream_id, true);
                }

                self.rx_data += max_off_delta;
            },

            frame::Frame::MaxData { max } => {
                // The send limit never shrinks: a lower value is ignored.
                self.max_tx_data = cmp::max(self.max_tx_data, max);
            },

            frame::Frame::MaxStreamData { stream_id, max } => {
                // Get existing stream or create a new one, but if the stream
                // has already been closed and collected, ignore the frame.
                //
                // This can happen if e.g. an ACK frame is lost, and the peer
                // retransmits another frame before it realizes that the stream
                // is gone.
                //
                // Note that it makes it impossible to check if the frame is
                // illegal, since we have no state, but since we ignore the
                // frame, it should be fine.
                let stream = match self.get_or_create_stream(stream_id, false) {
                    Ok(v) => v,

                    Err(Error::Done) => return Ok(()),

                    Err(e) => return Err(e),
                };

                // Remember the state before the update so the stream is only
                // queued when the new limit is what made it flushable.
                let was_flushable = stream.is_flushable();

                stream.send.update_max_data(max);

                let writable = stream.is_writable();

                // If the stream is now flushable push it to the flushable queue,
                // but only if it wasn't already queued.
                if stream.is_flushable() && !was_flushable {
                    let urgency = stream.urgency;
                    let incremental = stream.incremental;
                    self.streams.push_flushable(stream_id, urgency, incremental);
                }

                if writable {
                    self.streams.mark_writable(stream_id, true);
                }
            },

            frame::Frame::MaxStreamsBidi { max } => {
                if max > MAX_STREAM_ID {
                    return Err(Error::InvalidFrame);
                }

                self.streams.update_peer_max_streams_bidi(max);
            },

            frame::Frame::MaxStreamsUni { max } => {
                if max > MAX_STREAM_ID {
                    return Err(Error::InvalidFrame);
                }

                self.streams.update_peer_max_streams_uni(max);
            },

            frame::Frame::DataBlocked { .. } => (),

            frame::Frame::StreamDataBlocked { .. } => (),

            // STREAMS_BLOCKED frames are only validated; no state is updated.
            frame::Frame::StreamsBlockedBidi { limit } =>
                if limit > MAX_STREAM_ID {
                    return Err(Error::InvalidFrame);
                },

            frame::Frame::StreamsBlockedUni { limit } =>
                if limit > MAX_STREAM_ID {
                    return Err(Error::InvalidFrame);
                },

            // TODO: implement connection migration
            frame::Frame::NewConnectionId { .. } => (),

            // TODO: implement connection migration
            frame::Frame::RetireConnectionId { .. } => (),

            frame::Frame::PathChallenge { data } => {
                // Store the challenge payload; presumably it is echoed back in
                // a PATH_RESPONSE by the send path (not visible here).
                self.challenge = Some(data);
            },

            frame::Frame::PathResponse { .. } => (),

            frame::Frame::ConnectionClose { .. } => {
                // The peer is closing: arm the draining timer three PTOs from
                // now.
                self.draining_timer = Some(now + (self.recovery.pto() * 3));
            },

            frame::Frame::ApplicationClose { .. } => {
                // Same handling as CONNECTION_CLOSE above.
                self.draining_timer = Some(now + (self.recovery.pto() * 3));
            },

            frame::Frame::HandshakeDone => {
                // A server that receives HANDSHAKE_DONE rejects it.
                if self.is_server {
                    return Err(Error::InvalidPacket);
                }

                self.peer_verified_address = true;

                self.handshake_confirmed = true;

                // Once the handshake is confirmed, we can drop Handshake keys.
                self.drop_epoch_state(packet::EPOCH_HANDSHAKE, now);
            },

            frame::Frame::Datagram { data } => {
                // Close the connection if DATAGRAMs are not enabled.
                // quiche always advertises support for 64K sized DATAGRAM
                // frames, as recommended by the standard, so we don't need a
                // size check.
                //
                // NOTE(review): `dgram_enabled()` as currently written returns
                // `true` when the local `max_datagram_frame_size` is unset, so
                // this check does reject frames when DATAGRAMs are disabled
                // even though it reads inverted.
                if self.dgram_enabled() {
                    return Err(Error::InvalidState);
                }

                // If recv queue is full, discard oldest
                if self.dgram_recv_queue.is_full() {
                    self.dgram_recv_queue.pop();
                }

                self.dgram_recv_queue.push(&data)?;
            },
        }

        Ok(())
    }
3917 
3918     /// Drops the keys and recovery state for the given epoch.
drop_epoch_state(&mut self, epoch: packet::Epoch, now: time::Instant)3919     fn drop_epoch_state(&mut self, epoch: packet::Epoch, now: time::Instant) {
3920         if self.pkt_num_spaces[epoch].crypto_open.is_none() {
3921             return;
3922         }
3923 
3924         self.pkt_num_spaces[epoch].crypto_open = None;
3925         self.pkt_num_spaces[epoch].crypto_seal = None;
3926         self.pkt_num_spaces[epoch].clear();
3927 
3928         self.recovery.on_pkt_num_space_discarded(
3929             epoch,
3930             self.handshake_status(),
3931             now,
3932         );
3933 
3934         trace!("{} dropped epoch {} state", self.trace_id, epoch);
3935     }
3936 
3937     /// Returns true if the connection-level flow control needs to be updated.
3938     ///
3939     /// This happens when the new max data limit is at least double the amount
3940     /// of data that can be received before blocking.
should_update_max_data(&self) -> bool3941     fn should_update_max_data(&self) -> bool {
3942         self.max_rx_data_next != self.max_rx_data &&
3943             self.max_rx_data_next / 2 > self.max_rx_data - self.rx_data
3944     }
3945 
3946     /// Returns the idle timeout value.
3947     ///
3948     /// `None` is returned if both end-points disabled the idle timeout.
idle_timeout(&mut self) -> Option<time::Duration>3949     fn idle_timeout(&mut self) -> Option<time::Duration> {
3950         // If the transport parameter is set to 0, then the respective endpoint
3951         // decided to disable the idle timeout. If both are disabled we should
3952         // not set any timeout.
3953         if self.local_transport_params.max_idle_timeout == 0 &&
3954             self.peer_transport_params.max_idle_timeout == 0
3955         {
3956             return None;
3957         }
3958 
3959         // If the local endpoint or the peer disabled the idle timeout, use the
3960         // other peer's value, otherwise use the minimum of the two values.
3961         let idle_timeout = if self.local_transport_params.max_idle_timeout == 0 {
3962             self.peer_transport_params.max_idle_timeout
3963         } else if self.peer_transport_params.max_idle_timeout == 0 {
3964             self.local_transport_params.max_idle_timeout
3965         } else {
3966             cmp::min(
3967                 self.local_transport_params.max_idle_timeout,
3968                 self.peer_transport_params.max_idle_timeout,
3969             )
3970         };
3971 
3972         let idle_timeout = time::Duration::from_millis(idle_timeout);
3973         let idle_timeout = cmp::max(idle_timeout, 3 * self.recovery.pto());
3974 
3975         Some(idle_timeout)
3976     }
3977 
3978     /// Returns the connection's overall send capacity.
send_capacity(&self) -> usize3979     fn send_capacity(&self) -> usize {
3980         let cap = self.max_tx_data - self.tx_data;
3981         cmp::min(cap, self.recovery.cwnd_available() as u64) as usize
3982     }
3983 
3984     /// Returns the connection's handshake status for use in loss recovery.
handshake_status(&self) -> recovery::HandshakeStatus3985     fn handshake_status(&self) -> recovery::HandshakeStatus {
3986         recovery::HandshakeStatus {
3987             has_handshake_keys: self.pkt_num_spaces[packet::EPOCH_HANDSHAKE]
3988                 .has_keys(),
3989 
3990             peer_verified_address: self.peer_verified_address,
3991 
3992             completed: self.is_established(),
3993         }
3994     }
3995 }
3996 
3997 /// Maps an `Error` to `Error::Done`, or itself.
3998 ///
3999 /// When a received packet that hasn't yet been authenticated triggers a failure
4000 /// it should, in most cases, be ignored, instead of raising a connection error,
4001 /// to avoid potential man-in-the-middle and man-on-the-side attacks.
4002 ///
4003 /// However, if no other packet was previously received, the connection should
4004 /// indeed be closed as the received packet might just be network background
4005 /// noise, and it shouldn't keep resources occupied indefinitely.
4006 ///
4007 /// This function maps an error to `Error::Done` to ignore a packet failure
4008 /// without aborting the connection, except when no other packet was previously
4009 /// received, in which case the error itself is returned, but only on the
4010 /// server-side as the client will already have armed the idle timer.
4011 ///
4012 /// This must only be used for errors preceding packet authentication. Failures
4013 /// happening after a packet has been authenticated should still cause the
4014 /// connection to be aborted.
drop_pkt_on_err( e: Error, recv_count: usize, is_server: bool, trace_id: &str, ) -> Error4015 fn drop_pkt_on_err(
4016     e: Error, recv_count: usize, is_server: bool, trace_id: &str,
4017 ) -> Error {
4018     // On the server, if no other packet has been successflully processed, abort
4019     // the connection to avoid keeping the connection open when only junk is
4020     // received.
4021     if is_server && recv_count == 0 {
4022         return e;
4023     }
4024 
4025     trace!("{} dropped invalid packet", trace_id);
4026 
4027     // Ignore other invalid packets that haven't been authenticated to prevent
4028     // man-in-the-middle and man-on-the-side attacks.
4029     Error::Done
4030 }
4031 
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
///
/// All fields are plain values, so a `Stats` is a snapshot that can be cloned
/// and kept around independently of the connection it was taken from.
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone)]
pub struct Stats {
    /// The number of QUIC packets received on this connection.
    pub recv: usize,

    /// The number of QUIC packets sent on this connection.
    pub sent: usize,

    /// The number of QUIC packets that were lost.
    pub lost: usize,

    /// The estimated round-trip time of the connection.
    pub rtt: time::Duration,

    /// The size of the connection's congestion window in bytes.
    pub cwnd: usize,

    /// The estimated data delivery rate in bytes/s.
    pub delivery_rate: u64,
}
4057 
4058 impl std::fmt::Debug for Stats {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result4059     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
4060         write!(
4061             f,
4062             "recv={} sent={} lost={} rtt={:?} cwnd={} delivery_rate={}",
4063             self.recv,
4064             self.sent,
4065             self.lost,
4066             self.rtt,
4067             self.cwnd,
4068             self.delivery_rate
4069         )
4070     }
4071 }
4072 
/// The set of QUIC transport parameters exchanged during the handshake.
///
/// Values are serialized and parsed by `TransportParams::encode()` and
/// `TransportParams::decode()`; the numeric IDs mentioned on each field are
/// the ones used by those functions. Parameters missing from a decoded buffer
/// keep the value provided by the `Default` implementation.
#[derive(Clone, Debug, PartialEq)]
struct TransportParams {
    /// Parameter `0x0000`. Only encoded by servers, and rejected when decoded
    /// by a server.
    pub original_destination_connection_id: Option<Vec<u8>>,
    /// Parameter `0x0001`.
    pub max_idle_timeout: u64,
    /// Parameter `0x0002`; the decoder reads 16 bytes for it. Only encoded by
    /// servers, and rejected when decoded by a server.
    pub stateless_reset_token: Option<Vec<u8>>,
    /// Parameter `0x0003`. Decoded values below 1200 are rejected.
    pub max_udp_payload_size: u64,
    /// Parameter `0x0004`.
    pub initial_max_data: u64,
    /// Parameter `0x0005`.
    pub initial_max_stream_data_bidi_local: u64,
    /// Parameter `0x0006`.
    pub initial_max_stream_data_bidi_remote: u64,
    /// Parameter `0x0007`.
    pub initial_max_stream_data_uni: u64,
    /// Parameter `0x0008`. Decoded values above `MAX_STREAM_ID` are rejected.
    pub initial_max_streams_bidi: u64,
    /// Parameter `0x0009`. Decoded values above `MAX_STREAM_ID` are rejected.
    pub initial_max_streams_uni: u64,
    /// Parameter `0x000a`. Decoded values above 20 are rejected.
    pub ack_delay_exponent: u64,
    /// Parameter `0x000b`. Decoded values of 2^14 or more are rejected.
    pub max_ack_delay: u64,
    /// Parameter `0x000c`. It carries no value: its mere presence sets the
    /// flag.
    pub disable_active_migration: bool,
    // Parameter `0x000d` is recognized by the decoder but not stored yet.
    // pub preferred_address: ...,
    /// Parameter `0x000e`. Decoded values below 2 are rejected.
    pub active_conn_id_limit: u64,
    /// Parameter `0x000f`.
    pub initial_source_connection_id: Option<Vec<u8>>,
    /// Parameter `0x0010`. Only encoded by servers, and rejected when decoded
    /// by a server.
    pub retry_source_connection_id: Option<Vec<u8>>,
    /// Parameter `0x0020`.
    pub max_datagram_frame_size: Option<u64>,
}
4094 
4095 impl Default for TransportParams {
default() -> TransportParams4096     fn default() -> TransportParams {
4097         TransportParams {
4098             original_destination_connection_id: None,
4099             max_idle_timeout: 0,
4100             stateless_reset_token: None,
4101             max_udp_payload_size: 65527,
4102             initial_max_data: 0,
4103             initial_max_stream_data_bidi_local: 0,
4104             initial_max_stream_data_bidi_remote: 0,
4105             initial_max_stream_data_uni: 0,
4106             initial_max_streams_bidi: 0,
4107             initial_max_streams_uni: 0,
4108             ack_delay_exponent: 3,
4109             max_ack_delay: 25,
4110             disable_active_migration: false,
4111             active_conn_id_limit: 2,
4112             initial_source_connection_id: None,
4113             retry_source_connection_id: None,
4114             max_datagram_frame_size: None,
4115         }
4116     }
4117 }
4118 
4119 impl TransportParams {
decode(buf: &[u8], is_server: bool) -> Result<TransportParams>4120     fn decode(buf: &[u8], is_server: bool) -> Result<TransportParams> {
4121         let mut params = octets::Octets::with_slice(buf);
4122 
4123         let mut tp = TransportParams::default();
4124 
4125         while params.cap() > 0 {
4126             let id = params.get_varint()?;
4127 
4128             let mut val = params.get_bytes_with_varint_length()?;
4129 
4130             // TODO: forbid duplicated param
4131 
4132             match id {
4133                 0x0000 => {
4134                     if is_server {
4135                         return Err(Error::InvalidTransportParam);
4136                     }
4137 
4138                     tp.original_destination_connection_id = Some(val.to_vec());
4139                 },
4140 
4141                 0x0001 => {
4142                     tp.max_idle_timeout = val.get_varint()?;
4143                 },
4144 
4145                 0x0002 => {
4146                     if is_server {
4147                         return Err(Error::InvalidTransportParam);
4148                     }
4149 
4150                     tp.stateless_reset_token = Some(val.get_bytes(16)?.to_vec());
4151                 },
4152 
4153                 0x0003 => {
4154                     tp.max_udp_payload_size = val.get_varint()?;
4155 
4156                     if tp.max_udp_payload_size < 1200 {
4157                         return Err(Error::InvalidTransportParam);
4158                     }
4159                 },
4160 
4161                 0x0004 => {
4162                     tp.initial_max_data = val.get_varint()?;
4163                 },
4164 
4165                 0x0005 => {
4166                     tp.initial_max_stream_data_bidi_local = val.get_varint()?;
4167                 },
4168 
4169                 0x0006 => {
4170                     tp.initial_max_stream_data_bidi_remote = val.get_varint()?;
4171                 },
4172 
4173                 0x0007 => {
4174                     tp.initial_max_stream_data_uni = val.get_varint()?;
4175                 },
4176 
4177                 0x0008 => {
4178                     let max = val.get_varint()?;
4179 
4180                     if max > MAX_STREAM_ID {
4181                         return Err(Error::InvalidTransportParam);
4182                     }
4183 
4184                     tp.initial_max_streams_bidi = max;
4185                 },
4186 
4187                 0x0009 => {
4188                     let max = val.get_varint()?;
4189 
4190                     if max > MAX_STREAM_ID {
4191                         return Err(Error::InvalidTransportParam);
4192                     }
4193 
4194                     tp.initial_max_streams_uni = max;
4195                 },
4196 
4197                 0x000a => {
4198                     let ack_delay_exponent = val.get_varint()?;
4199 
4200                     if ack_delay_exponent > 20 {
4201                         return Err(Error::InvalidTransportParam);
4202                     }
4203 
4204                     tp.ack_delay_exponent = ack_delay_exponent;
4205                 },
4206 
4207                 0x000b => {
4208                     let max_ack_delay = val.get_varint()?;
4209 
4210                     if max_ack_delay >= 2_u64.pow(14) {
4211                         return Err(Error::InvalidTransportParam);
4212                     }
4213 
4214                     tp.max_ack_delay = max_ack_delay;
4215                 },
4216 
4217                 0x000c => {
4218                     tp.disable_active_migration = true;
4219                 },
4220 
4221                 0x000d => {
4222                     if is_server {
4223                         return Err(Error::InvalidTransportParam);
4224                     }
4225 
4226                     // TODO: decode preferred_address
4227                 },
4228 
4229                 0x000e => {
4230                     let limit = val.get_varint()?;
4231 
4232                     if limit < 2 {
4233                         return Err(Error::InvalidTransportParam);
4234                     }
4235 
4236                     tp.active_conn_id_limit = limit;
4237                 },
4238 
4239                 0x000f => {
4240                     tp.initial_source_connection_id = Some(val.to_vec());
4241                 },
4242 
4243                 0x00010 => {
4244                     if is_server {
4245                         return Err(Error::InvalidTransportParam);
4246                     }
4247 
4248                     tp.retry_source_connection_id = Some(val.to_vec());
4249                 },
4250 
4251                 0x0020 => {
4252                     tp.max_datagram_frame_size = Some(val.get_varint()?);
4253                 },
4254 
4255                 // Ignore unknown parameters.
4256                 _ => (),
4257             }
4258         }
4259 
4260         Ok(tp)
4261     }
4262 
encode_param( b: &mut octets::OctetsMut, ty: u64, len: usize, ) -> Result<()>4263     fn encode_param(
4264         b: &mut octets::OctetsMut, ty: u64, len: usize,
4265     ) -> Result<()> {
4266         b.put_varint(ty)?;
4267         b.put_varint(len as u64)?;
4268 
4269         Ok(())
4270     }
4271 
encode<'a>( tp: &TransportParams, is_server: bool, out: &'a mut [u8], ) -> Result<&'a mut [u8]>4272     fn encode<'a>(
4273         tp: &TransportParams, is_server: bool, out: &'a mut [u8],
4274     ) -> Result<&'a mut [u8]> {
4275         let mut b = octets::OctetsMut::with_slice(out);
4276 
4277         if is_server {
4278             if let Some(ref odcid) = tp.original_destination_connection_id {
4279                 TransportParams::encode_param(&mut b, 0x0000, odcid.len())?;
4280                 b.put_bytes(&odcid)?;
4281             }
4282         };
4283 
4284         if tp.max_idle_timeout != 0 {
4285             TransportParams::encode_param(
4286                 &mut b,
4287                 0x0001,
4288                 octets::varint_len(tp.max_idle_timeout),
4289             )?;
4290             b.put_varint(tp.max_idle_timeout)?;
4291         }
4292 
4293         if is_server {
4294             if let Some(ref token) = tp.stateless_reset_token {
4295                 TransportParams::encode_param(&mut b, 0x0002, token.len())?;
4296                 b.put_bytes(&token)?;
4297             }
4298         }
4299 
4300         if tp.max_udp_payload_size != 0 {
4301             TransportParams::encode_param(
4302                 &mut b,
4303                 0x0003,
4304                 octets::varint_len(tp.max_udp_payload_size),
4305             )?;
4306             b.put_varint(tp.max_udp_payload_size)?;
4307         }
4308 
4309         if tp.initial_max_data != 0 {
4310             TransportParams::encode_param(
4311                 &mut b,
4312                 0x0004,
4313                 octets::varint_len(tp.initial_max_data),
4314             )?;
4315             b.put_varint(tp.initial_max_data)?;
4316         }
4317 
4318         if tp.initial_max_stream_data_bidi_local != 0 {
4319             TransportParams::encode_param(
4320                 &mut b,
4321                 0x0005,
4322                 octets::varint_len(tp.initial_max_stream_data_bidi_local),
4323             )?;
4324             b.put_varint(tp.initial_max_stream_data_bidi_local)?;
4325         }
4326 
4327         if tp.initial_max_stream_data_bidi_remote != 0 {
4328             TransportParams::encode_param(
4329                 &mut b,
4330                 0x0006,
4331                 octets::varint_len(tp.initial_max_stream_data_bidi_remote),
4332             )?;
4333             b.put_varint(tp.initial_max_stream_data_bidi_remote)?;
4334         }
4335 
4336         if tp.initial_max_stream_data_uni != 0 {
4337             TransportParams::encode_param(
4338                 &mut b,
4339                 0x0007,
4340                 octets::varint_len(tp.initial_max_stream_data_uni),
4341             )?;
4342             b.put_varint(tp.initial_max_stream_data_uni)?;
4343         }
4344 
4345         if tp.initial_max_streams_bidi != 0 {
4346             TransportParams::encode_param(
4347                 &mut b,
4348                 0x0008,
4349                 octets::varint_len(tp.initial_max_streams_bidi),
4350             )?;
4351             b.put_varint(tp.initial_max_streams_bidi)?;
4352         }
4353 
4354         if tp.initial_max_streams_uni != 0 {
4355             TransportParams::encode_param(
4356                 &mut b,
4357                 0x0009,
4358                 octets::varint_len(tp.initial_max_streams_uni),
4359             )?;
4360             b.put_varint(tp.initial_max_streams_uni)?;
4361         }
4362 
4363         if tp.ack_delay_exponent != 0 {
4364             TransportParams::encode_param(
4365                 &mut b,
4366                 0x000a,
4367                 octets::varint_len(tp.ack_delay_exponent),
4368             )?;
4369             b.put_varint(tp.ack_delay_exponent)?;
4370         }
4371 
4372         if tp.max_ack_delay != 0 {
4373             TransportParams::encode_param(
4374                 &mut b,
4375                 0x000b,
4376                 octets::varint_len(tp.max_ack_delay),
4377             )?;
4378             b.put_varint(tp.max_ack_delay)?;
4379         }
4380 
4381         if tp.disable_active_migration {
4382             TransportParams::encode_param(&mut b, 0x000c, 0)?;
4383         }
4384 
4385         // TODO: encode preferred_address
4386 
4387         if tp.active_conn_id_limit != 2 {
4388             TransportParams::encode_param(
4389                 &mut b,
4390                 0x000e,
4391                 octets::varint_len(tp.active_conn_id_limit),
4392             )?;
4393             b.put_varint(tp.active_conn_id_limit)?;
4394         }
4395 
4396         if let Some(scid) = &tp.initial_source_connection_id {
4397             TransportParams::encode_param(&mut b, 0x000f, scid.len())?;
4398             b.put_bytes(&scid)?;
4399         }
4400 
4401         if is_server {
4402             if let Some(scid) = &tp.retry_source_connection_id {
4403                 TransportParams::encode_param(&mut b, 0x0010, scid.len())?;
4404                 b.put_bytes(&scid)?;
4405             }
4406         }
4407 
4408         if let Some(max_datagram_frame_size) = tp.max_datagram_frame_size {
4409             TransportParams::encode_param(
4410                 &mut b,
4411                 0x0020,
4412                 octets::varint_len(max_datagram_frame_size),
4413             )?;
4414             b.put_varint(max_datagram_frame_size)?;
4415         }
4416 
4417         let out_len = b.off();
4418 
4419         Ok(&mut out[..out_len])
4420     }
4421 
4422     /// Creates a qlog event for connection transport parameters and TLS fields
4423     #[cfg(feature = "qlog")]
to_qlog( &self, owner: qlog::TransportOwner, version: u32, alpn: &[u8], cipher: Option<crypto::Algorithm>, ) -> qlog::event::Event4424     pub fn to_qlog(
4425         &self, owner: qlog::TransportOwner, version: u32, alpn: &[u8],
4426         cipher: Option<crypto::Algorithm>,
4427     ) -> qlog::event::Event {
4428         let ocid = qlog::HexSlice::maybe_string(
4429             self.original_destination_connection_id.as_ref(),
4430         );
4431         let stateless_reset_token =
4432             qlog::HexSlice::maybe_string(self.stateless_reset_token.as_ref());
4433 
4434         qlog::event::Event::transport_parameters_set(
4435             Some(owner),
4436             None, // resumption
4437             None, // early data
4438             String::from_utf8(alpn.to_vec()).ok(),
4439             Some(format!("{:x?}", version)),
4440             Some(format!("{:?}", cipher)),
4441             ocid,
4442             stateless_reset_token,
4443             Some(self.disable_active_migration),
4444             Some(self.max_idle_timeout),
4445             Some(self.max_udp_payload_size),
4446             Some(self.ack_delay_exponent),
4447             Some(self.max_ack_delay),
4448             Some(self.active_conn_id_limit),
4449             Some(self.initial_max_data.to_string()),
4450             Some(self.initial_max_stream_data_bidi_local.to_string()),
4451             Some(self.initial_max_stream_data_bidi_remote.to_string()),
4452             Some(self.initial_max_stream_data_uni.to_string()),
4453             Some(self.initial_max_streams_bidi.to_string()),
4454             Some(self.initial_max_streams_uni.to_string()),
4455             None, // preferred address
4456         )
4457     }
4458 }
4459 
4460 #[doc(hidden)]
4461 pub mod testing {
4462     use super::*;
4463 
    /// A client connection and a server connection bundled together, so that
    /// tests can shuttle packets between the two through a shared buffer.
    pub struct Pipe {
        /// The client end, created with `connect()`.
        pub client: Pin<Box<Connection>>,
        /// The server end, created with `accept()`.
        pub server: Pin<Box<Connection>>,
    }
4468 
4469     impl Pipe {
default() -> Result<Pipe>4470         pub fn default() -> Result<Pipe> {
4471             let mut config = Config::new(crate::PROTOCOL_VERSION)?;
4472             config.load_cert_chain_from_pem_file("examples/cert.crt")?;
4473             config.load_priv_key_from_pem_file("examples/cert.key")?;
4474             config.set_application_protos(b"\x06proto1\x06proto2")?;
4475             config.set_initial_max_data(30);
4476             config.set_initial_max_stream_data_bidi_local(15);
4477             config.set_initial_max_stream_data_bidi_remote(15);
4478             config.set_initial_max_stream_data_uni(10);
4479             config.set_initial_max_streams_bidi(3);
4480             config.set_initial_max_streams_uni(3);
4481             config.set_max_idle_timeout(180_000);
4482             config.verify_peer(false);
4483 
4484             Pipe::with_config(&mut config)
4485         }
4486 
with_config(config: &mut Config) -> Result<Pipe>4487         pub fn with_config(config: &mut Config) -> Result<Pipe> {
4488             let mut client_scid = [0; 16];
4489             rand::rand_bytes(&mut client_scid[..]);
4490 
4491             let mut server_scid = [0; 16];
4492             rand::rand_bytes(&mut server_scid[..]);
4493 
4494             Ok(Pipe {
4495                 client: connect(Some("quic.tech"), &client_scid, config)?,
4496                 server: accept(&server_scid, None, config)?,
4497             })
4498         }
4499 
with_client_config(client_config: &mut Config) -> Result<Pipe>4500         pub fn with_client_config(client_config: &mut Config) -> Result<Pipe> {
4501             let mut client_scid = [0; 16];
4502             rand::rand_bytes(&mut client_scid[..]);
4503 
4504             let mut server_scid = [0; 16];
4505             rand::rand_bytes(&mut server_scid[..]);
4506 
4507             let mut config = Config::new(crate::PROTOCOL_VERSION)?;
4508             config.load_cert_chain_from_pem_file("examples/cert.crt")?;
4509             config.load_priv_key_from_pem_file("examples/cert.key")?;
4510             config.set_application_protos(b"\x06proto1\x06proto2")?;
4511             config.set_initial_max_data(30);
4512             config.set_initial_max_stream_data_bidi_local(15);
4513             config.set_initial_max_stream_data_bidi_remote(15);
4514             config.set_initial_max_streams_bidi(3);
4515             config.set_initial_max_streams_uni(3);
4516 
4517             Ok(Pipe {
4518                 client: connect(Some("quic.tech"), &client_scid, client_config)?,
4519                 server: accept(&server_scid, None, &mut config)?,
4520             })
4521         }
4522 
with_server_config(server_config: &mut Config) -> Result<Pipe>4523         pub fn with_server_config(server_config: &mut Config) -> Result<Pipe> {
4524             let mut client_scid = [0; 16];
4525             rand::rand_bytes(&mut client_scid[..]);
4526 
4527             let mut server_scid = [0; 16];
4528             rand::rand_bytes(&mut server_scid[..]);
4529 
4530             let mut config = Config::new(crate::PROTOCOL_VERSION)?;
4531             config.set_application_protos(b"\x06proto1\x06proto2")?;
4532             config.set_initial_max_data(30);
4533             config.set_initial_max_stream_data_bidi_local(15);
4534             config.set_initial_max_stream_data_bidi_remote(15);
4535             config.set_initial_max_streams_bidi(3);
4536             config.set_initial_max_streams_uni(3);
4537 
4538             Ok(Pipe {
4539                 client: connect(Some("quic.tech"), &client_scid, &mut config)?,
4540                 server: accept(&server_scid, None, server_config)?,
4541             })
4542         }
4543 
handshake(&mut self, buf: &mut [u8]) -> Result<()>4544         pub fn handshake(&mut self, buf: &mut [u8]) -> Result<()> {
4545             let mut len = self.client.send(buf)?;
4546 
4547             while !self.client.is_established() && !self.server.is_established() {
4548                 len = recv_send(&mut self.server, buf, len)?;
4549                 len = recv_send(&mut self.client, buf, len)?;
4550             }
4551 
4552             recv_send(&mut self.server, buf, len)?;
4553 
4554             Ok(())
4555         }
4556 
flush_client(&mut self, buf: &mut [u8]) -> Result<()>4557         pub fn flush_client(&mut self, buf: &mut [u8]) -> Result<()> {
4558             loop {
4559                 let len = match self.client.send(buf) {
4560                     Ok(v) => v,
4561 
4562                     Err(Error::Done) => break,
4563 
4564                     Err(e) => return Err(e),
4565                 };
4566 
4567                 match self.server.recv(&mut buf[..len]) {
4568                     Ok(_) => (),
4569 
4570                     Err(Error::Done) => (),
4571 
4572                     Err(e) => return Err(e),
4573                 }
4574             }
4575 
4576             Ok(())
4577         }
4578 
flush_server(&mut self, buf: &mut [u8]) -> Result<()>4579         pub fn flush_server(&mut self, buf: &mut [u8]) -> Result<()> {
4580             loop {
4581                 let len = match self.server.send(buf) {
4582                     Ok(v) => v,
4583 
4584                     Err(Error::Done) => break,
4585 
4586                     Err(e) => return Err(e),
4587                 };
4588 
4589                 match self.client.recv(&mut buf[..len]) {
4590                     Ok(_) => (),
4591 
4592                     Err(Error::Done) => (),
4593 
4594                     Err(e) => return Err(e),
4595                 }
4596             }
4597 
4598             Ok(())
4599         }
4600 
advance(&mut self, buf: &mut [u8]) -> Result<()>4601         pub fn advance(&mut self, buf: &mut [u8]) -> Result<()> {
4602             let mut client_done = false;
4603             let mut server_done = false;
4604 
4605             let mut len = 0;
4606 
4607             while !client_done || !server_done {
4608                 len = recv_send(&mut self.client, buf, len)?;
4609                 client_done = len == 0;
4610 
4611                 len = recv_send(&mut self.server, buf, len)?;
4612                 server_done = len == 0;
4613             }
4614 
4615             Ok(())
4616         }
4617 
send_pkt_to_server( &mut self, pkt_type: packet::Type, frames: &[frame::Frame], buf: &mut [u8], ) -> Result<usize>4618         pub fn send_pkt_to_server(
4619             &mut self, pkt_type: packet::Type, frames: &[frame::Frame],
4620             buf: &mut [u8],
4621         ) -> Result<usize> {
4622             let written = encode_pkt(&mut self.client, pkt_type, frames, buf)?;
4623             recv_send(&mut self.server, buf, written)
4624         }
4625     }
4626 
recv_send( conn: &mut Connection, buf: &mut [u8], len: usize, ) -> Result<usize>4627     pub fn recv_send(
4628         conn: &mut Connection, buf: &mut [u8], len: usize,
4629     ) -> Result<usize> {
4630         let mut left = len;
4631 
4632         while left > 0 {
4633             match conn.recv(&mut buf[len - left..len]) {
4634                 Ok(read) => left -= read,
4635 
4636                 Err(Error::Done) => break,
4637 
4638                 Err(e) => return Err(e),
4639             }
4640         }
4641 
4642         assert_eq!(left, 0);
4643 
4644         let mut off = 0;
4645 
4646         while off < buf.len() {
4647             match conn.send(&mut buf[off..]) {
4648                 Ok(write) => off += write,
4649 
4650                 Err(Error::Done) => break,
4651 
4652                 Err(e) => return Err(e),
4653             }
4654         }
4655 
4656         Ok(off)
4657     }
4658 
encode_pkt( conn: &mut Connection, pkt_type: packet::Type, frames: &[frame::Frame], buf: &mut [u8], ) -> Result<usize>4659     pub fn encode_pkt(
4660         conn: &mut Connection, pkt_type: packet::Type, frames: &[frame::Frame],
4661         buf: &mut [u8],
4662     ) -> Result<usize> {
4663         let mut b = octets::OctetsMut::with_slice(buf);
4664 
4665         let epoch = pkt_type.to_epoch()?;
4666 
4667         let space = &mut conn.pkt_num_spaces[epoch];
4668 
4669         let pn = space.next_pkt_num;
4670         let pn_len = packet::pkt_num_len(pn)?;
4671 
4672         let hdr = Header {
4673             ty: pkt_type,
4674             version: conn.version,
4675             dcid: conn.dcid.clone(),
4676             scid: conn.scid.clone(),
4677             pkt_num: 0,
4678             pkt_num_len: pn_len,
4679             token: conn.token.clone(),
4680             versions: None,
4681             key_phase: false,
4682         };
4683 
4684         hdr.to_bytes(&mut b)?;
4685 
4686         let payload_len = frames.iter().fold(0, |acc, x| acc + x.wire_len()) +
4687             space.crypto_overhead().unwrap();
4688 
4689         if pkt_type != packet::Type::Short {
4690             let len = pn_len + payload_len;
4691             b.put_varint(len as u64)?;
4692         }
4693 
4694         packet::encode_pkt_num(pn, &mut b)?;
4695 
4696         let payload_offset = b.off();
4697 
4698         for frame in frames {
4699             frame.to_bytes(&mut b)?;
4700         }
4701 
4702         let aead = match space.crypto_seal {
4703             Some(ref v) => v,
4704             None => return Err(Error::InvalidState),
4705         };
4706 
4707         let written = packet::encrypt_pkt(
4708             &mut b,
4709             pn,
4710             pn_len,
4711             payload_len,
4712             payload_offset,
4713             aead,
4714         )?;
4715 
4716         space.next_pkt_num += 1;
4717 
4718         Ok(written)
4719     }
4720 
decode_pkt( conn: &mut Connection, buf: &mut [u8], len: usize, ) -> Result<Vec<frame::Frame>>4721     pub fn decode_pkt(
4722         conn: &mut Connection, buf: &mut [u8], len: usize,
4723     ) -> Result<Vec<frame::Frame>> {
4724         let mut b = octets::OctetsMut::with_slice(&mut buf[..len]);
4725 
4726         let mut hdr = Header::from_bytes(&mut b, conn.scid.len()).unwrap();
4727 
4728         let epoch = hdr.ty.to_epoch()?;
4729 
4730         let aead = conn.pkt_num_spaces[epoch].crypto_open.as_ref().unwrap();
4731 
4732         let payload_len = b.cap();
4733 
4734         packet::decrypt_hdr(&mut b, &mut hdr, &aead).unwrap();
4735 
4736         let pn = packet::decode_pkt_num(
4737             conn.pkt_num_spaces[epoch].largest_rx_pkt_num,
4738             hdr.pkt_num,
4739             hdr.pkt_num_len,
4740         );
4741 
4742         let mut payload =
4743             packet::decrypt_pkt(&mut b, pn, hdr.pkt_num_len, payload_len, aead)
4744                 .unwrap();
4745 
4746         let mut frames = Vec::new();
4747 
4748         while payload.cap() > 0 {
4749             let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
4750             frames.push(frame);
4751         }
4752 
4753         Ok(frames)
4754     }
4755 }
4756 
4757 #[cfg(test)]
4758 mod tests {
4759     use super::*;
4760 
4761     #[test]
transport_params()4762     fn transport_params() {
4763         // Server encodes, client decodes.
4764         let tp = TransportParams {
4765             original_destination_connection_id: None,
4766             max_idle_timeout: 30,
4767             stateless_reset_token: Some(vec![0xba; 16]),
4768             max_udp_payload_size: 23_421,
4769             initial_max_data: 424_645_563,
4770             initial_max_stream_data_bidi_local: 154_323_123,
4771             initial_max_stream_data_bidi_remote: 6_587_456,
4772             initial_max_stream_data_uni: 2_461_234,
4773             initial_max_streams_bidi: 12_231,
4774             initial_max_streams_uni: 18_473,
4775             ack_delay_exponent: 20,
4776             max_ack_delay: 2_u64.pow(14) - 1,
4777             disable_active_migration: true,
4778             active_conn_id_limit: 8,
4779             initial_source_connection_id: Some(b"woot woot".to_vec()),
4780             retry_source_connection_id: Some(b"retry".to_vec()),
4781             max_datagram_frame_size: Some(32),
4782         };
4783 
4784         let mut raw_params = [42; 256];
4785         let raw_params =
4786             TransportParams::encode(&tp, true, &mut raw_params).unwrap();
4787         assert_eq!(raw_params.len(), 94);
4788 
4789         let new_tp = TransportParams::decode(&raw_params, false).unwrap();
4790 
4791         assert_eq!(new_tp, tp);
4792 
4793         // Client encodes, server decodes.
4794         let tp = TransportParams {
4795             original_destination_connection_id: None,
4796             max_idle_timeout: 30,
4797             stateless_reset_token: None,
4798             max_udp_payload_size: 23_421,
4799             initial_max_data: 424_645_563,
4800             initial_max_stream_data_bidi_local: 154_323_123,
4801             initial_max_stream_data_bidi_remote: 6_587_456,
4802             initial_max_stream_data_uni: 2_461_234,
4803             initial_max_streams_bidi: 12_231,
4804             initial_max_streams_uni: 18_473,
4805             ack_delay_exponent: 20,
4806             max_ack_delay: 2_u64.pow(14) - 1,
4807             disable_active_migration: true,
4808             active_conn_id_limit: 8,
4809             initial_source_connection_id: Some(b"woot woot".to_vec()),
4810             retry_source_connection_id: None,
4811             max_datagram_frame_size: Some(32),
4812         };
4813 
4814         let mut raw_params = [42; 256];
4815         let raw_params =
4816             TransportParams::encode(&tp, false, &mut raw_params).unwrap();
4817         assert_eq!(raw_params.len(), 69);
4818 
4819         let new_tp = TransportParams::decode(&raw_params, true).unwrap();
4820 
4821         assert_eq!(new_tp, tp);
4822     }
4823 
4824     #[test]
4825     #[ignore = "Android: failure reason unkown."]
unknown_version()4826     fn unknown_version() {
4827         let mut buf = [0; 65535];
4828 
4829         let mut config = Config::new(0xbabababa).unwrap();
4830         config.verify_peer(false);
4831 
4832         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
4833 
4834         assert_eq!(pipe.handshake(&mut buf), Err(Error::UnknownVersion));
4835     }
4836 
4837     #[test]
version_negotiation()4838     fn version_negotiation() {
4839         let mut buf = [0; 65535];
4840 
4841         let mut config = Config::new(0xbabababa).unwrap();
4842         config
4843             .set_application_protos(b"\x06proto1\x06proto2")
4844             .unwrap();
4845         config.verify_peer(false);
4846 
4847         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
4848 
4849         let mut len = pipe.client.send(&mut buf).unwrap();
4850 
4851         let hdr = packet::Header::from_slice(&mut buf[..len], 0).unwrap();
4852         len = crate::negotiate_version(&hdr.scid, &hdr.dcid, &mut buf).unwrap();
4853 
4854         assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
4855 
4856         assert_eq!(pipe.handshake(&mut buf), Ok(()));
4857 
4858         assert_eq!(pipe.client.version, PROTOCOL_VERSION);
4859         assert_eq!(pipe.server.version, PROTOCOL_VERSION);
4860     }
4861 
4862     #[test]
verify_custom_root()4863     fn verify_custom_root() {
4864         let mut buf = [0; 65535];
4865 
4866         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
4867         config.verify_peer(true);
4868         config
4869             .load_verify_locations_from_file("examples/rootca.crt")
4870             .unwrap();
4871         config
4872             .set_application_protos(b"\x06proto1\x06proto2")
4873             .unwrap();
4874 
4875         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
4876         assert_eq!(pipe.handshake(&mut buf), Ok(()));
4877     }
4878 
4879     #[test]
missing_initial_source_connection_id()4880     fn missing_initial_source_connection_id() {
4881         let mut buf = [0; 65535];
4882 
4883         let mut pipe = testing::Pipe::default().unwrap();
4884 
4885         // Reset initial_source_connection_id.
4886         pipe.client
4887             .local_transport_params
4888             .initial_source_connection_id = None;
4889         assert_eq!(pipe.client.encode_transport_params(), Ok(()));
4890 
4891         // Client sends initial flight.
4892         let len = pipe.client.send(&mut buf).unwrap();
4893 
4894         // Server rejects transport parameters.
4895         assert_eq!(
4896             testing::recv_send(&mut pipe.server, &mut buf, len),
4897             Err(Error::InvalidTransportParam)
4898         );
4899     }
4900 
4901     #[test]
invalid_initial_source_connection_id()4902     fn invalid_initial_source_connection_id() {
4903         let mut buf = [0; 65535];
4904 
4905         let mut pipe = testing::Pipe::default().unwrap();
4906 
4907         // Scramble initial_source_connection_id.
4908         pipe.client
4909             .local_transport_params
4910             .initial_source_connection_id = Some(b"bogus value".to_vec());
4911         assert_eq!(pipe.client.encode_transport_params(), Ok(()));
4912 
4913         // Client sends initial flight.
4914         let len = pipe.client.send(&mut buf).unwrap();
4915 
4916         // Server rejects transport parameters.
4917         assert_eq!(
4918             testing::recv_send(&mut pipe.server, &mut buf, len),
4919             Err(Error::InvalidTransportParam)
4920         );
4921     }
4922 
4923     #[test]
handshake()4924     fn handshake() {
4925         let mut buf = [0; 65535];
4926 
4927         let mut pipe = testing::Pipe::default().unwrap();
4928 
4929         assert_eq!(pipe.handshake(&mut buf), Ok(()));
4930 
4931         assert_eq!(
4932             pipe.client.application_proto(),
4933             pipe.server.application_proto()
4934         );
4935     }
4936 
4937     #[test]
handshake_confirmation()4938     fn handshake_confirmation() {
4939         let mut buf = [0; 65535];
4940 
4941         let mut pipe = testing::Pipe::default().unwrap();
4942 
4943         // Client sends initial flight.
4944         let mut len = pipe.client.send(&mut buf).unwrap();
4945 
4946         // Server sends initial flight.
4947         len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
4948 
4949         assert!(!pipe.client.is_established());
4950         assert!(!pipe.client.handshake_confirmed);
4951 
4952         assert!(!pipe.server.is_established());
4953         assert!(!pipe.server.handshake_confirmed);
4954 
4955         // Client sends Handshake packet and completes handshake.
4956         len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
4957 
4958         assert!(pipe.client.is_established());
4959         assert!(!pipe.client.handshake_confirmed);
4960 
4961         assert!(!pipe.server.is_established());
4962         assert!(!pipe.server.handshake_confirmed);
4963 
4964         // Server completes handshake and sends HANDSHAKE_DONE.
4965         len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
4966 
4967         assert!(pipe.client.is_established());
4968         assert!(!pipe.client.handshake_confirmed);
4969 
4970         assert!(pipe.server.is_established());
4971         assert!(!pipe.server.handshake_confirmed);
4972 
4973         // Client acks 1-RTT packet, and confirms handshake.
4974         len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
4975 
4976         assert!(pipe.client.is_established());
4977         assert!(pipe.client.handshake_confirmed);
4978 
4979         assert!(pipe.server.is_established());
4980         assert!(!pipe.server.handshake_confirmed);
4981 
4982         // Server handshake is confirmed.
4983         testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
4984 
4985         assert!(pipe.client.is_established());
4986         assert!(pipe.client.handshake_confirmed);
4987 
4988         assert!(pipe.server.is_established());
4989         assert!(pipe.server.handshake_confirmed);
4990     }
4991 
4992     #[test]
handshake_alpn_mismatch()4993     fn handshake_alpn_mismatch() {
4994         let mut buf = [0; 65535];
4995 
4996         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
4997         config
4998             .set_application_protos(b"\x06proto3\x06proto4")
4999             .unwrap();
5000         config.verify_peer(false);
5001 
5002         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
5003 
5004         assert_eq!(pipe.handshake(&mut buf), Err(Error::TlsFail));
5005 
5006         assert_eq!(pipe.client.application_proto(), b"");
5007         assert_eq!(pipe.server.application_proto(), b"");
5008     }
5009 
5010     #[test]
limit_handshake_data()5011     fn limit_handshake_data() {
5012         let mut buf = [0; 65535];
5013 
5014         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
5015         config
5016             .load_cert_chain_from_pem_file("examples/cert-big.crt")
5017             .unwrap();
5018         config
5019             .load_priv_key_from_pem_file("examples/cert.key")
5020             .unwrap();
5021         config
5022             .set_application_protos(b"\x06proto1\06proto2")
5023             .unwrap();
5024 
5025         let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
5026 
5027         let client_sent = pipe.client.send(&mut buf).unwrap();
5028         let server_sent =
5029             testing::recv_send(&mut pipe.server, &mut buf, client_sent).unwrap();
5030 
5031         assert_eq!(server_sent, (client_sent - 1) * MAX_AMPLIFICATION_FACTOR);
5032     }
5033 
5034     #[test]
stream()5035     fn stream() {
5036         let mut buf = [0; 65535];
5037 
5038         let mut pipe = testing::Pipe::default().unwrap();
5039 
5040         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5041 
5042         assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
5043 
5044         assert_eq!(pipe.advance(&mut buf), Ok(()));
5045 
5046         assert!(!pipe.server.stream_finished(4));
5047 
5048         let mut r = pipe.server.readable();
5049         assert_eq!(r.next(), Some(4));
5050         assert_eq!(r.next(), None);
5051 
5052         let mut b = [0; 15];
5053         assert_eq!(pipe.server.stream_recv(4, &mut b), Ok((12, true)));
5054         assert_eq!(&b[..12], b"hello, world");
5055 
5056         assert!(pipe.server.stream_finished(4));
5057     }
5058 
5059     #[test]
stream_send_on_32bit_arch()5060     fn stream_send_on_32bit_arch() {
5061         let mut buf = [0; 65535];
5062 
5063         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
5064         config
5065             .load_cert_chain_from_pem_file("examples/cert.crt")
5066             .unwrap();
5067         config
5068             .load_priv_key_from_pem_file("examples/cert.key")
5069             .unwrap();
5070         config
5071             .set_application_protos(b"\x06proto1\x06proto2")
5072             .unwrap();
5073         config.set_initial_max_data(2_u64.pow(32) + 5);
5074         config.set_initial_max_stream_data_bidi_local(15);
5075         config.set_initial_max_stream_data_bidi_remote(15);
5076         config.set_initial_max_stream_data_uni(10);
5077         config.set_initial_max_streams_bidi(3);
5078         config.set_initial_max_streams_uni(0);
5079         config.verify_peer(false);
5080 
5081         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
5082 
5083         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5084 
5085         // In 32bit arch, send_capacity() should be min(2^32+5, cwnd),
5086         // not min(5, cwnd)
5087         assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
5088 
5089         assert_eq!(pipe.advance(&mut buf), Ok(()));
5090 
5091         assert!(!pipe.server.stream_finished(4));
5092     }
5093 
5094     #[test]
empty_stream_frame()5095     fn empty_stream_frame() {
5096         let mut buf = [0; 65535];
5097 
5098         let mut pipe = testing::Pipe::default().unwrap();
5099 
5100         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5101 
5102         let frames = [frame::Frame::Stream {
5103             stream_id: 4,
5104             data: stream::RangeBuf::from(b"aaaaa", 0, false),
5105         }];
5106 
5107         let pkt_type = packet::Type::Short;
5108         assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
5109 
5110         let mut readable = pipe.server.readable();
5111         assert_eq!(readable.next(), Some(4));
5112 
5113         assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((5, false)));
5114 
5115         let frames = [frame::Frame::Stream {
5116             stream_id: 4,
5117             data: stream::RangeBuf::from(b"", 5, true),
5118         }];
5119 
5120         let pkt_type = packet::Type::Short;
5121         assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
5122 
5123         let mut readable = pipe.server.readable();
5124         assert_eq!(readable.next(), Some(4));
5125 
5126         assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((0, true)));
5127 
5128         let frames = [frame::Frame::Stream {
5129             stream_id: 4,
5130             data: stream::RangeBuf::from(b"", 15, true),
5131         }];
5132 
5133         let pkt_type = packet::Type::Short;
5134         assert_eq!(
5135             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5136             Err(Error::FinalSize)
5137         );
5138     }
5139 
5140     #[test]
flow_control_limit()5141     fn flow_control_limit() {
5142         let mut buf = [0; 65535];
5143 
5144         let mut pipe = testing::Pipe::default().unwrap();
5145 
5146         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5147 
5148         let frames = [
5149             frame::Frame::Stream {
5150                 stream_id: 4,
5151                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5152             },
5153             frame::Frame::Stream {
5154                 stream_id: 8,
5155                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5156             },
5157             frame::Frame::Stream {
5158                 stream_id: 12,
5159                 data: stream::RangeBuf::from(b"a", 0, false),
5160             },
5161         ];
5162 
5163         let pkt_type = packet::Type::Short;
5164         assert_eq!(
5165             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5166             Err(Error::FlowControl),
5167         );
5168     }
5169 
5170     #[test]
flow_control_limit_dup()5171     fn flow_control_limit_dup() {
5172         let mut buf = [0; 65535];
5173 
5174         let mut pipe = testing::Pipe::default().unwrap();
5175 
5176         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5177 
5178         let frames = [
5179             // One byte less than stream limit.
5180             frame::Frame::Stream {
5181                 stream_id: 4,
5182                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaa", 0, false),
5183             },
5184             // Same stream, but one byte more.
5185             frame::Frame::Stream {
5186                 stream_id: 4,
5187                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5188             },
5189             frame::Frame::Stream {
5190                 stream_id: 12,
5191                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5192             },
5193         ];
5194 
5195         let pkt_type = packet::Type::Short;
5196         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5197     }
5198 
5199     #[test]
flow_control_update()5200     fn flow_control_update() {
5201         let mut buf = [0; 65535];
5202 
5203         let mut pipe = testing::Pipe::default().unwrap();
5204 
5205         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5206 
5207         let frames = [
5208             frame::Frame::Stream {
5209                 stream_id: 4,
5210                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5211             },
5212             frame::Frame::Stream {
5213                 stream_id: 8,
5214                 data: stream::RangeBuf::from(b"a", 0, false),
5215             },
5216         ];
5217 
5218         let pkt_type = packet::Type::Short;
5219 
5220         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5221 
5222         pipe.server.stream_recv(4, &mut buf).unwrap();
5223         pipe.server.stream_recv(8, &mut buf).unwrap();
5224 
5225         let frames = [frame::Frame::Stream {
5226             stream_id: 8,
5227             data: stream::RangeBuf::from(b"a", 1, false),
5228         }];
5229 
5230         let len = pipe
5231             .send_pkt_to_server(pkt_type, &frames, &mut buf)
5232             .unwrap();
5233 
5234         assert!(len > 0);
5235 
5236         let frames =
5237             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
5238         let mut iter = frames.iter();
5239 
5240         // Ignore ACK.
5241         iter.next().unwrap();
5242 
5243         assert_eq!(iter.next(), Some(&frame::Frame::MaxData { max: 46 }));
5244     }
5245 
5246     #[test]
stream_flow_control_limit_bidi()5247     fn stream_flow_control_limit_bidi() {
5248         let mut buf = [0; 65535];
5249 
5250         let mut pipe = testing::Pipe::default().unwrap();
5251 
5252         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5253 
5254         let frames = [frame::Frame::Stream {
5255             stream_id: 4,
5256             data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaaa", 0, true),
5257         }];
5258 
5259         let pkt_type = packet::Type::Short;
5260         assert_eq!(
5261             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5262             Err(Error::FlowControl),
5263         );
5264     }
5265 
5266     #[test]
stream_flow_control_limit_uni()5267     fn stream_flow_control_limit_uni() {
5268         let mut buf = [0; 65535];
5269 
5270         let mut pipe = testing::Pipe::default().unwrap();
5271 
5272         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5273 
5274         let frames = [frame::Frame::Stream {
5275             stream_id: 2,
5276             data: stream::RangeBuf::from(b"aaaaaaaaaaa", 0, true),
5277         }];
5278 
5279         let pkt_type = packet::Type::Short;
5280         assert_eq!(
5281             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5282             Err(Error::FlowControl),
5283         );
5284     }
5285 
5286     #[test]
stream_flow_control_update()5287     fn stream_flow_control_update() {
5288         let mut buf = [0; 65535];
5289 
5290         let mut pipe = testing::Pipe::default().unwrap();
5291 
5292         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5293 
5294         let frames = [frame::Frame::Stream {
5295             stream_id: 4,
5296             data: stream::RangeBuf::from(b"aaaaaaa", 0, false),
5297         }];
5298 
5299         let pkt_type = packet::Type::Short;
5300 
5301         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5302 
5303         pipe.server.stream_recv(4, &mut buf).unwrap();
5304 
5305         let frames = [frame::Frame::Stream {
5306             stream_id: 4,
5307             data: stream::RangeBuf::from(b"a", 7, false),
5308         }];
5309 
5310         let len = pipe
5311             .send_pkt_to_server(pkt_type, &frames, &mut buf)
5312             .unwrap();
5313 
5314         assert!(len > 0);
5315 
5316         let frames =
5317             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
5318         let mut iter = frames.iter();
5319 
5320         // Ignore ACK.
5321         iter.next().unwrap();
5322 
5323         assert_eq!(
5324             iter.next(),
5325             Some(&frame::Frame::MaxStreamData {
5326                 stream_id: 4,
5327                 max: 22,
5328             })
5329         );
5330     }
5331 
5332     #[test]
stream_limit_bidi()5333     fn stream_limit_bidi() {
5334         let mut buf = [0; 65535];
5335 
5336         let mut pipe = testing::Pipe::default().unwrap();
5337 
5338         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5339 
5340         let frames = [
5341             frame::Frame::Stream {
5342                 stream_id: 4,
5343                 data: stream::RangeBuf::from(b"a", 0, false),
5344             },
5345             frame::Frame::Stream {
5346                 stream_id: 8,
5347                 data: stream::RangeBuf::from(b"a", 0, false),
5348             },
5349             frame::Frame::Stream {
5350                 stream_id: 12,
5351                 data: stream::RangeBuf::from(b"a", 0, false),
5352             },
5353             frame::Frame::Stream {
5354                 stream_id: 16,
5355                 data: stream::RangeBuf::from(b"a", 0, false),
5356             },
5357             frame::Frame::Stream {
5358                 stream_id: 20,
5359                 data: stream::RangeBuf::from(b"a", 0, false),
5360             },
5361             frame::Frame::Stream {
5362                 stream_id: 24,
5363                 data: stream::RangeBuf::from(b"a", 0, false),
5364             },
5365             frame::Frame::Stream {
5366                 stream_id: 28,
5367                 data: stream::RangeBuf::from(b"a", 0, false),
5368             },
5369         ];
5370 
5371         let pkt_type = packet::Type::Short;
5372         assert_eq!(
5373             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5374             Err(Error::StreamLimit),
5375         );
5376     }
5377 
5378     #[test]
stream_limit_max_bidi()5379     fn stream_limit_max_bidi() {
5380         let mut buf = [0; 65535];
5381 
5382         let mut pipe = testing::Pipe::default().unwrap();
5383 
5384         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5385 
5386         let frames = [frame::Frame::MaxStreamsBidi { max: MAX_STREAM_ID }];
5387 
5388         let pkt_type = packet::Type::Short;
5389         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5390 
5391         let frames = [frame::Frame::MaxStreamsBidi {
5392             max: MAX_STREAM_ID + 1,
5393         }];
5394 
5395         let pkt_type = packet::Type::Short;
5396         assert_eq!(
5397             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5398             Err(Error::InvalidFrame),
5399         );
5400     }
5401 
5402     #[test]
stream_limit_uni()5403     fn stream_limit_uni() {
5404         let mut buf = [0; 65535];
5405 
5406         let mut pipe = testing::Pipe::default().unwrap();
5407 
5408         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5409 
5410         let frames = [
5411             frame::Frame::Stream {
5412                 stream_id: 2,
5413                 data: stream::RangeBuf::from(b"a", 0, false),
5414             },
5415             frame::Frame::Stream {
5416                 stream_id: 6,
5417                 data: stream::RangeBuf::from(b"a", 0, false),
5418             },
5419             frame::Frame::Stream {
5420                 stream_id: 10,
5421                 data: stream::RangeBuf::from(b"a", 0, false),
5422             },
5423             frame::Frame::Stream {
5424                 stream_id: 14,
5425                 data: stream::RangeBuf::from(b"a", 0, false),
5426             },
5427             frame::Frame::Stream {
5428                 stream_id: 18,
5429                 data: stream::RangeBuf::from(b"a", 0, false),
5430             },
5431             frame::Frame::Stream {
5432                 stream_id: 22,
5433                 data: stream::RangeBuf::from(b"a", 0, false),
5434             },
5435             frame::Frame::Stream {
5436                 stream_id: 26,
5437                 data: stream::RangeBuf::from(b"a", 0, false),
5438             },
5439         ];
5440 
5441         let pkt_type = packet::Type::Short;
5442         assert_eq!(
5443             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5444             Err(Error::StreamLimit),
5445         );
5446     }
5447 
5448     #[test]
stream_limit_max_uni()5449     fn stream_limit_max_uni() {
5450         let mut buf = [0; 65535];
5451 
5452         let mut pipe = testing::Pipe::default().unwrap();
5453 
5454         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5455 
5456         let frames = [frame::Frame::MaxStreamsUni { max: MAX_STREAM_ID }];
5457 
5458         let pkt_type = packet::Type::Short;
5459         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5460 
5461         let frames = [frame::Frame::MaxStreamsUni {
5462             max: MAX_STREAM_ID + 1,
5463         }];
5464 
5465         let pkt_type = packet::Type::Short;
5466         assert_eq!(
5467             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5468             Err(Error::InvalidFrame),
5469         );
5470     }
5471 
5472     #[test]
streams_blocked_max_bidi()5473     fn streams_blocked_max_bidi() {
5474         let mut buf = [0; 65535];
5475 
5476         let mut pipe = testing::Pipe::default().unwrap();
5477 
5478         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5479 
5480         let frames = [frame::Frame::StreamsBlockedBidi {
5481             limit: MAX_STREAM_ID,
5482         }];
5483 
5484         let pkt_type = packet::Type::Short;
5485         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5486 
5487         let frames = [frame::Frame::StreamsBlockedBidi {
5488             limit: MAX_STREAM_ID + 1,
5489         }];
5490 
5491         let pkt_type = packet::Type::Short;
5492         assert_eq!(
5493             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5494             Err(Error::InvalidFrame),
5495         );
5496     }
5497 
5498     #[test]
streams_blocked_max_uni()5499     fn streams_blocked_max_uni() {
5500         let mut buf = [0; 65535];
5501 
5502         let mut pipe = testing::Pipe::default().unwrap();
5503 
5504         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5505 
5506         let frames = [frame::Frame::StreamsBlockedUni {
5507             limit: MAX_STREAM_ID,
5508         }];
5509 
5510         let pkt_type = packet::Type::Short;
5511         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5512 
5513         let frames = [frame::Frame::StreamsBlockedUni {
5514             limit: MAX_STREAM_ID + 1,
5515         }];
5516 
5517         let pkt_type = packet::Type::Short;
5518         assert_eq!(
5519             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5520             Err(Error::InvalidFrame),
5521         );
5522     }
5523 
5524     #[test]
stream_data_overlap()5525     fn stream_data_overlap() {
5526         let mut buf = [0; 65535];
5527 
5528         let mut pipe = testing::Pipe::default().unwrap();
5529 
5530         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5531 
5532         let frames = [
5533             frame::Frame::Stream {
5534                 stream_id: 0,
5535                 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5536             },
5537             frame::Frame::Stream {
5538                 stream_id: 0,
5539                 data: stream::RangeBuf::from(b"bbbbb", 3, false),
5540             },
5541             frame::Frame::Stream {
5542                 stream_id: 0,
5543                 data: stream::RangeBuf::from(b"ccccc", 6, false),
5544             },
5545         ];
5546 
5547         let pkt_type = packet::Type::Short;
5548         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5549 
5550         let mut b = [0; 15];
5551         assert_eq!(pipe.server.stream_recv(0, &mut b), Ok((11, false)));
5552         assert_eq!(&b[..11], b"aaaaabbbccc");
5553     }
5554 
5555     #[test]
stream_data_overlap_with_reordering()5556     fn stream_data_overlap_with_reordering() {
5557         let mut buf = [0; 65535];
5558 
5559         let mut pipe = testing::Pipe::default().unwrap();
5560 
5561         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5562 
5563         let frames = [
5564             frame::Frame::Stream {
5565                 stream_id: 0,
5566                 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5567             },
5568             frame::Frame::Stream {
5569                 stream_id: 0,
5570                 data: stream::RangeBuf::from(b"ccccc", 6, false),
5571             },
5572             frame::Frame::Stream {
5573                 stream_id: 0,
5574                 data: stream::RangeBuf::from(b"bbbbb", 3, false),
5575             },
5576         ];
5577 
5578         let pkt_type = packet::Type::Short;
5579         assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
5580 
5581         let mut b = [0; 15];
5582         assert_eq!(pipe.server.stream_recv(0, &mut b), Ok((11, false)));
5583         assert_eq!(&b[..11], b"aaaaabccccc");
5584     }
5585 
5586     #[test]
reset_stream_flow_control()5587     fn reset_stream_flow_control() {
5588         let mut buf = [0; 65535];
5589 
5590         let mut pipe = testing::Pipe::default().unwrap();
5591 
5592         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5593 
5594         let frames = [
5595             frame::Frame::Stream {
5596                 stream_id: 4,
5597                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
5598             },
5599             frame::Frame::Stream {
5600                 stream_id: 8,
5601                 data: stream::RangeBuf::from(b"a", 0, false),
5602             },
5603             frame::Frame::ResetStream {
5604                 stream_id: 8,
5605                 error_code: 0,
5606                 final_size: 15,
5607             },
5608             frame::Frame::Stream {
5609                 stream_id: 12,
5610                 data: stream::RangeBuf::from(b"a", 0, false),
5611             },
5612         ];
5613 
5614         let pkt_type = packet::Type::Short;
5615         assert_eq!(
5616             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
5617             Err(Error::FlowControl),
5618         );
5619     }
5620 
5621     #[test]
path_challenge()5622     fn path_challenge() {
5623         let mut buf = [0; 65535];
5624 
5625         let mut pipe = testing::Pipe::default().unwrap();
5626 
5627         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5628 
5629         let frames = [frame::Frame::PathChallenge {
5630             data: vec![0xba; 8],
5631         }];
5632 
5633         let pkt_type = packet::Type::Short;
5634 
5635         let len = pipe
5636             .send_pkt_to_server(pkt_type, &frames, &mut buf)
5637             .unwrap();
5638 
5639         assert!(len > 0);
5640 
5641         let frames =
5642             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
5643         let mut iter = frames.iter();
5644 
5645         // Ignore ACK.
5646         iter.next().unwrap();
5647 
5648         assert_eq!(
5649             iter.next(),
5650             Some(&frame::Frame::PathResponse {
5651                 data: vec![0xba; 8],
5652             })
5653         );
5654     }
5655 
5656     #[test]
5657     /// Simulates reception of an early 1-RTT packet on the server, by
5658     /// delaying the client's Handshake packet that completes the handshake.
early_1rtt_packet()5659     fn early_1rtt_packet() {
5660         let mut buf = [0; 65535];
5661 
5662         let mut pipe = testing::Pipe::default().unwrap();
5663 
5664         // Client sends initial flight
5665         let mut len = pipe.client.send(&mut buf).unwrap();
5666 
5667         // Server sends initial flight..
5668         len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
5669 
5670         // Client sends Handshake packet.
5671         len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
5672 
5673         // Emulate handshake packet delay by not making server process client
5674         // packet.
5675         let mut delayed = (&buf[..len]).to_vec();
5676         testing::recv_send(&mut pipe.server, &mut buf, 0).unwrap();
5677 
5678         assert!(pipe.client.is_established());
5679 
5680         // Send 1-RTT packet #0.
5681         let frames = [frame::Frame::Stream {
5682             stream_id: 0,
5683             data: stream::RangeBuf::from(b"hello, world", 0, true),
5684         }];
5685 
5686         let pkt_type = packet::Type::Short;
5687         let written =
5688             testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
5689                 .unwrap();
5690         assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
5691 
5692         // Send 1-RTT packet #1.
5693         let frames = [frame::Frame::Stream {
5694             stream_id: 4,
5695             data: stream::RangeBuf::from(b"hello, world", 0, true),
5696         }];
5697 
5698         let written =
5699             testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
5700                 .unwrap();
5701         assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
5702 
5703         assert!(!pipe.server.is_established());
5704 
5705         // Client sent 1-RTT packets 0 and 1, but server hasn't received them.
5706         //
5707         // Note that `largest_rx_pkt_num` is initialized to 0, so we need to
5708         // send another 1-RTT packet to make this check meaningful.
5709         assert_eq!(
5710             pipe.server.pkt_num_spaces[packet::EPOCH_APPLICATION]
5711                 .largest_rx_pkt_num,
5712             0
5713         );
5714 
5715         // Process delayed packet.
5716         pipe.server.recv(&mut delayed).unwrap();
5717 
5718         assert!(pipe.server.is_established());
5719 
5720         assert_eq!(
5721             pipe.server.pkt_num_spaces[packet::EPOCH_APPLICATION]
5722                 .largest_rx_pkt_num,
5723             0
5724         );
5725     }
5726 
5727     #[test]
stream_shutdown_read()5728     fn stream_shutdown_read() {
5729         let mut buf = [0; 65535];
5730 
5731         let mut pipe = testing::Pipe::default().unwrap();
5732 
5733         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5734 
5735         assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
5736         assert_eq!(pipe.advance(&mut buf), Ok(()));
5737 
5738         let mut r = pipe.server.readable();
5739         assert_eq!(r.next(), Some(4));
5740         assert_eq!(r.next(), None);
5741 
5742         assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));
5743 
5744         let mut r = pipe.server.readable();
5745         assert_eq!(r.next(), None);
5746 
5747         assert_eq!(pipe.client.stream_send(4, b"bye", false), Ok(3));
5748         assert_eq!(pipe.advance(&mut buf), Ok(()));
5749 
5750         let mut r = pipe.server.readable();
5751         assert_eq!(r.next(), None);
5752 
5753         assert_eq!(
5754             pipe.server.stream_shutdown(4, Shutdown::Read, 0),
5755             Err(Error::Done)
5756         );
5757     }
5758 
5759     #[test]
stream_shutdown_write()5760     fn stream_shutdown_write() {
5761         let mut buf = [0; 65535];
5762 
5763         let mut pipe = testing::Pipe::default().unwrap();
5764 
5765         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5766 
5767         assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
5768         assert_eq!(pipe.advance(&mut buf), Ok(()));
5769 
5770         let mut r = pipe.server.readable();
5771         assert_eq!(r.next(), Some(4));
5772         assert_eq!(r.next(), None);
5773 
5774         let mut b = [0; 15];
5775         pipe.server.stream_recv(4, &mut b).unwrap();
5776 
5777         assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
5778         assert_eq!(pipe.client.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
5779         assert_eq!(pipe.advance(&mut buf), Ok(()));
5780 
5781         let mut r = pipe.server.readable();
5782         assert_eq!(r.next(), None);
5783 
5784         assert_eq!(pipe.client.stream_send(4, b"bye", false), Ok(3));
5785         assert_eq!(pipe.advance(&mut buf), Ok(()));
5786 
5787         let mut r = pipe.server.readable();
5788         assert_eq!(r.next(), None);
5789 
5790         assert_eq!(
5791             pipe.client.stream_shutdown(4, Shutdown::Write, 0),
5792             Err(Error::Done)
5793         );
5794     }
5795 
5796     #[test]
5797     /// Tests that the order of flushable streams scheduled on the wire is the
5798     /// same as the order of `stream_send()` calls done by the application.
stream_round_robin()5799     fn stream_round_robin() {
5800         let mut buf = [0; 65535];
5801 
5802         let mut pipe = testing::Pipe::default().unwrap();
5803 
5804         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5805 
5806         assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
5807         assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
5808         assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
5809 
5810         let len = pipe.client.send(&mut buf).unwrap();
5811 
5812         let frames =
5813             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
5814 
5815         assert_eq!(
5816             frames.iter().next(),
5817             Some(&frame::Frame::Stream {
5818                 stream_id: 8,
5819                 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5820             })
5821         );
5822 
5823         let len = pipe.client.send(&mut buf).unwrap();
5824 
5825         let frames =
5826             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
5827 
5828         assert_eq!(
5829             frames.iter().next(),
5830             Some(&frame::Frame::Stream {
5831                 stream_id: 0,
5832                 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5833             })
5834         );
5835 
5836         let len = pipe.client.send(&mut buf).unwrap();
5837 
5838         let frames =
5839             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
5840 
5841         assert_eq!(
5842             frames.iter().next(),
5843             Some(&frame::Frame::Stream {
5844                 stream_id: 4,
5845                 data: stream::RangeBuf::from(b"aaaaa", 0, false),
5846             })
5847         );
5848     }
5849 
5850     #[test]
5851     /// Tests the readable iterator.
stream_readable()5852     fn stream_readable() {
5853         let mut buf = [0; 65535];
5854 
5855         let mut pipe = testing::Pipe::default().unwrap();
5856 
5857         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5858 
5859         // No readable streams.
5860         let mut r = pipe.client.readable();
5861         assert_eq!(r.next(), None);
5862 
5863         assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
5864 
5865         let mut r = pipe.client.readable();
5866         assert_eq!(r.next(), None);
5867 
5868         let mut r = pipe.server.readable();
5869         assert_eq!(r.next(), None);
5870 
5871         assert_eq!(pipe.advance(&mut buf), Ok(()));
5872 
5873         // Server received stream.
5874         let mut r = pipe.server.readable();
5875         assert_eq!(r.next(), Some(4));
5876         assert_eq!(r.next(), None);
5877 
5878         assert_eq!(
5879             pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
5880             Ok(15)
5881         );
5882         assert_eq!(pipe.advance(&mut buf), Ok(()));
5883 
5884         let mut r = pipe.client.readable();
5885         assert_eq!(r.next(), Some(4));
5886         assert_eq!(r.next(), None);
5887 
5888         // Client drains stream.
5889         let mut b = [0; 15];
5890         pipe.client.stream_recv(4, &mut b).unwrap();
5891         assert_eq!(pipe.advance(&mut buf), Ok(()));
5892 
5893         let mut r = pipe.client.readable();
5894         assert_eq!(r.next(), None);
5895 
5896         // Server shuts down stream.
5897         let mut r = pipe.server.readable();
5898         assert_eq!(r.next(), Some(4));
5899         assert_eq!(r.next(), None);
5900 
5901         assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));
5902 
5903         let mut r = pipe.server.readable();
5904         assert_eq!(r.next(), None);
5905 
5906         // Client creates multiple streams.
5907         assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
5908         assert_eq!(pipe.advance(&mut buf), Ok(()));
5909 
5910         assert_eq!(pipe.client.stream_send(12, b"aaaaa", false), Ok(5));
5911         assert_eq!(pipe.advance(&mut buf), Ok(()));
5912 
5913         let mut r = pipe.server.readable();
5914         assert_eq!(r.len(), 2);
5915 
5916         assert!(r.next().is_some());
5917         assert!(r.next().is_some());
5918         assert!(r.next().is_none());
5919 
5920         assert_eq!(r.len(), 0);
5921     }
5922 
5923     #[test]
5924     /// Tests the writable iterator.
stream_writable()5925     fn stream_writable() {
5926         let mut buf = [0; 65535];
5927 
5928         let mut pipe = testing::Pipe::default().unwrap();
5929 
5930         assert_eq!(pipe.handshake(&mut buf), Ok(()));
5931 
5932         // No writable streams.
5933         let mut w = pipe.client.writable();
5934         assert_eq!(w.next(), None);
5935 
5936         assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
5937 
5938         // Client created stream.
5939         let mut w = pipe.client.writable();
5940         assert_eq!(w.next(), Some(4));
5941         assert_eq!(w.next(), None);
5942 
5943         assert_eq!(pipe.advance(&mut buf), Ok(()));
5944 
5945         // Server created stream.
5946         let mut w = pipe.server.writable();
5947         assert_eq!(w.next(), Some(4));
5948         assert_eq!(w.next(), None);
5949 
5950         assert_eq!(
5951             pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
5952             Ok(15)
5953         );
5954 
5955         // Server stream is full.
5956         let mut w = pipe.server.writable();
5957         assert_eq!(w.next(), None);
5958 
5959         assert_eq!(pipe.advance(&mut buf), Ok(()));
5960 
5961         // Client drains stream.
5962         let mut b = [0; 15];
5963         pipe.client.stream_recv(4, &mut b).unwrap();
5964         assert_eq!(pipe.advance(&mut buf), Ok(()));
5965 
5966         // Server stream is writable again.
5967         let mut w = pipe.server.writable();
5968         assert_eq!(w.next(), Some(4));
5969         assert_eq!(w.next(), None);
5970 
5971         // Server suts down stream.
5972         assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
5973 
5974         let mut w = pipe.server.writable();
5975         assert_eq!(w.next(), None);
5976 
5977         // Client creates multiple streams.
5978         assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
5979         assert_eq!(pipe.advance(&mut buf), Ok(()));
5980 
5981         assert_eq!(pipe.client.stream_send(12, b"aaaaa", false), Ok(5));
5982         assert_eq!(pipe.advance(&mut buf), Ok(()));
5983 
5984         let mut w = pipe.server.writable();
5985         assert_eq!(w.len(), 2);
5986 
5987         assert!(w.next().is_some());
5988         assert!(w.next().is_some());
5989         assert!(w.next().is_none());
5990 
5991         assert_eq!(w.len(), 0);
5992 
5993         // Server finishes stream.
5994         assert_eq!(pipe.server.stream_send(12, b"aaaaa", true), Ok(5));
5995 
5996         let mut w = pipe.server.writable();
5997         assert_eq!(w.next(), Some(8));
5998         assert_eq!(w.next(), None);
5999     }
6000 
6001     #[test]
6002     /// Tests that we don't exceed the per-connection flow control limit set by
6003     /// the peer.
flow_control_limit_send()6004     fn flow_control_limit_send() {
6005         let mut buf = [0; 65535];
6006 
6007         let mut pipe = testing::Pipe::default().unwrap();
6008 
6009         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6010 
6011         assert_eq!(
6012             pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
6013             Ok(15)
6014         );
6015         assert_eq!(pipe.advance(&mut buf), Ok(()));
6016         assert_eq!(
6017             pipe.client.stream_send(4, b"aaaaaaaaaaaaaaa", false),
6018             Ok(15)
6019         );
6020         assert_eq!(pipe.advance(&mut buf), Ok(()));
6021         assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(0));
6022         assert_eq!(pipe.advance(&mut buf), Ok(()));
6023 
6024         let mut r = pipe.server.readable();
6025         assert!(r.next().is_some());
6026         assert!(r.next().is_some());
6027         assert!(r.next().is_none());
6028     }
6029 
6030     #[test]
6031     /// Tests that invalid packets received before any other valid ones cause
6032     /// the server to close the connection immediately.
invalid_initial_server()6033     fn invalid_initial_server() {
6034         let mut buf = [0; 65535];
6035         let mut pipe = testing::Pipe::default().unwrap();
6036 
6037         let frames = [frame::Frame::Padding { len: 10 }];
6038 
6039         let written = testing::encode_pkt(
6040             &mut pipe.client,
6041             packet::Type::Initial,
6042             &frames,
6043             &mut buf,
6044         )
6045         .unwrap();
6046 
6047         // Corrupt the packets's last byte to make decryption fail (the last
6048         // byte is part of the AEAD tag, so changing it means that the packet
6049         // cannot be authenticated during decryption).
6050         buf[written - 1] = !buf[written - 1];
6051 
6052         assert_eq!(pipe.server.timeout(), None);
6053 
6054         assert_eq!(
6055             pipe.server.recv(&mut buf[..written]),
6056             Err(Error::CryptoFail)
6057         );
6058 
6059         assert!(pipe.server.is_closed());
6060     }
6061 
6062     #[test]
6063     /// Tests that invalid Initial packets received to cause
6064     /// the client to close the connection immediately.
invalid_initial_client()6065     fn invalid_initial_client() {
6066         let mut buf = [0; 65535];
6067         let mut pipe = testing::Pipe::default().unwrap();
6068 
6069         // Client sends initial flight.
6070         let len = pipe.client.send(&mut buf).unwrap();
6071 
6072         // Server sends initial flight.
6073         assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(1200));
6074 
6075         let frames = [frame::Frame::Padding { len: 10 }];
6076 
6077         let written = testing::encode_pkt(
6078             &mut pipe.server,
6079             packet::Type::Initial,
6080             &frames,
6081             &mut buf,
6082         )
6083         .unwrap();
6084 
6085         // Corrupt the packets's last byte to make decryption fail (the last
6086         // byte is part of the AEAD tag, so changing it means that the packet
6087         // cannot be authenticated during decryption).
6088         buf[written - 1] = !buf[written - 1];
6089 
6090         // Client will ignore invalid packet.
6091         assert_eq!(pipe.client.recv(&mut buf[..written]), Ok(68));
6092 
6093         // The connection should be alive...
6094         assert_eq!(pipe.client.is_closed(), false);
6095 
6096         // ...and the idle timeout should be armed.
6097         assert!(pipe.client.idle_timer.is_some());
6098     }
6099 
6100     #[test]
6101     /// Tests that packets with invalid payload length received before any other
6102     /// valid packet cause the server to close the connection immediately.
invalid_initial_payload()6103     fn invalid_initial_payload() {
6104         let mut buf = [0; 65535];
6105         let mut pipe = testing::Pipe::default().unwrap();
6106 
6107         let mut b = octets::OctetsMut::with_slice(&mut buf);
6108 
6109         let epoch = packet::Type::Initial.to_epoch().unwrap();
6110 
6111         let pn = 0;
6112         let pn_len = packet::pkt_num_len(pn).unwrap();
6113 
6114         let hdr = Header {
6115             ty: packet::Type::Initial,
6116             version: pipe.client.version,
6117             dcid: pipe.client.dcid.clone(),
6118             scid: pipe.client.scid.clone(),
6119             pkt_num: 0,
6120             pkt_num_len: pn_len,
6121             token: pipe.client.token.clone(),
6122             versions: None,
6123             key_phase: false,
6124         };
6125 
6126         hdr.to_bytes(&mut b).unwrap();
6127 
6128         // Payload length is invalid!!!
6129         let payload_len = 4096;
6130 
6131         let len = pn_len + payload_len;
6132         b.put_varint(len as u64).unwrap();
6133 
6134         packet::encode_pkt_num(pn, &mut b).unwrap();
6135 
6136         let payload_offset = b.off();
6137 
6138         let frames = [frame::Frame::Padding { len: 10 }];
6139 
6140         for frame in &frames {
6141             frame.to_bytes(&mut b).unwrap();
6142         }
6143 
6144         let space = &mut pipe.client.pkt_num_spaces[epoch];
6145 
6146         // Use correct payload length when encrypting the packet.
6147         let payload_len = frames.iter().fold(0, |acc, x| acc + x.wire_len()) +
6148             space.crypto_overhead().unwrap();
6149 
6150         let aead = space.crypto_seal.as_ref().unwrap();
6151 
6152         let written = packet::encrypt_pkt(
6153             &mut b,
6154             pn,
6155             pn_len,
6156             payload_len,
6157             payload_offset,
6158             aead,
6159         )
6160         .unwrap();
6161 
6162         assert_eq!(pipe.server.timeout(), None);
6163 
6164         assert_eq!(
6165             pipe.server.recv(&mut buf[..written]),
6166             Err(Error::BufferTooShort)
6167         );
6168 
6169         assert!(pipe.server.is_closed());
6170     }
6171 
6172     #[test]
6173     /// Tests that invalid packets don't cause the connection to be closed.
invalid_packet()6174     fn invalid_packet() {
6175         let mut buf = [0; 65535];
6176         let mut pipe = testing::Pipe::default().unwrap();
6177 
6178         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6179 
6180         let frames = [frame::Frame::Padding { len: 10 }];
6181 
6182         let written = testing::encode_pkt(
6183             &mut pipe.client,
6184             packet::Type::Short,
6185             &frames,
6186             &mut buf,
6187         )
6188         .unwrap();
6189 
6190         // Corrupt the packets's last byte to make decryption fail (the last
6191         // byte is part of the AEAD tag, so changing it means that the packet
6192         // cannot be authenticated during decryption).
6193         buf[written - 1] = !buf[written - 1];
6194 
6195         assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
6196 
6197         // Corrupt the packets's first byte to make the header fail decoding.
6198         buf[0] = 255;
6199 
6200         assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
6201     }
6202 
6203     #[test]
6204     /// Tests that the MAX_STREAMS frame is sent for bidirectional streams.
stream_limit_update_bidi()6205     fn stream_limit_update_bidi() {
6206         let mut buf = [0; 65535];
6207 
6208         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
6209         config
6210             .load_cert_chain_from_pem_file("examples/cert.crt")
6211             .unwrap();
6212         config
6213             .load_priv_key_from_pem_file("examples/cert.key")
6214             .unwrap();
6215         config
6216             .set_application_protos(b"\x06proto1\x06proto2")
6217             .unwrap();
6218         config.set_initial_max_data(30);
6219         config.set_initial_max_stream_data_bidi_local(15);
6220         config.set_initial_max_stream_data_bidi_remote(15);
6221         config.set_initial_max_stream_data_uni(10);
6222         config.set_initial_max_streams_bidi(3);
6223         config.set_initial_max_streams_uni(0);
6224         config.verify_peer(false);
6225 
6226         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
6227         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6228 
6229         // Client sends stream data.
6230         assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
6231         assert_eq!(pipe.advance(&mut buf), Ok(()));
6232 
6233         assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
6234         assert_eq!(pipe.advance(&mut buf), Ok(()));
6235 
6236         assert_eq!(pipe.client.stream_send(4, b"b", true), Ok(1));
6237         assert_eq!(pipe.advance(&mut buf), Ok(()));
6238 
6239         assert_eq!(pipe.client.stream_send(0, b"b", true), Ok(1));
6240         assert_eq!(pipe.advance(&mut buf), Ok(()));
6241 
6242         // Server reads stream data.
6243         let mut b = [0; 15];
6244         pipe.server.stream_recv(0, &mut b).unwrap();
6245         pipe.server.stream_recv(4, &mut b).unwrap();
6246         assert_eq!(pipe.advance(&mut buf), Ok(()));
6247 
6248         // Server sends stream data, with fin.
6249         assert_eq!(pipe.server.stream_send(0, b"a", false), Ok(1));
6250         assert_eq!(pipe.advance(&mut buf), Ok(()));
6251 
6252         assert_eq!(pipe.server.stream_send(4, b"a", false), Ok(1));
6253         assert_eq!(pipe.advance(&mut buf), Ok(()));
6254 
6255         assert_eq!(pipe.server.stream_send(4, b"b", true), Ok(1));
6256         assert_eq!(pipe.advance(&mut buf), Ok(()));
6257 
6258         assert_eq!(pipe.server.stream_send(0, b"b", true), Ok(1));
6259 
6260         // Server sends MAX_STREAMS.
6261         assert_eq!(pipe.advance(&mut buf), Ok(()));
6262 
6263         // Client tries to create new streams.
6264         assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
6265         assert_eq!(pipe.advance(&mut buf), Ok(()));
6266 
6267         assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
6268         assert_eq!(pipe.advance(&mut buf), Ok(()));
6269 
6270         assert_eq!(pipe.client.stream_send(16, b"a", false), Ok(1));
6271         assert_eq!(pipe.advance(&mut buf), Ok(()));
6272 
6273         assert_eq!(
6274             pipe.client.stream_send(20, b"a", false),
6275             Err(Error::StreamLimit)
6276         );
6277 
6278         assert_eq!(pipe.server.readable().len(), 3);
6279     }
6280 
6281     #[test]
6282     /// Tests that the MAX_STREAMS frame is sent for unirectional streams.
stream_limit_update_uni()6283     fn stream_limit_update_uni() {
6284         let mut buf = [0; 65535];
6285 
6286         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
6287         config
6288             .load_cert_chain_from_pem_file("examples/cert.crt")
6289             .unwrap();
6290         config
6291             .load_priv_key_from_pem_file("examples/cert.key")
6292             .unwrap();
6293         config
6294             .set_application_protos(b"\x06proto1\x06proto2")
6295             .unwrap();
6296         config.set_initial_max_data(30);
6297         config.set_initial_max_stream_data_bidi_local(15);
6298         config.set_initial_max_stream_data_bidi_remote(15);
6299         config.set_initial_max_stream_data_uni(10);
6300         config.set_initial_max_streams_bidi(0);
6301         config.set_initial_max_streams_uni(3);
6302         config.verify_peer(false);
6303 
6304         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
6305         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6306 
6307         // Client sends stream data.
6308         assert_eq!(pipe.client.stream_send(2, b"a", false), Ok(1));
6309         assert_eq!(pipe.advance(&mut buf), Ok(()));
6310 
6311         assert_eq!(pipe.client.stream_send(6, b"a", false), Ok(1));
6312         assert_eq!(pipe.advance(&mut buf), Ok(()));
6313 
6314         assert_eq!(pipe.client.stream_send(6, b"b", true), Ok(1));
6315         assert_eq!(pipe.advance(&mut buf), Ok(()));
6316 
6317         assert_eq!(pipe.client.stream_send(2, b"b", true), Ok(1));
6318         assert_eq!(pipe.advance(&mut buf), Ok(()));
6319 
6320         // Server reads stream data.
6321         let mut b = [0; 15];
6322         pipe.server.stream_recv(2, &mut b).unwrap();
6323         pipe.server.stream_recv(6, &mut b).unwrap();
6324 
6325         // Server sends MAX_STREAMS.
6326         assert_eq!(pipe.advance(&mut buf), Ok(()));
6327 
6328         // Client tries to create new streams.
6329         assert_eq!(pipe.client.stream_send(10, b"a", false), Ok(1));
6330         assert_eq!(pipe.advance(&mut buf), Ok(()));
6331 
6332         assert_eq!(pipe.client.stream_send(14, b"a", false), Ok(1));
6333         assert_eq!(pipe.advance(&mut buf), Ok(()));
6334 
6335         assert_eq!(pipe.client.stream_send(18, b"a", false), Ok(1));
6336         assert_eq!(pipe.advance(&mut buf), Ok(()));
6337 
6338         assert_eq!(
6339             pipe.client.stream_send(22, b"a", false),
6340             Err(Error::StreamLimit)
6341         );
6342 
6343         assert_eq!(pipe.server.readable().len(), 3);
6344     }
6345 
6346     #[test]
6347     /// Tests that the stream's fin flag is properly flushed even if there's no
6348     /// data in the buffer, and that the buffer becomes readable on the other
6349     /// side.
stream_zero_length_fin()6350     fn stream_zero_length_fin() {
6351         let mut buf = [0; 65535];
6352 
6353         let mut pipe = testing::Pipe::default().unwrap();
6354 
6355         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6356 
6357         assert_eq!(
6358             pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
6359             Ok(15)
6360         );
6361         assert_eq!(pipe.advance(&mut buf), Ok(()));
6362 
6363         let mut r = pipe.server.readable();
6364         assert_eq!(r.next(), Some(0));
6365         assert!(r.next().is_none());
6366 
6367         let mut b = [0; 15];
6368         pipe.server.stream_recv(0, &mut b).unwrap();
6369         assert_eq!(pipe.advance(&mut buf), Ok(()));
6370 
6371         // Client sends zero-length frame.
6372         assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
6373         assert_eq!(pipe.advance(&mut buf), Ok(()));
6374 
6375         // Stream should be readable on the server after receiving empty fin.
6376         let mut r = pipe.server.readable();
6377         assert_eq!(r.next(), Some(0));
6378         assert!(r.next().is_none());
6379 
6380         let mut b = [0; 15];
6381         pipe.server.stream_recv(0, &mut b).unwrap();
6382         assert_eq!(pipe.advance(&mut buf), Ok(()));
6383 
6384         // Client sends zero-length frame (again).
6385         assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
6386         assert_eq!(pipe.advance(&mut buf), Ok(()));
6387 
6388         // Stream should _not_ be readable on the server after receiving empty
6389         // fin, because it was already finished.
6390         let mut r = pipe.server.readable();
6391         assert_eq!(r.next(), None);
6392     }
6393 
6394     #[test]
6395     /// Tests that completed streams are garbage collected.
collect_streams()6396     fn collect_streams() {
6397         let mut buf = [0; 65535];
6398 
6399         let mut pipe = testing::Pipe::default().unwrap();
6400 
6401         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6402 
6403         assert_eq!(pipe.client.streams.len(), 0);
6404         assert_eq!(pipe.server.streams.len(), 0);
6405 
6406         assert_eq!(pipe.client.stream_send(0, b"aaaaa", true), Ok(5));
6407         assert_eq!(pipe.advance(&mut buf), Ok(()));
6408 
6409         assert!(!pipe.client.stream_finished(0));
6410         assert!(!pipe.server.stream_finished(0));
6411 
6412         assert_eq!(pipe.client.streams.len(), 1);
6413         assert_eq!(pipe.server.streams.len(), 1);
6414 
6415         let mut b = [0; 5];
6416         pipe.server.stream_recv(0, &mut b).unwrap();
6417         assert_eq!(pipe.advance(&mut buf), Ok(()));
6418 
6419         assert_eq!(pipe.server.stream_send(0, b"aaaaa", true), Ok(5));
6420         assert_eq!(pipe.advance(&mut buf), Ok(()));
6421 
6422         assert!(!pipe.client.stream_finished(0));
6423         assert!(pipe.server.stream_finished(0));
6424 
6425         assert_eq!(pipe.client.streams.len(), 1);
6426         assert_eq!(pipe.server.streams.len(), 0);
6427 
6428         let mut b = [0; 5];
6429         pipe.client.stream_recv(0, &mut b).unwrap();
6430         assert_eq!(pipe.advance(&mut buf), Ok(()));
6431 
6432         assert_eq!(pipe.client.streams.len(), 0);
6433         assert_eq!(pipe.server.streams.len(), 0);
6434 
6435         assert!(pipe.client.stream_finished(0));
6436         assert!(pipe.server.stream_finished(0));
6437 
6438         assert_eq!(pipe.client.stream_send(0, b"", true), Err(Error::Done));
6439 
6440         let frames = [frame::Frame::Stream {
6441             stream_id: 0,
6442             data: stream::RangeBuf::from(b"aa", 0, false),
6443         }];
6444 
6445         let pkt_type = packet::Type::Short;
6446         assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
6447     }
6448 
6449     #[test]
config_set_cc_algorithm_name()6450     fn config_set_cc_algorithm_name() {
6451         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6452 
6453         assert_eq!(config.set_cc_algorithm_name("reno"), Ok(()));
6454 
6455         // Unknown name.
6456         assert_eq!(
6457             config.set_cc_algorithm_name("???"),
6458             Err(Error::CongestionControl)
6459         );
6460     }
6461 
6462     #[test]
peer_cert()6463     fn peer_cert() {
6464         let mut buf = [0; 65535];
6465 
6466         let mut pipe = testing::Pipe::default().unwrap();
6467 
6468         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6469 
6470         match pipe.client.peer_cert() {
6471             Some(c) => assert_eq!(c.len(), 753),
6472 
6473             None => panic!("missing server certificate"),
6474         }
6475     }
6476 
6477     #[test]
retry()6478     fn retry() {
6479         let mut buf = [0; 65535];
6480 
6481         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6482         config
6483             .load_cert_chain_from_pem_file("examples/cert.crt")
6484             .unwrap();
6485         config
6486             .load_priv_key_from_pem_file("examples/cert.key")
6487             .unwrap();
6488         config
6489             .set_application_protos(b"\x06proto1\06proto2")
6490             .unwrap();
6491 
6492         let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
6493 
6494         // Client sends initial flight.
6495         let mut len = pipe.client.send(&mut buf).unwrap();
6496 
6497         // Server sends Retry packet.
6498         let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6499 
6500         let odcid = hdr.dcid.to_vec();
6501 
6502         let mut scid = [0; MAX_CONN_ID_LEN];
6503         rand::rand_bytes(&mut scid[..]);
6504 
6505         let token = b"quiche test retry token";
6506 
6507         len = packet::retry(
6508             &hdr.scid,
6509             &hdr.dcid,
6510             &scid,
6511             token,
6512             hdr.version,
6513             &mut buf,
6514         )
6515         .unwrap();
6516 
6517         // Client receives Retry and sends new Initial.
6518         assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
6519 
6520         len = pipe.client.send(&mut buf).unwrap();
6521 
6522         let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6523         assert_eq!(&hdr.token.unwrap(), token);
6524 
6525         // Server accepts connection and send first flight.
6526         pipe.server = accept(&scid, Some(&odcid), &mut config).unwrap();
6527 
6528         len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6529         len = testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
6530         testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6531 
6532         assert!(pipe.client.is_established());
6533         assert!(pipe.server.is_established());
6534     }
6535 
6536     #[test]
missing_retry_source_connection_id()6537     fn missing_retry_source_connection_id() {
6538         let mut buf = [0; 65535];
6539 
6540         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6541         config
6542             .load_cert_chain_from_pem_file("examples/cert.crt")
6543             .unwrap();
6544         config
6545             .load_priv_key_from_pem_file("examples/cert.key")
6546             .unwrap();
6547         config
6548             .set_application_protos(b"\x06proto1\06proto2")
6549             .unwrap();
6550 
6551         let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
6552 
6553         // Client sends initial flight.
6554         let mut len = pipe.client.send(&mut buf).unwrap();
6555 
6556         // Server sends Retry packet.
6557         let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6558 
6559         let mut scid = [0; MAX_CONN_ID_LEN];
6560         rand::rand_bytes(&mut scid[..]);
6561 
6562         let token = b"quiche test retry token";
6563 
6564         len = packet::retry(
6565             &hdr.scid,
6566             &hdr.dcid,
6567             &scid,
6568             token,
6569             hdr.version,
6570             &mut buf,
6571         )
6572         .unwrap();
6573 
6574         // Client receives Retry and sends new Initial.
6575         assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
6576 
6577         len = pipe.client.send(&mut buf).unwrap();
6578 
6579         // Server accepts connection and send first flight. But original
6580         // destination connection ID is ignored.
6581         pipe.server = accept(&scid, None, &mut config).unwrap();
6582 
6583         len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6584 
6585         assert_eq!(
6586             pipe.client.recv(&mut buf[..len]),
6587             Err(Error::InvalidTransportParam)
6588         );
6589     }
6590 
6591     #[test]
invalid_retry_source_connection_id()6592     fn invalid_retry_source_connection_id() {
6593         let mut buf = [0; 65535];
6594 
6595         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6596         config
6597             .load_cert_chain_from_pem_file("examples/cert.crt")
6598             .unwrap();
6599         config
6600             .load_priv_key_from_pem_file("examples/cert.key")
6601             .unwrap();
6602         config
6603             .set_application_protos(b"\x06proto1\06proto2")
6604             .unwrap();
6605 
6606         let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
6607 
6608         // Client sends initial flight.
6609         let mut len = pipe.client.send(&mut buf).unwrap();
6610 
6611         // Server sends Retry packet.
6612         let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
6613 
6614         let mut scid = [0; MAX_CONN_ID_LEN];
6615         rand::rand_bytes(&mut scid[..]);
6616 
6617         let token = b"quiche test retry token";
6618 
6619         len = packet::retry(
6620             &hdr.scid,
6621             &hdr.dcid,
6622             &scid,
6623             token,
6624             hdr.version,
6625             &mut buf,
6626         )
6627         .unwrap();
6628 
6629         // Client receives Retry and sends new Initial.
6630         assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
6631 
6632         len = pipe.client.send(&mut buf).unwrap();
6633 
6634         // Server accepts connection and send first flight. But original
6635         // destination connection ID is invalid.
6636         pipe.server = accept(&scid, Some(b"bogus value"), &mut config).unwrap();
6637 
6638         len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
6639 
6640         assert_eq!(
6641             pipe.client.recv(&mut buf[..len]),
6642             Err(Error::InvalidTransportParam)
6643         );
6644     }
6645 
check_send(_: &mut impl Send)6646     fn check_send(_: &mut impl Send) {}
6647 
6648     #[test]
connection_must_be_send()6649     fn connection_must_be_send() {
6650         let mut pipe = testing::Pipe::default().unwrap();
6651         check_send(&mut pipe.client);
6652     }
6653 
6654     #[test]
data_blocked()6655     fn data_blocked() {
6656         let mut buf = [0; 65535];
6657 
6658         let mut pipe = testing::Pipe::default().unwrap();
6659 
6660         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6661 
6662         assert_eq!(pipe.client.stream_send(0, b"aaaaaaaaaa", false), Ok(10));
6663         assert_eq!(pipe.client.blocked_limit, None);
6664         assert_eq!(pipe.advance(&mut buf), Ok(()));
6665 
6666         assert_eq!(pipe.client.stream_send(4, b"aaaaaaaaaa", false), Ok(10));
6667         assert_eq!(pipe.client.blocked_limit, None);
6668         assert_eq!(pipe.advance(&mut buf), Ok(()));
6669 
6670         assert_eq!(pipe.client.stream_send(8, b"aaaaaaaaaaa", false), Ok(10));
6671         assert_eq!(pipe.client.blocked_limit, Some(30));
6672 
6673         let len = pipe.client.send(&mut buf).unwrap();
6674         assert_eq!(pipe.client.blocked_limit, None);
6675 
6676         let frames =
6677             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6678 
6679         let mut iter = frames.iter();
6680 
6681         assert_eq!(iter.next(), Some(&frame::Frame::DataBlocked { limit: 30 }));
6682 
6683         assert_eq!(
6684             iter.next(),
6685             Some(&frame::Frame::Stream {
6686                 stream_id: 8,
6687                 data: stream::RangeBuf::from(b"aaaaaaaaaa", 0, false),
6688             })
6689         );
6690 
6691         assert_eq!(iter.next(), None);
6692     }
6693 
6694     #[test]
stream_data_blocked()6695     fn stream_data_blocked() {
6696         let mut buf = [0; 65535];
6697 
6698         let mut pipe = testing::Pipe::default().unwrap();
6699 
6700         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6701 
6702         assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
6703         assert_eq!(pipe.client.streams.blocked().len(), 0);
6704 
6705         assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
6706         assert_eq!(pipe.client.streams.blocked().len(), 0);
6707 
6708         assert_eq!(pipe.client.stream_send(0, b"aaaaaa", false), Ok(5));
6709         assert_eq!(pipe.client.streams.blocked().len(), 1);
6710 
6711         let len = pipe.client.send(&mut buf).unwrap();
6712         assert_eq!(pipe.client.streams.blocked().len(), 0);
6713 
6714         let frames =
6715             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6716 
6717         let mut iter = frames.iter();
6718 
6719         assert_eq!(
6720             iter.next(),
6721             Some(&frame::Frame::StreamDataBlocked {
6722                 stream_id: 0,
6723                 limit: 15,
6724             })
6725         );
6726 
6727         assert_eq!(
6728             iter.next(),
6729             Some(&frame::Frame::Stream {
6730                 stream_id: 0,
6731                 data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
6732             })
6733         );
6734 
6735         assert_eq!(iter.next(), None);
6736 
6737         // Send from another stream, make sure we don't send STREAM_DATA_BLOCKED
6738         // again.
6739         assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
6740 
6741         let len = pipe.client.send(&mut buf).unwrap();
6742         assert_eq!(pipe.client.streams.blocked().len(), 0);
6743 
6744         let frames =
6745             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6746 
6747         let mut iter = frames.iter();
6748 
6749         assert_eq!(
6750             iter.next(),
6751             Some(&frame::Frame::Stream {
6752                 stream_id: 4,
6753                 data: stream::RangeBuf::from(b"a", 0, false),
6754             })
6755         );
6756 
6757         assert_eq!(iter.next(), None);
6758 
6759         // Send again from blocked stream and make sure it is marked as blocked
6760         // again.
6761         assert_eq!(pipe.client.stream_send(0, b"aaaaaa", false), Ok(0));
6762         assert_eq!(pipe.client.streams.blocked().len(), 1);
6763 
6764         let len = pipe.client.send(&mut buf).unwrap();
6765         assert_eq!(pipe.client.streams.blocked().len(), 0);
6766 
6767         let frames =
6768             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
6769 
6770         let mut iter = frames.iter();
6771 
6772         assert_eq!(
6773             iter.next(),
6774             Some(&frame::Frame::StreamDataBlocked {
6775                 stream_id: 0,
6776                 limit: 15,
6777             })
6778         );
6779 
6780         assert_eq!(iter.next(), Some(&frame::Frame::Padding { len: 1 }));
6781 
6782         assert_eq!(iter.next(), None);
6783     }
6784 
6785     #[test]
app_limited_true()6786     fn app_limited_true() {
6787         let mut buf = [0; 65535];
6788 
6789         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6790         config
6791             .set_application_protos(b"\x06proto1\x06proto2")
6792             .unwrap();
6793         config.set_initial_max_data(50000);
6794         config.set_initial_max_stream_data_bidi_local(50000);
6795         config.set_initial_max_stream_data_bidi_remote(50000);
6796         config.set_max_udp_payload_size(1200);
6797         config.verify_peer(false);
6798 
6799         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6800 
6801         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6802 
6803         // Client sends stream data.
6804         assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6805         assert_eq!(pipe.advance(&mut buf), Ok(()));
6806 
6807         // Server reads stream data.
6808         let mut b = [0; 15];
6809         pipe.server.stream_recv(0, &mut b).unwrap();
6810         assert_eq!(pipe.advance(&mut buf), Ok(()));
6811 
6812         // Server sends stream data smaller than cwnd.
6813         let send_buf = [0; 10000];
6814         assert_eq!(pipe.server.stream_send(0, &send_buf, false), Ok(10000));
6815         assert_eq!(pipe.advance(&mut buf), Ok(()));
6816 
6817         // app_limited should be true because we send less than cwnd.
6818         assert_eq!(pipe.server.recovery.app_limited(), true);
6819     }
6820 
6821     #[test]
app_limited_false()6822     fn app_limited_false() {
6823         let mut buf = [0; 65535];
6824 
6825         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6826         config
6827             .set_application_protos(b"\x06proto1\x06proto2")
6828             .unwrap();
6829         config.set_initial_max_data(50000);
6830         config.set_initial_max_stream_data_bidi_local(50000);
6831         config.set_initial_max_stream_data_bidi_remote(50000);
6832         config.set_max_udp_payload_size(1200);
6833         config.verify_peer(false);
6834 
6835         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6836 
6837         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6838 
6839         // Client sends stream data.
6840         assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6841         assert_eq!(pipe.advance(&mut buf), Ok(()));
6842 
6843         // Server reads stream data.
6844         let mut b = [0; 15];
6845         pipe.server.stream_recv(0, &mut b).unwrap();
6846         assert_eq!(pipe.advance(&mut buf), Ok(()));
6847 
6848         // Server sends stream data bigger than cwnd.
6849         let send_buf1 = [0; 20000];
6850         assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(14085));
6851         assert_eq!(pipe.advance(&mut buf), Ok(()));
6852 
6853         // We can't create a new packet header because there is no room by cwnd.
6854         // app_limited should be false because we can't send more by cwnd.
6855         assert_eq!(pipe.server.recovery.app_limited(), false);
6856     }
6857 
6858     #[test]
app_limited_false_no_frame()6859     fn app_limited_false_no_frame() {
6860         let mut buf = [0; 65535];
6861 
6862         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6863         config
6864             .set_application_protos(b"\x06proto1\x06proto2")
6865             .unwrap();
6866         config.set_initial_max_data(50000);
6867         config.set_initial_max_stream_data_bidi_local(50000);
6868         config.set_initial_max_stream_data_bidi_remote(50000);
6869         config.set_max_udp_payload_size(1405);
6870         config.verify_peer(false);
6871 
6872         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6873 
6874         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6875 
6876         // Client sends stream data.
6877         assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6878         assert_eq!(pipe.advance(&mut buf), Ok(()));
6879 
6880         // Server reads stream data.
6881         let mut b = [0; 15];
6882         pipe.server.stream_recv(0, &mut b).unwrap();
6883         assert_eq!(pipe.advance(&mut buf), Ok(()));
6884 
6885         // Server sends stream data bigger than cwnd.
6886         let send_buf1 = [0; 20000];
6887         assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(14085));
6888         assert_eq!(pipe.advance(&mut buf), Ok(()));
6889 
6890         // We can't create a new packet header because there is no room by cwnd.
6891         // app_limited should be false because we can't send more by cwnd.
6892         assert_eq!(pipe.server.recovery.app_limited(), false);
6893     }
6894 
6895     #[test]
app_limited_false_no_header()6896     fn app_limited_false_no_header() {
6897         let mut buf = [0; 65535];
6898 
6899         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
6900         config
6901             .set_application_protos(b"\x06proto1\x06proto2")
6902             .unwrap();
6903         config.set_initial_max_data(50000);
6904         config.set_initial_max_stream_data_bidi_local(50000);
6905         config.set_initial_max_stream_data_bidi_remote(50000);
6906         config.set_max_udp_payload_size(1406);
6907         config.verify_peer(false);
6908 
6909         let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
6910 
6911         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6912 
6913         // Client sends stream data.
6914         assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
6915         assert_eq!(pipe.advance(&mut buf), Ok(()));
6916 
6917         // Server reads stream data.
6918         let mut b = [0; 15];
6919         pipe.server.stream_recv(0, &mut b).unwrap();
6920         assert_eq!(pipe.advance(&mut buf), Ok(()));
6921 
6922         // Server sends stream data bigger than cwnd.
6923         let send_buf1 = [0; 20000];
6924         assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(14085));
6925         assert_eq!(pipe.advance(&mut buf), Ok(()));
6926 
6927         // We can't create a new frame because there is no room by cwnd.
6928         // app_limited should be false because we can't send more by cwnd.
6929         assert_eq!(pipe.server.recovery.app_limited(), false);
6930     }
6931 
6932     #[test]
limit_ack_ranges()6933     fn limit_ack_ranges() {
6934         let mut buf = [0; 65535];
6935 
6936         let mut pipe = testing::Pipe::default().unwrap();
6937 
6938         assert_eq!(pipe.handshake(&mut buf), Ok(()));
6939 
6940         let epoch = packet::EPOCH_APPLICATION;
6941 
6942         assert_eq!(pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.len(), 0);
6943 
6944         let frames = [frame::Frame::Ping, frame::Frame::Padding { len: 3 }];
6945 
6946         let pkt_type = packet::Type::Short;
6947 
6948         let mut last_packet_sent = 0;
6949 
6950         for _ in 0..512 {
6951             let recv_count = pipe.server.recv_count;
6952 
6953             last_packet_sent = pipe.client.pkt_num_spaces[epoch].next_pkt_num;
6954 
6955             pipe.send_pkt_to_server(pkt_type, &frames, &mut buf)
6956                 .unwrap();
6957 
6958             assert_eq!(pipe.server.recv_count, recv_count + 1);
6959 
6960             // Skip packet number.
6961             pipe.client.pkt_num_spaces[epoch].next_pkt_num += 1;
6962         }
6963 
6964         assert_eq!(
6965             pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.len(),
6966             MAX_ACK_RANGES
6967         );
6968 
6969         assert_eq!(
6970             pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.first(),
6971             Some(last_packet_sent - ((MAX_ACK_RANGES as u64) - 1) * 2)
6972         );
6973 
6974         assert_eq!(
6975             pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.last(),
6976             Some(last_packet_sent)
6977         );
6978     }
6979 
6980     #[test]
6981     /// Tests that streams are correctly scheduled based on their priority.
stream_priority()6982     fn stream_priority() {
6983         // Limit 1-RTT packet size to avoid congestion control interference.
6984         const MAX_TEST_PACKET_SIZE: usize = 540;
6985 
6986         let mut buf = [0; 65535];
6987 
6988         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
6989         config
6990             .load_cert_chain_from_pem_file("examples/cert.crt")
6991             .unwrap();
6992         config
6993             .load_priv_key_from_pem_file("examples/cert.key")
6994             .unwrap();
6995         config
6996             .set_application_protos(b"\x06proto1\x06proto2")
6997             .unwrap();
6998         config.set_initial_max_data(1_000_000);
6999         config.set_initial_max_stream_data_bidi_local(1_000_000);
7000         config.set_initial_max_stream_data_bidi_remote(1_000_000);
7001         config.set_initial_max_stream_data_uni(0);
7002         config.set_initial_max_streams_bidi(100);
7003         config.set_initial_max_streams_uni(0);
7004         config.verify_peer(false);
7005 
7006         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7007         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7008 
7009         assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
7010         assert_eq!(pipe.advance(&mut buf), Ok(()));
7011 
7012         assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
7013         assert_eq!(pipe.advance(&mut buf), Ok(()));
7014 
7015         assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
7016         assert_eq!(pipe.advance(&mut buf), Ok(()));
7017 
7018         assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
7019         assert_eq!(pipe.advance(&mut buf), Ok(()));
7020 
7021         assert_eq!(pipe.client.stream_send(16, b"a", false), Ok(1));
7022         assert_eq!(pipe.advance(&mut buf), Ok(()));
7023 
7024         assert_eq!(pipe.client.stream_send(20, b"a", false), Ok(1));
7025         assert_eq!(pipe.advance(&mut buf), Ok(()));
7026 
7027         let mut b = [0; 1];
7028 
7029         let out = [b'b'; 500];
7030 
7031         // Server prioritizes streams as follows:
7032         //  * Stream 8 and 16 have the same priority but are non-incremental.
7033         //  * Stream 4, 12 and 20 have the same priority but 20 is non-incremental
7034         //    and 4 and 12 are incremental.
7035         //  * Stream 0 is on its own.
7036 
7037         pipe.server.stream_recv(0, &mut b).unwrap();
7038         assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
7039         pipe.server.stream_send(0, &out, false).unwrap();
7040         pipe.server.stream_send(0, &out, false).unwrap();
7041         pipe.server.stream_send(0, &out, false).unwrap();
7042 
7043         pipe.server.stream_recv(12, &mut b).unwrap();
7044         assert_eq!(pipe.server.stream_priority(12, 42, true), Ok(()));
7045         pipe.server.stream_send(12, &out, false).unwrap();
7046         pipe.server.stream_send(12, &out, false).unwrap();
7047         pipe.server.stream_send(12, &out, false).unwrap();
7048 
7049         pipe.server.stream_recv(16, &mut b).unwrap();
7050         assert_eq!(pipe.server.stream_priority(16, 10, false), Ok(()));
7051         pipe.server.stream_send(16, &out, false).unwrap();
7052         pipe.server.stream_send(16, &out, false).unwrap();
7053         pipe.server.stream_send(16, &out, false).unwrap();
7054 
7055         pipe.server.stream_recv(4, &mut b).unwrap();
7056         assert_eq!(pipe.server.stream_priority(4, 42, true), Ok(()));
7057         pipe.server.stream_send(4, &out, false).unwrap();
7058         pipe.server.stream_send(4, &out, false).unwrap();
7059         pipe.server.stream_send(4, &out, false).unwrap();
7060 
7061         pipe.server.stream_recv(8, &mut b).unwrap();
7062         assert_eq!(pipe.server.stream_priority(8, 10, false), Ok(()));
7063         pipe.server.stream_send(8, &out, false).unwrap();
7064         pipe.server.stream_send(8, &out, false).unwrap();
7065         pipe.server.stream_send(8, &out, false).unwrap();
7066 
7067         pipe.server.stream_recv(20, &mut b).unwrap();
7068         assert_eq!(pipe.server.stream_priority(20, 42, false), Ok(()));
7069         pipe.server.stream_send(20, &out, false).unwrap();
7070         pipe.server.stream_send(20, &out, false).unwrap();
7071         pipe.server.stream_send(20, &out, false).unwrap();
7072 
7073         // First is stream 8.
7074         let mut off = 0;
7075 
7076         for _ in 1..=3 {
7077             let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7078 
7079             let frames =
7080                 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7081             let stream = frames.iter().next().unwrap();
7082 
7083             assert_eq!(stream, &frame::Frame::Stream {
7084                 stream_id: 8,
7085                 data: stream::RangeBuf::from(&out, off, false),
7086             });
7087 
7088             off = match stream {
7089                 frame::Frame::Stream { data, .. } => data.max_off(),
7090 
7091                 _ => unreachable!(),
7092             };
7093         }
7094 
7095         // Then is stream 16.
7096         let mut off = 0;
7097 
7098         for _ in 1..=3 {
7099             let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7100 
7101             let frames =
7102                 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7103             let stream = frames.iter().next().unwrap();
7104 
7105             assert_eq!(stream, &frame::Frame::Stream {
7106                 stream_id: 16,
7107                 data: stream::RangeBuf::from(&out, off, false),
7108             });
7109 
7110             off = match stream {
7111                 frame::Frame::Stream { data, .. } => data.max_off(),
7112 
7113                 _ => unreachable!(),
7114             };
7115         }
7116 
7117         // Then is stream 20.
7118         let mut off = 0;
7119 
7120         for _ in 1..=3 {
7121             let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7122 
7123             let frames =
7124                 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7125             let stream = frames.iter().next().unwrap();
7126 
7127             assert_eq!(stream, &frame::Frame::Stream {
7128                 stream_id: 20,
7129                 data: stream::RangeBuf::from(&out, off, false),
7130             });
7131 
7132             off = match stream {
7133                 frame::Frame::Stream { data, .. } => data.max_off(),
7134 
7135                 _ => unreachable!(),
7136             };
7137         }
7138 
7139         // Then are stream 12 and 4, with the same priority, incrementally.
7140         let mut off = 0;
7141 
7142         for _ in 1..=3 {
7143             let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7144 
7145             let frames =
7146                 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7147 
7148             assert_eq!(
7149                 frames.iter().next(),
7150                 Some(&frame::Frame::Stream {
7151                     stream_id: 12,
7152                     data: stream::RangeBuf::from(&out, off, false),
7153                 })
7154             );
7155 
7156             let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7157 
7158             let frames =
7159                 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7160 
7161             let stream = frames.iter().next().unwrap();
7162 
7163             assert_eq!(stream, &frame::Frame::Stream {
7164                 stream_id: 4,
7165                 data: stream::RangeBuf::from(&out, off, false),
7166             });
7167 
7168             off = match stream {
7169                 frame::Frame::Stream { data, .. } => data.max_off(),
7170 
7171                 _ => unreachable!(),
7172             };
7173         }
7174 
7175         // Final is stream 0.
7176         let mut off = 0;
7177 
7178         for _ in 1..=3 {
7179             let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
7180 
7181             let frames =
7182                 testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7183             let stream = frames.iter().next().unwrap();
7184 
7185             assert_eq!(stream, &frame::Frame::Stream {
7186                 stream_id: 0,
7187                 data: stream::RangeBuf::from(&out, off, false),
7188             });
7189 
7190             off = match stream {
7191                 frame::Frame::Stream { data, .. } => data.max_off(),
7192 
7193                 _ => unreachable!(),
7194             };
7195         }
7196 
7197         assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
7198     }
7199 
7200     #[test]
7201     /// Tests that changing a stream's priority is correctly propagated.
7202     ///
7203     /// Re-prioritization is not supported, so this should fail.
7204     #[should_panic]
stream_reprioritize()7205     fn stream_reprioritize() {
7206         let mut buf = [0; 65535];
7207 
7208         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7209         config
7210             .load_cert_chain_from_pem_file("examples/cert.crt")
7211             .unwrap();
7212         config
7213             .load_priv_key_from_pem_file("examples/cert.key")
7214             .unwrap();
7215         config
7216             .set_application_protos(b"\x06proto1\x06proto2")
7217             .unwrap();
7218         config.set_initial_max_data(30);
7219         config.set_initial_max_stream_data_bidi_local(15);
7220         config.set_initial_max_stream_data_bidi_remote(15);
7221         config.set_initial_max_stream_data_uni(0);
7222         config.set_initial_max_streams_bidi(5);
7223         config.set_initial_max_streams_uni(0);
7224         config.verify_peer(false);
7225 
7226         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7227         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7228 
7229         assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
7230         assert_eq!(pipe.advance(&mut buf), Ok(()));
7231 
7232         assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
7233         assert_eq!(pipe.advance(&mut buf), Ok(()));
7234 
7235         assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
7236         assert_eq!(pipe.advance(&mut buf), Ok(()));
7237 
7238         assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
7239         assert_eq!(pipe.advance(&mut buf), Ok(()));
7240 
7241         let mut b = [0; 1];
7242 
7243         pipe.server.stream_recv(0, &mut b).unwrap();
7244         assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
7245         pipe.server.stream_send(0, b"b", false).unwrap();
7246 
7247         pipe.server.stream_recv(12, &mut b).unwrap();
7248         assert_eq!(pipe.server.stream_priority(12, 42, true), Ok(()));
7249         pipe.server.stream_send(12, b"b", false).unwrap();
7250 
7251         pipe.server.stream_recv(8, &mut b).unwrap();
7252         assert_eq!(pipe.server.stream_priority(8, 10, true), Ok(()));
7253         pipe.server.stream_send(8, b"b", false).unwrap();
7254 
7255         pipe.server.stream_recv(4, &mut b).unwrap();
7256         assert_eq!(pipe.server.stream_priority(4, 42, true), Ok(()));
7257         pipe.server.stream_send(4, b"b", false).unwrap();
7258 
7259         // Stream 0 is re-prioritized!!!
7260         assert_eq!(pipe.server.stream_priority(0, 20, true), Ok(()));
7261 
7262         // First is stream 8.
7263         let len = pipe.server.send(&mut buf).unwrap();
7264 
7265         let frames =
7266             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7267 
7268         assert_eq!(
7269             frames.iter().next(),
7270             Some(&frame::Frame::Stream {
7271                 stream_id: 8,
7272                 data: stream::RangeBuf::from(b"b", 0, false),
7273             })
7274         );
7275 
7276         // Then is stream 0.
7277         let len = pipe.server.send(&mut buf).unwrap();
7278 
7279         let frames =
7280             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7281 
7282         assert_eq!(
7283             frames.iter().next(),
7284             Some(&frame::Frame::Stream {
7285                 stream_id: 0,
7286                 data: stream::RangeBuf::from(b"b", 0, false),
7287             })
7288         );
7289 
7290         // Then are stream 12 and 4, with the same priority.
7291         let len = pipe.server.send(&mut buf).unwrap();
7292 
7293         let frames =
7294             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7295 
7296         assert_eq!(
7297             frames.iter().next(),
7298             Some(&frame::Frame::Stream {
7299                 stream_id: 12,
7300                 data: stream::RangeBuf::from(b"b", 0, false),
7301             })
7302         );
7303 
7304         let len = pipe.server.send(&mut buf).unwrap();
7305 
7306         let frames =
7307             testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
7308 
7309         assert_eq!(
7310             frames.iter().next(),
7311             Some(&frame::Frame::Stream {
7312                 stream_id: 4,
7313                 data: stream::RangeBuf::from(b"b", 0, false),
7314             })
7315         );
7316 
7317         assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
7318     }
7319 
7320     #[test]
7321     /// Tests that old data is retransmitted on PTO.
early_retransmit()7322     fn early_retransmit() {
7323         let mut buf = [0; 65535];
7324 
7325         let mut pipe = testing::Pipe::default().unwrap();
7326         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7327 
7328         // Client sends stream data.
7329         assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
7330         assert_eq!(pipe.advance(&mut buf), Ok(()));
7331 
7332         // Client sends more stream data, but packet is lost
7333         assert_eq!(pipe.client.stream_send(4, b"b", false), Ok(1));
7334         assert!(pipe.client.send(&mut buf).is_ok());
7335 
7336         // Wait until PTO expires. Since the RTT is very low, wait a bit more.
7337         let timer = pipe.client.timeout().unwrap();
7338         std::thread::sleep(timer + time::Duration::from_millis(1));
7339 
7340         pipe.client.on_timeout();
7341 
7342         let epoch = packet::EPOCH_APPLICATION;
7343         assert_eq!(pipe.client.recovery.loss_probes[epoch], 1);
7344 
7345         // Client retransmits stream data in PTO probe.
7346         let len = pipe.client.send(&mut buf).unwrap();
7347         assert_eq!(pipe.client.recovery.loss_probes[epoch], 0);
7348 
7349         let frames =
7350             testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
7351 
7352         let mut iter = frames.iter();
7353 
7354         // Skip ACK frame.
7355         iter.next();
7356 
7357         assert_eq!(
7358             iter.next(),
7359             Some(&frame::Frame::Stream {
7360                 stream_id: 4,
7361                 data: stream::RangeBuf::from(b"b", 0, false),
7362             })
7363         );
7364     }
7365 
7366     #[test]
7367     /// Tests that client avoids handshake deadlock by arming PTO.
handshake_anti_deadlock()7368     fn handshake_anti_deadlock() {
7369         let mut buf = [0; 65535];
7370 
7371         let mut config = Config::new(PROTOCOL_VERSION).unwrap();
7372         config
7373             .load_cert_chain_from_pem_file("examples/cert-big.crt")
7374             .unwrap();
7375         config
7376             .load_priv_key_from_pem_file("examples/cert.key")
7377             .unwrap();
7378         config
7379             .set_application_protos(b"\x06proto1\06proto2")
7380             .unwrap();
7381 
7382         let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
7383 
7384         assert_eq!(pipe.client.handshake_status().has_handshake_keys, false);
7385         assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
7386         assert_eq!(pipe.server.handshake_status().has_handshake_keys, false);
7387         assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
7388 
7389         // Client sends padded Initial.
7390         let len = pipe.client.send(&mut buf).unwrap();
7391         assert_eq!(len, 1200);
7392 
7393         // Server receives client's Initial and sends own Initial and Handshake
7394         // until it's blocked by the anti-amplification limit.
7395         let len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
7396         assert_eq!(pipe.server.send(&mut buf[len..]), Err(Error::Done));
7397 
7398         assert_eq!(pipe.client.handshake_status().has_handshake_keys, false);
7399         assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
7400         assert_eq!(pipe.server.handshake_status().has_handshake_keys, true);
7401         assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
7402 
7403         // Client receives the server flight and sends Handshake ACK, but it is
7404         // lost.
7405         assert!(testing::recv_send(&mut pipe.client, &mut buf, len).is_ok());
7406 
7407         assert_eq!(pipe.client.handshake_status().has_handshake_keys, true);
7408         assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
7409         assert_eq!(pipe.server.handshake_status().has_handshake_keys, true);
7410         assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
7411 
7412         // Make sure client's PTO timer is armed.
7413         assert!(pipe.client.timeout().is_some());
7414     }
7415 
7416     #[test]
7417     /// Tests that packets with corrupted type (from Handshake to Initial) are
7418     /// properly ignored.
handshake_packet_type_corruption()7419     fn handshake_packet_type_corruption() {
7420         let mut buf = [0; 65535];
7421 
7422         let mut pipe = testing::Pipe::default().unwrap();
7423 
7424         // Client sends padded Initial.
7425         let len = pipe.client.send(&mut buf).unwrap();
7426         assert_eq!(len, 1200);
7427 
7428         // Server receives client's Initial and sends own Initial and Handshake.
7429         let len = testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
7430         assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
7431 
7432         // Client sends Initial packet with ACK.
7433         let len = pipe.client.send(&mut buf).unwrap();
7434 
7435         let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
7436         assert_eq!(hdr.ty, Type::Initial);
7437 
7438         assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
7439 
7440         // Client sends Handshake packet.
7441         let len = pipe.client.send(&mut buf).unwrap();
7442 
7443         let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
7444         assert_eq!(hdr.ty, Type::Handshake);
7445 
7446         // Packet type is corrupted to Initial..
7447         buf[0] &= !(0x20);
7448 
7449         let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
7450         assert_eq!(hdr.ty, Type::Initial);
7451 
7452         // Server receives corrupted packet without returning an error.
7453         assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
7454     }
7455 
7456     #[test]
dgram_send_fails_invalidstate()7457     fn dgram_send_fails_invalidstate() {
7458         let mut buf = [0; 65535];
7459 
7460         let mut pipe = testing::Pipe::default().unwrap();
7461 
7462         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7463 
7464         assert_eq!(
7465             pipe.client.dgram_send(b"hello, world"),
7466             Err(Error::InvalidState)
7467         );
7468     }
7469 
7470     #[test]
dgram_send_app_limited()7471     fn dgram_send_app_limited() {
7472         let mut buf = [0; 65535];
7473         let send_buf = [0xcf; 1000];
7474 
7475         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7476         config
7477             .load_cert_chain_from_pem_file("examples/cert.crt")
7478             .unwrap();
7479         config
7480             .load_priv_key_from_pem_file("examples/cert.key")
7481             .unwrap();
7482         config
7483             .set_application_protos(b"\x06proto1\x06proto2")
7484             .unwrap();
7485         config.set_initial_max_data(30);
7486         config.set_initial_max_stream_data_bidi_local(15);
7487         config.set_initial_max_stream_data_bidi_remote(15);
7488         config.set_initial_max_stream_data_uni(10);
7489         config.set_initial_max_streams_bidi(3);
7490         config.set_initial_max_streams_uni(3);
7491         config.enable_dgram(true, 1000, 1000);
7492         config.set_max_udp_payload_size(1200);
7493         config.verify_peer(false);
7494 
7495         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7496 
7497         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7498         assert_eq!(pipe.advance(&mut buf), Ok(()));
7499 
7500         for _ in 0..1000 {
7501             assert_eq!(pipe.client.dgram_send(&send_buf), Ok(()));
7502         }
7503 
7504         assert!(!pipe.client.recovery.app_limited());
7505         assert_eq!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
7506 
7507         let len = pipe.client.send(&mut buf).unwrap();
7508 
7509         assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
7510         assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
7511         assert!(!pipe.client.recovery.app_limited());
7512 
7513         testing::recv_send(&mut pipe.client, &mut buf, len).unwrap();
7514         testing::recv_send(&mut pipe.server, &mut buf, len).unwrap();
7515 
7516         assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
7517         assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
7518 
7519         assert!(!pipe.client.recovery.app_limited());
7520     }
7521 
7522     #[test]
dgram_single_datagram()7523     fn dgram_single_datagram() {
7524         let mut buf = [0; 65535];
7525 
7526         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7527         config
7528             .load_cert_chain_from_pem_file("examples/cert.crt")
7529             .unwrap();
7530         config
7531             .load_priv_key_from_pem_file("examples/cert.key")
7532             .unwrap();
7533         config
7534             .set_application_protos(b"\x06proto1\x06proto2")
7535             .unwrap();
7536         config.set_initial_max_data(30);
7537         config.set_initial_max_stream_data_bidi_local(15);
7538         config.set_initial_max_stream_data_bidi_remote(15);
7539         config.set_initial_max_stream_data_uni(10);
7540         config.set_initial_max_streams_bidi(3);
7541         config.set_initial_max_streams_uni(3);
7542         config.enable_dgram(true, 10, 10);
7543         config.verify_peer(false);
7544 
7545         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7546 
7547         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7548 
7549         assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7550 
7551         assert_eq!(pipe.advance(&mut buf), Ok(()));
7552 
7553         let result1 = pipe.server.dgram_recv(&mut buf);
7554         assert_eq!(result1, Ok(12));
7555 
7556         let result2 = pipe.server.dgram_recv(&mut buf);
7557         assert_eq!(result2, Err(Error::Done));
7558     }
7559 
7560     #[test]
dgram_multiple_datagrams()7561     fn dgram_multiple_datagrams() {
7562         let mut buf = [0; 65535];
7563 
7564         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7565         config
7566             .load_cert_chain_from_pem_file("examples/cert.crt")
7567             .unwrap();
7568         config
7569             .load_priv_key_from_pem_file("examples/cert.key")
7570             .unwrap();
7571         config
7572             .set_application_protos(b"\x06proto1\x06proto2")
7573             .unwrap();
7574         config.set_initial_max_data(30);
7575         config.set_initial_max_stream_data_bidi_local(15);
7576         config.set_initial_max_stream_data_bidi_remote(15);
7577         config.set_initial_max_stream_data_uni(10);
7578         config.set_initial_max_streams_bidi(3);
7579         config.set_initial_max_streams_uni(3);
7580         config.enable_dgram(true, 10, 10);
7581         config.verify_peer(false);
7582 
7583         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7584 
7585         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7586 
7587         assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7588         assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
7589         assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Ok(()));
7590 
7591         pipe.client
7592             .dgram_purge_outgoing(|d: &[u8]| -> bool { d[0] == b'c' });
7593 
7594         assert_eq!(pipe.advance(&mut buf), Ok(()));
7595 
7596         let result1 = pipe.server.dgram_recv(&mut buf);
7597         assert_eq!(result1, Ok(12));
7598         assert_eq!(buf[0], b'h');
7599         assert_eq!(buf[1], b'e');
7600 
7601         let result2 = pipe.server.dgram_recv(&mut buf);
7602         assert_eq!(result2, Ok(11));
7603         assert_eq!(buf[0], b'h');
7604         assert_eq!(buf[1], b'o');
7605 
7606         let result3 = pipe.server.dgram_recv(&mut buf);
7607         assert_eq!(result3, Err(Error::Done));
7608     }
7609 
7610     #[test]
dgram_send_queue_overflow()7611     fn dgram_send_queue_overflow() {
7612         let mut buf = [0; 65535];
7613 
7614         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7615         config
7616             .load_cert_chain_from_pem_file("examples/cert.crt")
7617             .unwrap();
7618         config
7619             .load_priv_key_from_pem_file("examples/cert.key")
7620             .unwrap();
7621         config
7622             .set_application_protos(b"\x06proto1\x06proto2")
7623             .unwrap();
7624         config.set_initial_max_data(30);
7625         config.set_initial_max_stream_data_bidi_local(15);
7626         config.set_initial_max_stream_data_bidi_remote(15);
7627         config.set_initial_max_stream_data_uni(10);
7628         config.set_initial_max_streams_bidi(3);
7629         config.set_initial_max_streams_uni(3);
7630         config.enable_dgram(true, 10, 2);
7631         config.verify_peer(false);
7632 
7633         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7634 
7635         assert_eq!(pipe.advance(&mut buf), Ok(()));
7636 
7637         assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7638         assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
7639         assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Err(Error::Done));
7640 
7641         assert_eq!(pipe.advance(&mut buf), Ok(()));
7642 
7643         let result1 = pipe.server.dgram_recv(&mut buf);
7644         assert_eq!(result1, Ok(12));
7645         assert_eq!(buf[0], b'h');
7646         assert_eq!(buf[1], b'e');
7647 
7648         let result2 = pipe.server.dgram_recv(&mut buf);
7649         assert_eq!(result2, Ok(11));
7650         assert_eq!(buf[0], b'c');
7651         assert_eq!(buf[1], b'i');
7652 
7653         let result3 = pipe.server.dgram_recv(&mut buf);
7654         assert_eq!(result3, Err(Error::Done));
7655     }
7656 
7657     #[test]
dgram_recv_queue_overflow()7658     fn dgram_recv_queue_overflow() {
7659         let mut buf = [0; 65535];
7660 
7661         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7662         config
7663             .load_cert_chain_from_pem_file("examples/cert.crt")
7664             .unwrap();
7665         config
7666             .load_priv_key_from_pem_file("examples/cert.key")
7667             .unwrap();
7668         config
7669             .set_application_protos(b"\x06proto1\x06proto2")
7670             .unwrap();
7671         config.set_initial_max_data(30);
7672         config.set_initial_max_stream_data_bidi_local(15);
7673         config.set_initial_max_stream_data_bidi_remote(15);
7674         config.set_initial_max_stream_data_uni(10);
7675         config.set_initial_max_streams_bidi(3);
7676         config.set_initial_max_streams_uni(3);
7677         config.enable_dgram(true, 2, 10);
7678         config.set_max_udp_payload_size(1200);
7679         config.verify_peer(false);
7680 
7681         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7682 
7683         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7684 
7685         assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
7686         assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
7687         assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Ok(()));
7688 
7689         assert_eq!(pipe.advance(&mut buf), Ok(()));
7690 
7691         let result1 = pipe.server.dgram_recv(&mut buf);
7692         assert_eq!(result1, Ok(11));
7693         assert_eq!(buf[0], b'c');
7694         assert_eq!(buf[1], b'i');
7695 
7696         let result2 = pipe.server.dgram_recv(&mut buf);
7697         assert_eq!(result2, Ok(11));
7698         assert_eq!(buf[0], b'h');
7699         assert_eq!(buf[1], b'o');
7700 
7701         let result3 = pipe.server.dgram_recv(&mut buf);
7702         assert_eq!(result3, Err(Error::Done));
7703     }
7704 
7705     #[test]
dgram_send_max_size()7706     fn dgram_send_max_size() {
7707         let mut buf = [0; MAX_DGRAM_FRAME_SIZE as usize];
7708 
7709         let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
7710         config
7711             .load_cert_chain_from_pem_file("examples/cert.crt")
7712             .unwrap();
7713         config
7714             .load_priv_key_from_pem_file("examples/cert.key")
7715             .unwrap();
7716         config
7717             .set_application_protos(b"\x06proto1\x06proto2")
7718             .unwrap();
7719         config.set_initial_max_data(30);
7720         config.set_initial_max_stream_data_bidi_local(15);
7721         config.set_initial_max_stream_data_bidi_remote(15);
7722         config.set_initial_max_stream_data_uni(10);
7723         config.set_initial_max_streams_bidi(3);
7724         config.set_initial_max_streams_uni(3);
7725         config.enable_dgram(true, 10, 10);
7726         config.set_max_udp_payload_size(1452);
7727         config.verify_peer(false);
7728 
7729         let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
7730 
7731         // Before handshake (before peer settings) we don't know max dgram size
7732         assert_eq!(pipe.client.dgram_max_writable_len(), None);
7733 
7734         assert_eq!(pipe.handshake(&mut buf), Ok(()));
7735 
7736         let max_dgram_size = pipe.client.dgram_max_writable_len().unwrap();
7737 
7738         let dgram_packet: Vec<u8> = vec![42; max_dgram_size];
7739 
7740         assert_eq!(pipe.client.dgram_send(&dgram_packet), Ok(()));
7741 
7742         assert_eq!(pipe.advance(&mut buf), Ok(()));
7743 
7744         let result1 = pipe.server.dgram_recv(&mut buf);
7745         assert_eq!(result1, Ok(max_dgram_size));
7746 
7747         let result2 = pipe.server.dgram_recv(&mut buf);
7748         assert_eq!(result2, Err(Error::Done));
7749     }
7750 }
7751 
7752 pub use crate::packet::Header;
7753 pub use crate::packet::Type;
7754 pub use crate::recovery::CongestionControlAlgorithm;
7755 pub use crate::stream::StreamIter;
7756 
7757 mod crypto;
7758 mod dgram;
7759 mod ffi;
7760 mod frame;
7761 pub mod h3;
7762 mod minmax;
7763 mod octets;
7764 mod packet;
7765 mod rand;
7766 mod ranges;
7767 mod recovery;
7768 mod stream;
7769 mod tls;
7770