1 // Copyright (C) 2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use super::Error;
28 use super::Result;
29 
30 use crate::octets;
31 
32 use super::frame;
33 
34 pub const HTTP3_CONTROL_STREAM_TYPE_ID: u64 = 0x0;
35 pub const HTTP3_PUSH_STREAM_TYPE_ID: u64 = 0x1;
36 pub const QPACK_ENCODER_STREAM_TYPE_ID: u64 = 0x2;
37 pub const QPACK_DECODER_STREAM_TYPE_ID: u64 = 0x3;
38 
39 const MAX_STATE_BUF_SIZE: usize = (1 << 24) - 1;
40 
41 #[derive(Clone, Copy, Debug, PartialEq)]
42 pub enum Type {
43     Control,
44     Request,
45     Push,
46     QpackEncoder,
47     QpackDecoder,
48     Unknown,
49 }
50 
51 #[derive(Clone, Copy, Debug, PartialEq)]
52 pub enum State {
53     /// Reading the stream's type.
54     StreamType,
55 
56     /// Reading the stream's current frame's type.
57     FrameType,
58 
59     /// Reading the stream's current frame's payload length.
60     FramePayloadLen,
61 
62     /// Reading the stream's current frame's payload.
63     FramePayload,
64 
65     /// Reading DATA payload.
66     Data,
67 
68     /// Reading the push ID.
69     PushId,
70 
71     /// Reading a QPACK instruction.
72     QpackInstruction,
73 
74     /// Reading and discarding data.
75     Drain,
76 }
77 
78 impl Type {
deserialize(v: u64) -> Result<Type>79     pub fn deserialize(v: u64) -> Result<Type> {
80         match v {
81             HTTP3_CONTROL_STREAM_TYPE_ID => Ok(Type::Control),
82             HTTP3_PUSH_STREAM_TYPE_ID => Ok(Type::Push),
83             QPACK_ENCODER_STREAM_TYPE_ID => Ok(Type::QpackEncoder),
84             QPACK_DECODER_STREAM_TYPE_ID => Ok(Type::QpackDecoder),
85 
86             _ => Ok(Type::Unknown),
87         }
88     }
89 }
90 
91 /// An HTTP/3 stream.
92 ///
93 /// This maintains the HTTP/3 state for streams of any type (control, request,
94 /// QPACK, ...).
95 ///
96 /// A number of bytes, depending on the current stream's state, is read from the
97 /// transport stream into the HTTP/3 stream's "state buffer". This intermediate
98 /// buffering is required due to the fact that data read from the transport
99 /// might not be complete (e.g. a varint might be split across multiple QUIC
100 /// packets).
101 ///
102 /// When enough data to complete the current state has been buffered, it is
103 /// consumed from the state buffer and the stream is transitioned to the next
104 /// state (see `State` for a list of possible states).
105 #[derive(Debug)]
106 pub struct Stream {
107     /// The corresponding transport stream's ID.
108     id: u64,
109 
110     /// The stream's type (if known).
111     ty: Option<Type>,
112 
113     /// The current stream state.
114     state: State,
115 
116     /// The buffer holding partial data for the current state.
117     state_buf: Vec<u8>,
118 
119     /// The expected amount of bytes required to complete the state.
120     state_len: usize,
121 
122     /// The write offset in the state buffer, that is, how many bytes have
123     /// already been read from the transport for the current state. When
124     /// it reaches `stream_len` the state can be completed.
125     state_off: usize,
126 
127     /// The type of the frame currently being parsed.
128     frame_type: Option<u64>,
129 
130     /// Whether the stream was created locally, or by the peer.
131     is_local: bool,
132 
133     /// Whether the stream has been remotely initialized.
134     remote_initialized: bool,
135 
136     /// Whether the stream has been locally initialized.
137     local_initialized: bool,
138 }
139 
140 impl Stream {
141     /// Creates a new HTTP/3 stream.
142     ///
143     /// The `is_local` parameter indicates whether the stream was created by the
144     /// local endpoint, or by the peer.
new(id: u64, is_local: bool) -> Stream145     pub fn new(id: u64, is_local: bool) -> Stream {
146         let (ty, state) = if crate::stream::is_bidi(id) {
147             // All bidirectional streams are "request" streams, so we don't
148             // need to read the stream type.
149             (Some(Type::Request), State::FrameType)
150         } else {
151             // The stream's type is yet to be determined.
152             (None, State::StreamType)
153         };
154 
155         Stream {
156             id,
157             ty,
158 
159             state,
160 
161             // Pre-allocate a buffer to avoid multiple tiny early allocations.
162             state_buf: vec![0; 16],
163 
164             // Expect one byte for the initial state, to parse the initial
165             // varint length.
166             state_len: 1,
167             state_off: 0,
168 
169             frame_type: None,
170 
171             is_local,
172             remote_initialized: false,
173             local_initialized: false,
174         }
175     }
176 
state(&self) -> State177     pub fn state(&self) -> State {
178         self.state
179     }
180 
181     /// Sets the stream's type and transitions to the next state.
set_ty(&mut self, ty: Type) -> Result<()>182     pub fn set_ty(&mut self, ty: Type) -> Result<()> {
183         assert_eq!(self.state, State::StreamType);
184 
185         self.ty = Some(ty);
186 
187         let state = match ty {
188             Type::Control | Type::Request => State::FrameType,
189 
190             Type::Push => State::PushId,
191 
192             Type::QpackEncoder | Type::QpackDecoder => {
193                 self.remote_initialized = true;
194 
195                 State::QpackInstruction
196             },
197 
198             Type::Unknown => State::Drain,
199         };
200 
201         self.state_transition(state, 1, true)?;
202 
203         Ok(())
204     }
205 
206     /// Sets the push ID and transitions to the next state.
set_push_id(&mut self, _id: u64) -> Result<()>207     pub fn set_push_id(&mut self, _id: u64) -> Result<()> {
208         assert_eq!(self.state, State::PushId);
209 
210         // TODO: implement push ID.
211 
212         self.state_transition(State::FrameType, 1, true)?;
213 
214         Ok(())
215     }
216 
217     /// Sets the frame type and transitions to the next state.
set_frame_type(&mut self, ty: u64) -> Result<()>218     pub fn set_frame_type(&mut self, ty: u64) -> Result<()> {
219         assert_eq!(self.state, State::FrameType);
220 
221         // Only expect frames on Control, Request and Push streams.
222         match self.ty {
223             Some(Type::Control) => {
224                 // Control stream starts uninitialized and only SETTINGS is
225                 // accepted in that state. Other frames cause an error. Once
226                 // initialized, no more SETTINGS are permitted.
227                 match (ty, self.remote_initialized) {
228                     // Initialize control stream.
229                     (frame::SETTINGS_FRAME_TYPE_ID, false) =>
230                         self.remote_initialized = true,
231 
232                     // Non-SETTINGS frames not allowed on control stream
233                     // before initialization.
234                     (_, false) => return Err(Error::MissingSettings),
235 
236                     // Additional SETTINGS frame.
237                     (frame::SETTINGS_FRAME_TYPE_ID, true) =>
238                         return Err(Error::FrameUnexpected),
239 
240                     // Frames that can't be received on control stream
241                     // after initialization.
242                     (frame::DATA_FRAME_TYPE_ID, true) =>
243                         return Err(Error::FrameUnexpected),
244 
245                     (frame::HEADERS_FRAME_TYPE_ID, true) =>
246                         return Err(Error::FrameUnexpected),
247 
248                     (frame::PUSH_PROMISE_FRAME_TYPE_ID, true) =>
249                         return Err(Error::FrameUnexpected),
250 
251                     // All other frames are ignored after initialization.
252                     (_, true) => (),
253                 }
254             },
255 
256             Some(Type::Request) => {
257                 // Request stream starts uninitialized and only HEADERS
258                 // is accepted. Other frames cause an error.
259                 if !self.is_local {
260                     match (ty, self.remote_initialized) {
261                         (frame::HEADERS_FRAME_TYPE_ID, false) =>
262                             self.remote_initialized = true,
263 
264                         (frame::CANCEL_PUSH_FRAME_TYPE_ID, _) =>
265                             return Err(Error::FrameUnexpected),
266 
267                         (frame::SETTINGS_FRAME_TYPE_ID, _) =>
268                             return Err(Error::FrameUnexpected),
269 
270                         (frame::GOAWAY_FRAME_TYPE_ID, _) =>
271                             return Err(Error::FrameUnexpected),
272 
273                         (frame::MAX_PUSH_FRAME_TYPE_ID, _) =>
274                             return Err(Error::FrameUnexpected),
275 
276                         // All other frames can be ignored regardless of stream
277                         // state.
278                         (_, false) => (),
279 
280                         (_, true) => (),
281                     }
282                 }
283             },
284 
285             Some(Type::Push) => {
286                 match ty {
287                     // Frames that can never be received on request streams.
288                     frame::CANCEL_PUSH_FRAME_TYPE_ID =>
289                         return Err(Error::FrameUnexpected),
290 
291                     frame::SETTINGS_FRAME_TYPE_ID =>
292                         return Err(Error::FrameUnexpected),
293 
294                     frame::PUSH_PROMISE_FRAME_TYPE_ID =>
295                         return Err(Error::FrameUnexpected),
296 
297                     frame::GOAWAY_FRAME_TYPE_ID =>
298                         return Err(Error::FrameUnexpected),
299 
300                     frame::MAX_PUSH_FRAME_TYPE_ID =>
301                         return Err(Error::FrameUnexpected),
302 
303                     _ => (),
304                 }
305             },
306 
307             _ => return Err(Error::FrameUnexpected),
308         }
309 
310         self.frame_type = Some(ty);
311 
312         self.state_transition(State::FramePayloadLen, 1, true)?;
313 
314         Ok(())
315     }
316 
317     /// Sets the frame's payload length and transitions to the next state.
set_frame_payload_len(&mut self, len: u64) -> Result<()>318     pub fn set_frame_payload_len(&mut self, len: u64) -> Result<()> {
319         assert_eq!(self.state, State::FramePayloadLen);
320 
321         // Only expect frames on Control, Request and Push streams.
322         if self.ty == Some(Type::Control) ||
323             self.ty == Some(Type::Request) ||
324             self.ty == Some(Type::Push)
325         {
326             let (state, resize) = match self.frame_type {
327                 Some(frame::DATA_FRAME_TYPE_ID) => (State::Data, false),
328 
329                 _ => (State::FramePayload, true),
330             };
331 
332             self.state_transition(state, len as usize, resize)?;
333 
334             return Ok(());
335         }
336 
337         Err(Error::InternalError)
338     }
339 
340     /// Tries to fill the state buffer by reading data from the corresponding
341     /// transport stream.
342     ///
343     /// When not enough data can be read to complete the state, this returns
344     /// `Error::Done`.
try_fill_buffer( &mut self, conn: &mut crate::Connection, ) -> Result<()>345     pub fn try_fill_buffer(
346         &mut self, conn: &mut crate::Connection,
347     ) -> Result<()> {
348         let buf = &mut self.state_buf[self.state_off..self.state_len];
349 
350         let (read, _) = conn.stream_recv(self.id, buf)?;
351 
352         trace!(
353             "{} read {} bytes on stream {}",
354             conn.trace_id(),
355             read,
356             self.id,
357         );
358 
359         self.state_off += read;
360 
361         if !self.state_buffer_complete() {
362             return Err(Error::Done);
363         }
364 
365         Ok(())
366     }
367 
368     /// Initialize the local part of the stream.
initialize_local(&mut self)369     pub fn initialize_local(&mut self) {
370         self.local_initialized = true
371     }
372 
373     /// Whether the stream has been locally initialized.
local_initialized(&self) -> bool374     pub fn local_initialized(&self) -> bool {
375         self.local_initialized
376     }
377 
378     /// Tries to fill the state buffer by reading data from the given cursor.
379     ///
380     /// This is intended to replace `try_fill_buffer()` in tests, in order to
381     /// avoid having to setup a transport connection.
382     #[cfg(test)]
try_fill_buffer_for_tests( &mut self, stream: &mut std::io::Cursor<Vec<u8>>, ) -> Result<()>383     fn try_fill_buffer_for_tests(
384         &mut self, stream: &mut std::io::Cursor<Vec<u8>>,
385     ) -> Result<()> {
386         let buf = &mut self.state_buf[self.state_off..self.state_len];
387 
388         let read = std::io::Read::read(stream, buf).unwrap();
389 
390         self.state_off += read;
391 
392         if !self.state_buffer_complete() {
393             return Err(Error::Done);
394         }
395 
396         Ok(())
397     }
398 
399     /// Tries to parse a varint (including length) from the state buffer.
try_consume_varint(&mut self) -> Result<u64>400     pub fn try_consume_varint(&mut self) -> Result<u64> {
401         if self.state_off == 1 {
402             self.state_len = octets::varint_parse_len(self.state_buf[0]);
403             self.state_buf.resize(self.state_len, 0);
404         }
405 
406         // Return early if we don't have enough data in the state buffer to
407         // parse the whole varint.
408         if !self.state_buffer_complete() {
409             return Err(Error::Done);
410         }
411 
412         let varint = octets::Octets::with_slice(&self.state_buf).get_varint()?;
413 
414         Ok(varint)
415     }
416 
417     /// Tries to parse a frame from the state buffer.
try_consume_frame(&mut self) -> Result<frame::Frame>418     pub fn try_consume_frame(&mut self) -> Result<frame::Frame> {
419         // TODO: properly propagate frame parsing errors.
420         let frame = frame::Frame::from_bytes(
421             self.frame_type.unwrap(),
422             self.state_len as u64,
423             &self.state_buf,
424         )?;
425 
426         self.state_transition(State::FrameType, 1, true)?;
427 
428         Ok(frame)
429     }
430 
431     /// Tries to read DATA payload from the transport stream.
try_consume_data( &mut self, conn: &mut crate::Connection, out: &mut [u8], ) -> Result<usize>432     pub fn try_consume_data(
433         &mut self, conn: &mut crate::Connection, out: &mut [u8],
434     ) -> Result<usize> {
435         let left = std::cmp::min(out.len(), self.state_len - self.state_off);
436 
437         let (len, _) = conn.stream_recv(self.id, &mut out[..left])?;
438 
439         self.state_off += len;
440 
441         if self.state_buffer_complete() {
442             self.state_transition(State::FrameType, 1, true)?;
443         }
444 
445         Ok(len)
446     }
447 
448     /// Tries to read DATA payload from the given cursor.
449     ///
450     /// This is intended to replace `try_consume_data()` in tests, in order to
451     /// avoid having to setup a transport connection.
452     #[cfg(test)]
try_consume_data_for_tests( &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8], ) -> Result<usize>453     fn try_consume_data_for_tests(
454         &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8],
455     ) -> Result<usize> {
456         let left = std::cmp::min(out.len(), self.state_len - self.state_off);
457 
458         let len = std::io::Read::read(stream, &mut out[..left]).unwrap();
459 
460         self.state_off += len;
461 
462         if self.state_buffer_complete() {
463             self.state_transition(State::FrameType, 1, true)?;
464         }
465 
466         Ok(len)
467     }
468 
469     /// Returns true if the state buffer has enough data to complete the state.
state_buffer_complete(&self) -> bool470     fn state_buffer_complete(&self) -> bool {
471         self.state_off == self.state_len
472     }
473 
474     /// Transitions the stream to a new state, and optionally resets the state
475     /// buffer.
state_transition( &mut self, new_state: State, expected_len: usize, resize: bool, ) -> Result<()>476     fn state_transition(
477         &mut self, new_state: State, expected_len: usize, resize: bool,
478     ) -> Result<()> {
479         self.state = new_state;
480         self.state_off = 0;
481         self.state_len = expected_len;
482 
483         // Some states don't need the state buffer, so don't resize it if not
484         // necessary.
485         if resize {
486             // A peer can influence the size of the state buffer (e.g. with the
487             // payload size of a GREASE frame), so we need to limit the maximum
488             // size to avoid DoS.
489             if self.state_len > MAX_STATE_BUF_SIZE {
490                 return Err(Error::InternalError);
491             }
492 
493             self.state_buf.resize(self.state_len, 0);
494         }
495 
496         Ok(())
497     }
498 }
499 
500 #[cfg(test)]
501 mod tests {
502     use super::*;
503 
504     #[test]
505     /// Process incoming SETTINGS frame on control stream.
control_good()506     fn control_good() {
507         let mut stream = Stream::new(3, false);
508         assert_eq!(stream.state, State::StreamType);
509 
510         let mut d = vec![42; 40];
511         let mut b = octets::OctetsMut::with_slice(&mut d);
512 
513         let frame = frame::Frame::Settings {
514             max_header_list_size: Some(0),
515             qpack_max_table_capacity: Some(0),
516             qpack_blocked_streams: Some(0),
517             grease: None,
518         };
519 
520         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
521         frame.to_bytes(&mut b).unwrap();
522 
523         let mut cursor = std::io::Cursor::new(d);
524 
525         // Parse stream type.
526         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
527 
528         let stream_ty = stream.try_consume_varint().unwrap();
529         assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID);
530         stream
531             .set_ty(Type::deserialize(stream_ty).unwrap())
532             .unwrap();
533         assert_eq!(stream.state, State::FrameType);
534 
535         // Parse the SETTINGS frame type.
536         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
537 
538         let frame_ty = stream.try_consume_varint().unwrap();
539         assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
540 
541         stream.set_frame_type(frame_ty).unwrap();
542         assert_eq!(stream.state, State::FramePayloadLen);
543 
544         // Parse the SETTINGS frame payload length.
545         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
546 
547         let frame_payload_len = stream.try_consume_varint().unwrap();
548         assert_eq!(frame_payload_len, 6);
549         stream.set_frame_payload_len(frame_payload_len).unwrap();
550         assert_eq!(stream.state, State::FramePayload);
551 
552         // Parse the SETTINGS frame payload.
553         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
554 
555         assert_eq!(stream.try_consume_frame(), Ok(frame));
556         assert_eq!(stream.state, State::FrameType);
557     }
558 
559     #[test]
560     /// Process duplicate SETTINGS frame on control stream.
control_bad_multiple_settings()561     fn control_bad_multiple_settings() {
562         let mut stream = Stream::new(3, false);
563         assert_eq!(stream.state, State::StreamType);
564 
565         let mut d = vec![42; 40];
566         let mut b = octets::OctetsMut::with_slice(&mut d);
567 
568         let frame = frame::Frame::Settings {
569             max_header_list_size: Some(0),
570             qpack_max_table_capacity: Some(0),
571             qpack_blocked_streams: Some(0),
572             grease: None,
573         };
574 
575         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
576         frame.to_bytes(&mut b).unwrap();
577         frame.to_bytes(&mut b).unwrap();
578 
579         let mut cursor = std::io::Cursor::new(d);
580 
581         // Parse stream type.
582         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
583 
584         let stream_ty = stream.try_consume_varint().unwrap();
585         assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID);
586         stream
587             .set_ty(Type::deserialize(stream_ty).unwrap())
588             .unwrap();
589         assert_eq!(stream.state, State::FrameType);
590 
591         // Parse the SETTINGS frame type.
592         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
593 
594         let frame_ty = stream.try_consume_varint().unwrap();
595         assert_eq!(frame_ty, frame::SETTINGS_FRAME_TYPE_ID);
596 
597         stream.set_frame_type(frame_ty).unwrap();
598         assert_eq!(stream.state, State::FramePayloadLen);
599 
600         // Parse the SETTINGS frame payload length.
601         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
602 
603         let frame_payload_len = stream.try_consume_varint().unwrap();
604         assert_eq!(frame_payload_len, 6);
605         stream.set_frame_payload_len(frame_payload_len).unwrap();
606         assert_eq!(stream.state, State::FramePayload);
607 
608         // Parse the SETTINGS frame payload.
609         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
610 
611         assert_eq!(stream.try_consume_frame(), Ok(frame));
612         assert_eq!(stream.state, State::FrameType);
613 
614         // Parse the second SETTINGS frame type.
615         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
616 
617         let frame_ty = stream.try_consume_varint().unwrap();
618         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
619     }
620 
621     #[test]
622     /// Process other frame before SETTINGS frame on control stream.
control_bad_late_settings()623     fn control_bad_late_settings() {
624         let mut stream = Stream::new(3, false);
625         assert_eq!(stream.state, State::StreamType);
626 
627         let mut d = vec![42; 40];
628         let mut b = octets::OctetsMut::with_slice(&mut d);
629 
630         let goaway = frame::Frame::GoAway { id: 0 };
631 
632         let settings = frame::Frame::Settings {
633             max_header_list_size: Some(0),
634             qpack_max_table_capacity: Some(0),
635             qpack_blocked_streams: Some(0),
636             grease: None,
637         };
638 
639         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
640         goaway.to_bytes(&mut b).unwrap();
641         settings.to_bytes(&mut b).unwrap();
642 
643         let mut cursor = std::io::Cursor::new(d);
644 
645         // Parse stream type.
646         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
647 
648         let stream_ty = stream.try_consume_varint().unwrap();
649         assert_eq!(stream_ty, HTTP3_CONTROL_STREAM_TYPE_ID);
650         stream
651             .set_ty(Type::deserialize(stream_ty).unwrap())
652             .unwrap();
653         assert_eq!(stream.state, State::FrameType);
654 
655         // Parse GOAWAY.
656         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
657 
658         let frame_ty = stream.try_consume_varint().unwrap();
659         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::MissingSettings));
660     }
661 
662     #[test]
663     /// Process not-allowed frame on control stream.
control_bad_frame()664     fn control_bad_frame() {
665         let mut stream = Stream::new(3, false);
666         assert_eq!(stream.state, State::StreamType);
667 
668         let mut d = vec![42; 40];
669         let mut b = octets::OctetsMut::with_slice(&mut d);
670 
671         let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
672         let hdrs = frame::Frame::Headers { header_block };
673 
674         let settings = frame::Frame::Settings {
675             max_header_list_size: Some(0),
676             qpack_max_table_capacity: Some(0),
677             qpack_blocked_streams: Some(0),
678             grease: None,
679         };
680 
681         b.put_varint(HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
682         settings.to_bytes(&mut b).unwrap();
683         hdrs.to_bytes(&mut b).unwrap();
684 
685         let mut cursor = std::io::Cursor::new(d);
686 
687         // Parse stream type.
688         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
689 
690         let stream_ty = stream.try_consume_varint().unwrap();
691         stream
692             .set_ty(Type::deserialize(stream_ty).unwrap())
693             .unwrap();
694 
695         // Parse first SETTINGS frame.
696         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
697 
698         let frame_ty = stream.try_consume_varint().unwrap();
699         stream.set_frame_type(frame_ty).unwrap();
700 
701         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
702 
703         let frame_payload_len = stream.try_consume_varint().unwrap();
704         stream.set_frame_payload_len(frame_payload_len).unwrap();
705 
706         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
707 
708         assert!(stream.try_consume_frame().is_ok());
709 
710         // Parse HEADERS.
711         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
712 
713         let frame_ty = stream.try_consume_varint().unwrap();
714         assert_eq!(stream.set_frame_type(frame_ty), Err(Error::FrameUnexpected));
715     }
716 
717     #[test]
request_no_data()718     fn request_no_data() {
719         let mut stream = Stream::new(0, false);
720 
721         assert_eq!(stream.ty, Some(Type::Request));
722         assert_eq!(stream.state, State::FrameType);
723 
724         assert_eq!(stream.try_consume_varint(), Err(Error::Done));
725     }
726 
727     #[test]
request_good()728     fn request_good() {
729         let mut stream = Stream::new(0, false);
730 
731         let mut d = vec![42; 128];
732         let mut b = octets::OctetsMut::with_slice(&mut d);
733 
734         let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
735         let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
736         let hdrs = frame::Frame::Headers { header_block };
737         let data = frame::Frame::Data {
738             payload: payload.clone(),
739         };
740 
741         hdrs.to_bytes(&mut b).unwrap();
742         data.to_bytes(&mut b).unwrap();
743 
744         let mut cursor = std::io::Cursor::new(d);
745 
746         // Parse the HEADERS frame type.
747         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
748 
749         let frame_ty = stream.try_consume_varint().unwrap();
750         assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
751 
752         stream.set_frame_type(frame_ty).unwrap();
753         assert_eq!(stream.state, State::FramePayloadLen);
754 
755         // Parse the HEADERS frame payload length.
756         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
757 
758         let frame_payload_len = stream.try_consume_varint().unwrap();
759         assert_eq!(frame_payload_len, 12);
760 
761         stream.set_frame_payload_len(frame_payload_len).unwrap();
762         assert_eq!(stream.state, State::FramePayload);
763 
764         // Parse the HEADERS frame.
765         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
766 
767         assert_eq!(stream.try_consume_frame(), Ok(hdrs));
768         assert_eq!(stream.state, State::FrameType);
769 
770         // Parse the DATA frame type.
771         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
772 
773         let frame_ty = stream.try_consume_varint().unwrap();
774         assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
775 
776         stream.set_frame_type(frame_ty).unwrap();
777         assert_eq!(stream.state, State::FramePayloadLen);
778 
779         // Parse the DATA frame payload length.
780         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
781 
782         let frame_payload_len = stream.try_consume_varint().unwrap();
783         assert_eq!(frame_payload_len, 12);
784 
785         stream.set_frame_payload_len(frame_payload_len).unwrap();
786         assert_eq!(stream.state, State::Data);
787 
788         // Parse the DATA payload.
789         let mut recv_buf = vec![0; payload.len()];
790         assert_eq!(
791             stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
792             Ok(payload.len())
793         );
794         assert_eq!(payload, recv_buf);
795 
796         assert_eq!(stream.state, State::FrameType);
797     }
798 
799     #[test]
push_good()800     fn push_good() {
801         let mut stream = Stream::new(2, false);
802 
803         let mut d = vec![42; 128];
804         let mut b = octets::OctetsMut::with_slice(&mut d);
805 
806         let header_block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
807         let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
808         let hdrs = frame::Frame::Headers { header_block };
809         let data = frame::Frame::Data {
810             payload: payload.clone(),
811         };
812 
813         b.put_varint(HTTP3_PUSH_STREAM_TYPE_ID).unwrap();
814         b.put_varint(1).unwrap();
815         hdrs.to_bytes(&mut b).unwrap();
816         data.to_bytes(&mut b).unwrap();
817 
818         let mut cursor = std::io::Cursor::new(d);
819 
820         // Parse stream type.
821         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
822 
823         let stream_ty = stream.try_consume_varint().unwrap();
824         assert_eq!(stream_ty, HTTP3_PUSH_STREAM_TYPE_ID);
825         stream
826             .set_ty(Type::deserialize(stream_ty).unwrap())
827             .unwrap();
828         assert_eq!(stream.state, State::PushId);
829 
830         // Parse push ID.
831         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
832 
833         let push_id = stream.try_consume_varint().unwrap();
834         assert_eq!(push_id, 1);
835 
836         stream.set_push_id(push_id).unwrap();
837         assert_eq!(stream.state, State::FrameType);
838 
839         // Parse the HEADERS frame type.
840         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
841 
842         let frame_ty = stream.try_consume_varint().unwrap();
843         assert_eq!(frame_ty, frame::HEADERS_FRAME_TYPE_ID);
844 
845         stream.set_frame_type(frame_ty).unwrap();
846         assert_eq!(stream.state, State::FramePayloadLen);
847 
848         // Parse the HEADERS frame payload length.
849         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
850 
851         let frame_payload_len = stream.try_consume_varint().unwrap();
852         assert_eq!(frame_payload_len, 12);
853 
854         stream.set_frame_payload_len(frame_payload_len).unwrap();
855         assert_eq!(stream.state, State::FramePayload);
856 
857         // Parse the HEADERS frame.
858         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
859 
860         assert_eq!(stream.try_consume_frame(), Ok(hdrs));
861         assert_eq!(stream.state, State::FrameType);
862 
863         // Parse the DATA frame type.
864         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
865 
866         let frame_ty = stream.try_consume_varint().unwrap();
867         assert_eq!(frame_ty, frame::DATA_FRAME_TYPE_ID);
868 
869         stream.set_frame_type(frame_ty).unwrap();
870         assert_eq!(stream.state, State::FramePayloadLen);
871 
872         // Parse the DATA frame payload length.
873         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
874 
875         let frame_payload_len = stream.try_consume_varint().unwrap();
876         assert_eq!(frame_payload_len, 12);
877 
878         stream.set_frame_payload_len(frame_payload_len).unwrap();
879         assert_eq!(stream.state, State::Data);
880 
881         // Parse the DATA payload.
882         let mut recv_buf = vec![0; payload.len()];
883         assert_eq!(
884             stream.try_consume_data_for_tests(&mut cursor, &mut recv_buf),
885             Ok(payload.len())
886         );
887         assert_eq!(payload, recv_buf);
888 
889         assert_eq!(stream.state, State::FrameType);
890     }
891 
892     #[test]
grease()893     fn grease() {
894         let mut stream = Stream::new(2, false);
895 
896         let mut d = vec![42; 20];
897         let mut b = octets::OctetsMut::with_slice(&mut d);
898 
899         b.put_varint(33).unwrap();
900 
901         let mut cursor = std::io::Cursor::new(d);
902 
903         // Parse stream type.
904         stream.try_fill_buffer_for_tests(&mut cursor).unwrap();
905 
906         let stream_ty = stream.try_consume_varint().unwrap();
907         assert_eq!(stream_ty, 33);
908         stream
909             .set_ty(Type::deserialize(stream_ty).unwrap())
910             .unwrap();
911         assert_eq!(stream.state, State::Drain);
912     }
913 }
914