/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//! Defines a backing task to keep an HTTP/3 connection running
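//!
//! A rough usage sketch (illustrative only: the quiche connection and UDP socket setup are
//! omitted, and the channel capacity, `initial_status`, and variable names are placeholders
//! rather than values taken from this module's callers):
//!
//! ```ignore
//! // Channels connecting the caller to the driver task.
//! let (request_tx, request_rx) = mpsc::channel::<Request>(100);
//! let (status_tx, status_rx) = watch::channel(initial_status);
//!
//! // The driver owns the quiche connection and socket, and runs until the connection dies,
//! // reporting state changes through `status_tx`.
//! tokio::spawn(drive(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info));
//!
//! // Issue a DNS query and wait for the matching response `Stream`.
//! let (response_tx, response_rx) = oneshot::channel();
//! request_tx.send(Request { headers, expiry: None, response_tx }).await?;
//! let response: Stream = response_rx.await?;
//! ```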

use crate::boot_time;
use crate::boot_time::BootTime;
use crate::metrics::log_handshake_event_stats;
use log::{debug, info, warn};
use quiche::h3;
use std::collections::HashMap;
use std::default::Default;
use std::future;
use std::io;
use std::time::Instant;
use thiserror::Error;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::sync::{mpsc, oneshot, watch};

use super::Status;

#[derive(Copy, Clone, Debug)]
pub enum Cause {
    Probe,
    Reconnect,
    Retry,
}

#[derive(Clone)]
#[allow(dead_code)]
pub enum HandshakeResult {
    Unknown,
    Success,
    Timeout,
    TlsFail,
    ServerUnreachable,
}

#[derive(Copy, Clone, Debug)]
pub struct HandshakeInfo {
    pub cause: Cause,
    pub sent_bytes: u64,
    pub recv_bytes: u64,
    pub elapsed: u128,
    pub quic_version: u32,
    pub network_type: u32,
    pub private_dns_mode: u32,
    pub session_hit_checker: bool,
}

impl std::fmt::Display for HandshakeInfo {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "cause={:?}, sent_bytes={}, recv_bytes={}, quic_version={}, session_hit_checker={}",
            self.cause,
            self.sent_bytes,
            self.recv_bytes,
            self.quic_version,
            self.session_hit_checker
        )
    }
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("network IO error: {0}")]
    Network(#[from] io::Error),
    #[error("QUIC error: {0}")]
    Quic(#[from] quiche::Error),
    #[error("HTTP/3 error: {0}")]
    H3(#[from] h3::Error),
    #[error("Response delivery error: {0}")]
    StreamSend(#[from] mpsc::error::SendError<Stream>),
    #[error("Connection closed")]
    Closed,
}

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
/// HTTP/3 Request to be sent on the connection
pub struct Request {
    /// Request headers
    pub headers: Vec<h3::Header>,
    /// Expiry time for the request, relative to `CLOCK_BOOTTIME`
    pub expiry: Option<BootTime>,
    /// Channel to send the response to
    pub response_tx: oneshot::Sender<Stream>,
}

#[derive(Debug)]
/// HTTP/3 Response
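///
/// A sketch of how a received response might be consumed on the caller side (`response_rx` is
/// the receiver paired with `Request::response_tx`, and `process_dns_answer` is a placeholder
/// for DNS parsing that happens outside this module):
///
/// ```ignore
/// let stream: Stream = response_rx.await?;
/// match stream.error {
///     // The server reset the stream; report a failure for this query.
///     Some(code) => warn!("stream reset with error code {}", code),
///     // `headers` carries the HTTP/3 response headers and `data` the raw DNS answer bytes.
///     None => process_dns_answer(&stream.data),
/// }
/// ```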
pub struct Stream {
    /// Response headers
    pub headers: Vec<h3::Header>,
    /// Response body
    pub data: Vec<u8>,
    /// Error code if stream was reset
    pub error: Option<u64>,
}

impl Stream {
    fn new(headers: Vec<h3::Header>) -> Self {
        Self { headers, data: Vec::new(), error: None }
    }
}

const MAX_UDP_PACKET_SIZE: usize = 65536;

struct Driver {
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    // This buffer is large; boxing it keeps it off the stack and prevents it
    // from being copied during moves of the driver.
    buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>,
    net_id: u32,
    // Used to check if the connection has entered the closing or draining state. A connection
    // can enter the closing state if the sender side of the request_rx channel has been dropped.
    // Note that we can't check if a receiver is dead without potentially receiving a message,
    // and if we poll a dead receiver in a select! it will immediately return None. As a result,
    // we need this flag to gate whether or not to include .recv() in our select!
    closing: bool,
    handshake_info: HandshakeInfo,
    connection_start: Instant,
}

struct H3Driver {
    driver: Driver,
    // h3_conn sometimes can't "fit" a request in its available windows.
    // This value holds a peeked request in that case, waiting for
    // transmission to become possible.
    buffered_request: Option<Request>,
    h3_conn: h3::Connection,
    requests: HashMap<u64, Request>,
    streams: HashMap<u64, Stream>,
}

async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) {
    info!("optional_timeout: timeout={:?}, network {}", timeout, net_id);
    match timeout {
        Some(timeout) => boot_time::sleep(timeout).await,
        None => future::pending().await,
    }
}

/// Creates a future which, when polled, will handle events related to an HTTP/3 connection.
/// The returned error explains why the connection terminated.
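///
/// Because the internal loop only exits by failing, the returned future always resolves to an
/// error. A sketch of how a caller might react to it (`maybe_reconnect` is a hypothetical
/// helper; the reconnect policy is not something this module prescribes):
///
/// ```ignore
/// match drive(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).await {
///     // The connection was closed or drained; the status watcher has already been sent
///     // Status::Dead, so the owner may choose to reconnect.
///     Err(Error::Closed) => maybe_reconnect(),
///     // Socket, QUIC, HTTP/3, or response-delivery failures.
///     Err(e) => warn!("connection failed: {}", e),
///     Ok(()) => unreachable!("drive() only returns by reporting an error"),
/// }
/// ```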
pub async fn drive(
    request_rx: mpsc::Receiver<Request>,
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    net_id: u32,
    handshake_info: HandshakeInfo,
) -> Result<()> {
    Driver::new(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).drive().await
}

impl Driver {
    fn new(
        request_rx: mpsc::Receiver<Request>,
        status_tx: watch::Sender<Status>,
        quiche_conn: quiche::Connection,
        socket: UdpSocket,
        net_id: u32,
        handshake_info: HandshakeInfo,
    ) -> Self {
        Self {
            request_rx,
            status_tx,
            quiche_conn,
            socket,
            buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
            net_id,
            closing: false,
            handshake_info,
            connection_start: Instant::now(),
        }
    }

    async fn drive(mut self) -> Result<()> {
        self.connection_start = Instant::now();
        // Prime connection
        self.flush_tx().await?;
        loop {
            self = self.drive_once().await?
        }
    }

    fn handle_closed(&self) -> Result<()> {
        if self.quiche_conn.is_closed() {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            info!(
                "Connection {} closed on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let session = self.quiche_conn.session().map(<[_]>::to_vec);
            let _ = self.status_tx.send(Status::Dead { session });
            Err(Error::Closed)
        } else {
            Ok(())
        }
    }

    fn handle_draining(&mut self) {
        if self.quiche_conn.is_draining() && !self.closing {
            // TODO: Also log local_error() once Quiche 0.10.0 is available.
            info!(
                "Connection {} is draining on network {}, peer_error={:x?}",
                self.quiche_conn.trace_id(),
                self.net_id,
                self.quiche_conn.peer_error()
            );
            // We don't care if the receiver has hung up
            let session = self.quiche_conn.session().map(<[_]>::to_vec);
            let _ = self.status_tx.send(Status::Dead { session });

            self.request_rx.close();
            // Drain the pending DNS requests from the queue so that their corresponding future
            // tasks return an error quickly rather than time out. However, the DNS requests
            // that have already been sent will still time out.
            // TODO: re-issue the outstanding DNS requests, such as passing H3Driver.requests
            // along with Status::Dead to the `Network` that can re-issue the DNS requests.
            while self.request_rx.try_recv().is_ok() {}
            self.closing = true;
        }
    }

    async fn drive_once(mut self) -> Result<Self> {
        // If the QUIC connection is live but HTTP/3 is not, try to bring it up
        if self.quiche_conn.is_established() || self.quiche_conn.is_in_early_data() {
            info!(
                "Connection {} established on network {}",
                self.quiche_conn.trace_id(),
                self.net_id
            );
            self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
            // Note that sent_bytes in Stats is computed in a way that omits the length of any
            // padding data appended to the datagram.
            self.handshake_info.sent_bytes = self.quiche_conn.stats().sent_bytes;
            self.handshake_info.recv_bytes = self.quiche_conn.stats().recv_bytes;
            self.handshake_info.quic_version = quiche::PROTOCOL_VERSION;
            log_handshake_event_stats(HandshakeResult::Success, self.handshake_info);
            let h3_config = h3::Config::new()?;
            let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
            self = H3Driver::new(self, h3_conn).drive().await?;
            let _ = self.status_tx.send(Status::QUIC);
        }

        let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
        select! {
            // If a quiche timer would fire, call its callback
            _ = timer => {
                info!("Driver: Timer expired on network {}", self.net_id);
                self.quiche_conn.on_timeout();

                if !self.quiche_conn.is_established() && self.quiche_conn.is_closed() {
                    info!(
                        "Connection {} timed out on network {}",
                        self.quiche_conn.trace_id(),
                        self.net_id
                    );
                    self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
                    log_handshake_event_stats(
                        HandshakeResult::Timeout,
                        self.handshake_info,
                    );
                }
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
                let local = self.socket.local_addr()?;
                self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from, to: local })?;
                debug!("Received {} bytes on network {}", size, self.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.flush_tx().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Also, per the Quiche
        // documentation, the connection should not be dropped until is_closed() returns true.
        // This tokio task will become unowned and get dropped when is_closed() returns true.
        self.handle_draining();

        // If the connection has closed, tear down
        self.handle_closed()?;

        Ok(self)
    }

    async fn flush_tx(&mut self) -> Result<()> {
        let send_buf = self.buffer.as_mut();
        loop {
            match self.quiche_conn.send(send_buf) {
                Err(quiche::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((valid_len, send_info)) => {
                    self.socket.send_to(&send_buf[..valid_len], send_info.to).await?;
                    debug!("Sent {} bytes on network {}", valid_len, self.net_id);
                }
            }
        }
    }
}

impl H3Driver {
    fn new(driver: Driver, h3_conn: h3::Connection) -> Self {
        Self {
            driver,
            h3_conn,
            requests: HashMap::new(),
            streams: HashMap::new(),
            buffered_request: None,
        }
    }

    async fn drive(mut self) -> Result<Driver> {
        let _ = self.driver.status_tx.send(Status::H3);
        loop {
            if let Err(e) = self.drive_once().await {
                let session = self.driver.quiche_conn.session().map(<[_]>::to_vec);
                let _ = self.driver.status_tx.send(Status::Dead { session });
                return Err(e);
            }
        }
    }

    async fn drive_once(&mut self) -> Result<()> {
        // We can't call self.driver.drive_once at the same time as
        // self.driver.request_rx.recv() due to ownership
        let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id);
        // If we've buffered a request (due to the connection being full)
        // try to resend that first
        if let Some(request) = self.buffered_request.take() {
            self.handle_request(request)?;
            self.driver.flush_tx().await?;
        }
        select! {
            // Only attempt to enqueue new requests if we have no buffered request and aren't
            // closing. Maybe limit the number of in-flight queries if the handshake
            // still hasn't finished.
            msg = self.driver.request_rx.recv(), if !self.driver.closing && self.buffered_request.is_none() => {
                match msg {
                    Some(request) => self.handle_request(request)?,
                    None => self.shutdown(true, b"DONE").await?,
                }
            },
            // If a quiche timer would fire, call its callback
            _ = timer => {
                info!("H3Driver: Timer expired on network {}", self.driver.net_id);
                self.driver.quiche_conn.on_timeout()
            }
            // If we got packets from our peer, pass them to quiche
            Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
                let local = self.driver.socket.local_addr()?;
                self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from, to: local }).map(|_| ())?;

                debug!("Received {} bytes on network {}", size, self.driver.net_id);
            }
        };

        // Any of the actions in the select could require us to send packets to the peer
        self.driver.flush_tx().await?;

        // Process any incoming HTTP/3 events
        self.flush_h3().await?;

        // If the connection has entered the draining state (the server is closing the
        // connection), tell the status watcher not to use the connection. Also, per the Quiche
        // documentation, the connection should not be dropped until is_closed() returns true.
        // This tokio task will become unowned and get dropped when is_closed() returns true.
        self.driver.handle_draining();

        // If the connection has closed, tear down
        self.driver.handle_closed()
    }

    fn handle_request(&mut self, request: Request) -> Result<()> {
        info!("Handling DNS request on network {}, is_in_early_data={}, stats=[{:?}], peer_streams_left_bidi={}, peer_streams_left_uni={}",
                self.driver.net_id, self.driver.quiche_conn.is_in_early_data(), self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni());
        // If the request has already timed out, don't issue it to the server.
        if let Some(expiry) = request.expiry {
            if BootTime::now() > expiry {
                warn!("Abandoning expired DNS request");
                return Ok(());
            }
        }
        let stream_id =
            // If h3_conn says the stream is blocked, this error is recoverable just by trying
            // again once the stream has made progress. Buffer the request for a later retry.
            match self.h3_conn.send_request(&mut self.driver.quiche_conn, &request.headers, true) {
                Err(h3::Error::StreamBlocked) | Err(h3::Error::TransportError(quiche::Error::StreamLimit)) => {
                    // We only call handle_request on a value that has just come out of
                    // buffered_request, or when buffered_request is empty. This assert just
                    // validates that we don't break that assumption later, as it could result in
                    // requests being dropped on the floor under high load.
                    info!("Stream has become blocked, buffering one request.");
                    assert!(self.buffered_request.is_none());
                    self.buffered_request = Some(request);
                    return Ok(())
                }
                result => result?,
            };
        info!(
            "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}",
            stream_id,
            self.driver.net_id,
            self.driver.quiche_conn.stream_capacity(stream_id)
        );
        self.requests.insert(stream_id, request);
        Ok(())
    }

    async fn recv_body(&mut self, stream_id: u64) -> Result<()> {
        const STREAM_READ_CHUNK: usize = 4096;
        if let Some(stream) = self.streams.get_mut(&stream_id) {
            loop {
                let base_len = stream.data.len();
                stream.data.resize(base_len + STREAM_READ_CHUNK, 0);
                match self.h3_conn.recv_body(
                    &mut self.driver.quiche_conn,
                    stream_id,
                    &mut stream.data[base_len..],
                ) {
                    Err(h3::Error::Done) => {
                        stream.data.truncate(base_len);
                        return Ok(());
                    }
                    Err(e) => {
                        info!("recv_body: Error={:?}", e);
                        stream.data.truncate(base_len);
                        return Err(e.into());
                    }
                    Ok(recvd) => {
                        stream.data.truncate(base_len + recvd);
                        info!(
                            "Got {} bytes of response data from stream ID {} on network {}",
                            recvd, stream_id, self.driver.net_id
                        );
                    }
                }
            }
        } else {
            warn!("Received body for untracked stream ID {}", stream_id);
        }
        Ok(())
    }

    fn discard_datagram(&mut self, _flow_id: u64) -> Result<()> {
        loop {
            match self.h3_conn.recv_dgram(&mut self.driver.quiche_conn, self.driver.buffer.as_mut())
            {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                _ => (),
            }
        }
    }

    async fn flush_h3(&mut self) -> Result<()> {
        loop {
            match self.h3_conn.poll(&mut self.driver.quiche_conn) {
                Err(h3::Error::Done) => return Ok(()),
                Err(e) => return Err(e.into()),
                Ok((stream_id, event)) => self.process_h3_event(stream_id, event).await?,
            }
        }
    }

    async fn process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()> {
        if !self.requests.contains_key(&stream_id) {
            warn!("Received event {:?} for stream_id {} without a request.", event, stream_id);
        }
        match event {
            h3::Event::Headers { list, has_body } => {
                debug!(
                    "process_h3_event: h3::Event::Headers on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                let stream = Stream::new(list);
                if self.streams.insert(stream_id, stream).is_some() {
                    warn!("Re-using stream ID {} before it was completed.", stream_id)
                }
                if !has_body {
                    self.respond(stream_id);
                }
            }
            h3::Event::Data => {
                debug!(
                    "process_h3_event: h3::Event::Data on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.recv_body(stream_id).await?;
            }
            h3::Event::Finished => {
                debug!(
                    "process_h3_event: h3::Event::Finished on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                self.respond(stream_id)
            }
            h3::Event::Reset(e) => {
                warn!(
                    "process_h3_event: h3::Event::Reset with error code {} on stream ID {}, network {}",
                    e, stream_id, self.driver.net_id
                );
                if let Some(stream) = self.streams.get_mut(&stream_id) {
                    stream.error = Some(e)
                }
                self.respond(stream_id);
            }
            h3::Event::Datagram => {
                warn!("Unexpected Datagram received");
                // We don't care if something went wrong with the datagram; we didn't
                // want it anyway.
                let _ = self.discard_datagram(stream_id);
            }
            h3::Event::PriorityUpdate => {
                debug!(
                    "process_h3_event: h3::Event::PriorityUpdate on stream ID {}, network {}",
                    stream_id, self.driver.net_id
                );
                // This tells us that a PRIORITY_UPDATE frame was received, but we currently
                // don't use it, so treating it as a no-op is fine.
            }
            h3::Event::GoAway => self.shutdown(false, b"SERVER GOAWAY").await?,
        }
        Ok(())
    }

    async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> {
        info!(
            "Closing connection {} on network {} with msg {:?}",
            self.driver.quiche_conn.trace_id(),
            self.driver.net_id,
            msg
        );
        self.driver.request_rx.close();
        while self.driver.request_rx.recv().await.is_some() {}
        self.driver.closing = true;
        if send_goaway {
            self.h3_conn.send_goaway(&mut self.driver.quiche_conn, 0)?;
        }
        if self.driver.quiche_conn.close(true, 0, msg).is_err() {
            warn!("Trying to close already closed QUIC connection");
        }
        Ok(())
    }

    fn respond(&mut self, stream_id: u64) {
        match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) {
            (Some(stream), Some(request)) => {
                debug!(
                    "Sending answer back to resolv, stream ID: {}, network {}",
                    stream_id, self.driver.net_id
                );
                // We don't care about the error, because it means the requestor has left.
                let _ = request.response_tx.send(stream);
            }
            (None, _) => warn!("Tried to deliver untracked stream {}", stream_id),
            (_, None) => warn!("Tried to deliver stream {} to untracked requestor", stream_id),
        }
    }
}