1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 //! Defines a backing task to keep a HTTP/3 connection running
17
18 use crate::boot_time;
19 use crate::boot_time::BootTime;
20 use crate::metrics::log_handshake_event_stats;
21 use log::{debug, info, warn};
22 use quiche::h3;
23 use std::collections::HashMap;
24 use std::default::Default;
25 use std::future;
26 use std::io;
27 use std::time::Instant;
28 use thiserror::Error;
29 use tokio::net::UdpSocket;
30 use tokio::select;
31 use tokio::sync::{mpsc, oneshot, watch};
32
33 use super::Status;
34
/// The reason a QUIC handshake was initiated; recorded in `HandshakeInfo::cause`.
// NOTE(review): variant meanings are not established in this file. Presumably `Probe` is a
// validation probe, `Reconnect` re-establishes a dropped connection and `Retry` retries a
// failed attempt — confirm against the code that constructs `HandshakeInfo`.
#[derive(Copy, Clone, Debug)]
pub enum Cause {
    Probe,
    Reconnect,
    Retry,
}
41
/// Outcome of a handshake attempt, passed to `log_handshake_event_stats`.
#[derive(Clone)]
// Only `Success` and `Timeout` are constructed in this file. The other variants are presumably
// produced elsewhere or reserved for future reporting, so dead-code warnings are silenced.
#[allow(dead_code)]
pub enum HandshakeResult {
    Unknown,
    Success,
    Timeout,
    TlsFail,
    ServerUnreachable,
}
51
/// Handshake statistics reported through `log_handshake_event_stats`.
#[derive(Copy, Clone, Debug)]
pub struct HandshakeInfo {
    /// Why the handshake was started.
    pub cause: Cause,
    /// Bytes sent, copied from quiche's connection stats once the connection is established.
    pub sent_bytes: u64,
    /// Bytes received, copied from quiche's connection stats once the connection is established.
    pub recv_bytes: u64,
    /// Microseconds from the start of driving until the handshake succeeded or timed out.
    pub elapsed: u128,
    /// QUIC version; set to `quiche::PROTOCOL_VERSION` when the handshake succeeds.
    pub quic_version: u32,
    // NOTE(review): the following fields are never written in this file; their encoding is
    // defined by whoever constructs `HandshakeInfo` — confirm against the caller.
    pub network_type: u32,
    pub private_dns_mode: u32,
    pub session_hit_checker: bool,
}
63
64 impl std::fmt::Display for HandshakeInfo {
65 #[inline]
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result66 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
67 write!(
68 f,
69 "cause={:?}, sent_bytes={}, recv_bytes={}, quic_version={}, session_hit_checker={}",
70 self.cause,
71 self.sent_bytes,
72 self.recv_bytes,
73 self.quic_version,
74 self.session_hit_checker
75 )
76 }
77 }
78
/// Reasons the connection driver can stop.
#[derive(Error, Debug)]
pub enum Error {
    /// I/O failure on the UDP socket (send, receive or local address lookup).
    #[error("network IO error: {0}")]
    Network(#[from] io::Error),
    /// Error reported by the quiche QUIC layer.
    #[error("QUIC error: {0}")]
    Quic(#[from] quiche::Error),
    /// Error reported by the quiche HTTP/3 layer.
    #[error("HTTP/3 error: {0}")]
    H3(#[from] h3::Error),
    /// Failed to deliver a response `Stream` over an mpsc channel.
    // NOTE(review): nothing in this file produces this variant (responses are delivered over
    // a oneshot channel); it may be used elsewhere — confirm before removing.
    #[error("Response delivery error: {0}")]
    StreamSend(#[from] mpsc::error::SendError<Stream>),
    /// The QUIC connection has closed.
    #[error("Connection closed")]
    Closed,
}

/// Result type used throughout the connection driver.
pub type Result<T> = std::result::Result<T, Error>;
94
#[derive(Debug)]
/// HTTP/3 Request to be sent on the connection
pub struct Request {
    /// Request headers
    pub headers: Vec<h3::Header>,
    /// Expiry time for the request, relative to `CLOCK_BOOTTIME`.
    /// A request already past its expiry when the driver is about to send it is dropped
    /// without being issued to the server.
    pub expiry: Option<BootTime>,
    /// Channel to send the response to. If the request is dropped without a response, this
    /// sender is dropped too.
    pub response_tx: oneshot::Sender<Stream>,
}
105
#[derive(Debug)]
/// HTTP/3 response assembled for a single request stream.
pub struct Stream {
    /// Response headers
    pub headers: Vec<h3::Header>,
    /// Response body; chunks are appended as HTTP/3 data events arrive.
    pub data: Vec<u8>,
    /// Error code from an `h3::Event::Reset` if the stream was reset, otherwise `None`.
    pub error: Option<u64>,
}
116
117 impl Stream {
new(headers: Vec<h3::Header>) -> Self118 fn new(headers: Vec<h3::Header>) -> Self {
119 Self { headers, data: Vec::new(), error: None }
120 }
121 }
122
/// Size of the datagram buffer shared by sends and receives: 64 KiB, which covers any UDP payload.
const MAX_UDP_PACKET_SIZE: usize = 64 * 1024;
124
/// State for driving a QUIC connection, both during the handshake and underneath HTTP/3.
struct Driver {
    /// Incoming DNS requests to be sent on this connection.
    request_rx: mpsc::Receiver<Request>,
    /// Publishes the connection status (e.g. QUIC, H3, Dead) to watchers.
    status_tx: watch::Sender<Status>,
    quiche_conn: quiche::Connection,
    socket: UdpSocket,
    // This buffer is large, boxing it will keep it
    // off the stack and prevent it being copied during
    // moves of the driver.
    // It is used as scratch space for both incoming and outgoing datagrams.
    buffer: Box<[u8; MAX_UDP_PACKET_SIZE]>,
    /// Network ID; used only for logging in this file.
    net_id: u32,
    // Used to check if the connection has entered closing or draining state. A connection can
    // enter closing state if the sender of request_rx's channel has been dropped.
    // Note that we can't check if a receiver is dead without potentially receiving a message, and
    // if we poll on a dead receiver in a select! it will immediately return None. As a result, we
    // need this to gate whether or not to include .recv() in our select!
    closing: bool,
    /// Handshake statistics, filled in and logged when the handshake succeeds or times out.
    handshake_info: HandshakeInfo,
    /// When driving started; used to compute `handshake_info.elapsed`.
    connection_start: Instant,
}
144
/// Drives the HTTP/3 layer on top of an established QUIC [`Driver`].
struct H3Driver {
    driver: Driver,
    // h3_conn sometimes can't "fit" a request in its available windows.
    // This value holds a peeked request in that case, waiting for
    // transmission to become possible.
    buffered_request: Option<Request>,
    h3_conn: h3::Connection,
    /// Requests that have been sent, keyed by their HTTP/3 stream ID.
    requests: HashMap<u64, Request>,
    /// Responses being assembled, keyed by stream ID; an entry is created on the Headers event.
    streams: HashMap<u64, Stream>,
}
155
optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32)156 async fn optional_timeout(timeout: Option<boot_time::Duration>, net_id: u32) {
157 info!("optional_timeout: timeout={:?}, network {}", timeout, net_id);
158 match timeout {
159 Some(timeout) => boot_time::sleep(timeout).await,
160 None => future::pending().await,
161 }
162 }
163
164 /// Creates a future which when polled will handle events related to a HTTP/3 connection.
165 /// The returned error code will explain why the connection terminated.
drive( request_rx: mpsc::Receiver<Request>, status_tx: watch::Sender<Status>, quiche_conn: quiche::Connection, socket: UdpSocket, net_id: u32, handshake_info: HandshakeInfo, ) -> Result<()>166 pub async fn drive(
167 request_rx: mpsc::Receiver<Request>,
168 status_tx: watch::Sender<Status>,
169 quiche_conn: quiche::Connection,
170 socket: UdpSocket,
171 net_id: u32,
172 handshake_info: HandshakeInfo,
173 ) -> Result<()> {
174 Driver::new(request_rx, status_tx, quiche_conn, socket, net_id, handshake_info).drive().await
175 }
176
177 impl Driver {
new( request_rx: mpsc::Receiver<Request>, status_tx: watch::Sender<Status>, quiche_conn: quiche::Connection, socket: UdpSocket, net_id: u32, handshake_info: HandshakeInfo, ) -> Self178 fn new(
179 request_rx: mpsc::Receiver<Request>,
180 status_tx: watch::Sender<Status>,
181 quiche_conn: quiche::Connection,
182 socket: UdpSocket,
183 net_id: u32,
184 handshake_info: HandshakeInfo,
185 ) -> Self {
186 Self {
187 request_rx,
188 status_tx,
189 quiche_conn,
190 socket,
191 buffer: Box::new([0; MAX_UDP_PACKET_SIZE]),
192 net_id,
193 closing: false,
194 handshake_info,
195 connection_start: Instant::now(),
196 }
197 }
198
drive(mut self) -> Result<()>199 async fn drive(mut self) -> Result<()> {
200 self.connection_start = Instant::now();
201 // Prime connection
202 self.flush_tx().await?;
203 loop {
204 self = self.drive_once().await?
205 }
206 }
207
handle_closed(&self) -> Result<()>208 fn handle_closed(&self) -> Result<()> {
209 if self.quiche_conn.is_closed() {
210 // TODO: Also log local_error() once Quiche 0.10.0 is available.
211 info!(
212 "Connection {} closed on network {}, peer_error={:x?}",
213 self.quiche_conn.trace_id(),
214 self.net_id,
215 self.quiche_conn.peer_error()
216 );
217 // We don't care if the receiver has hung up
218 let session = self.quiche_conn.session().map(<[_]>::to_vec);
219 let _ = self.status_tx.send(Status::Dead { session });
220 Err(Error::Closed)
221 } else {
222 Ok(())
223 }
224 }
225
handle_draining(&mut self)226 fn handle_draining(&mut self) {
227 if self.quiche_conn.is_draining() && !self.closing {
228 // TODO: Also log local_error() once Quiche 0.10.0 is available.
229 info!(
230 "Connection {} is draining on network {}, peer_error={:x?}",
231 self.quiche_conn.trace_id(),
232 self.net_id,
233 self.quiche_conn.peer_error()
234 );
235 // We don't care if the receiver has hung up
236 let session = self.quiche_conn.session().map(<[_]>::to_vec);
237 let _ = self.status_tx.send(Status::Dead { session });
238
239 self.request_rx.close();
240 // Drain the pending DNS requests from the queue to make their corresponding future
241 // tasks return some error quickly rather than timeout. However, the DNS requests
242 // that has been sent will still time out.
243 // TODO: re-issue the outstanding DNS requests, such as passing H3Driver.requests
244 // along with Status::Dead to the `Network` that can re-issue the DNS requests.
245 while self.request_rx.try_recv().is_ok() {}
246 self.closing = true;
247 }
248 }
249
drive_once(mut self) -> Result<Self>250 async fn drive_once(mut self) -> Result<Self> {
251 // If the QUIC connection is live, but the HTTP/3 is not, try to bring it up
252 if self.quiche_conn.is_established() || self.quiche_conn.is_in_early_data() {
253 info!(
254 "Connection {} established on network {}",
255 self.quiche_conn.trace_id(),
256 self.net_id
257 );
258 self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
259 // In Stats, sent_bytes implements the way that omits the length of padding data
260 // append to the datagram.
261 self.handshake_info.sent_bytes = self.quiche_conn.stats().sent_bytes;
262 self.handshake_info.recv_bytes = self.quiche_conn.stats().recv_bytes;
263 self.handshake_info.quic_version = quiche::PROTOCOL_VERSION;
264 log_handshake_event_stats(HandshakeResult::Success, self.handshake_info);
265 let h3_config = h3::Config::new()?;
266 let h3_conn = h3::Connection::with_transport(&mut self.quiche_conn, &h3_config)?;
267 self = H3Driver::new(self, h3_conn).drive().await?;
268 let _ = self.status_tx.send(Status::QUIC);
269 }
270
271 let timer = optional_timeout(self.quiche_conn.timeout(), self.net_id);
272 select! {
273 // If a quiche timer would fire, call their callback
274 _ = timer => {
275 info!("Driver: Timer expired on network {}", self.net_id);
276 self.quiche_conn.on_timeout();
277
278 if !self.quiche_conn.is_established() && self.quiche_conn.is_closed() {
279 info!(
280 "Connection {} timeouted on network {}",
281 self.quiche_conn.trace_id(),
282 self.net_id
283 );
284 self.handshake_info.elapsed = self.connection_start.elapsed().as_micros();
285 log_handshake_event_stats(
286 HandshakeResult::Timeout,
287 self.handshake_info,
288 );
289 }
290 }
291 // If we got packets from our peer, pass them to quiche
292 Ok((size, from)) = self.socket.recv_from(self.buffer.as_mut()) => {
293 let local = self.socket.local_addr()?;
294 self.quiche_conn.recv(&mut self.buffer[..size], quiche::RecvInfo { from, to: local })?;
295 debug!("Received {} bytes on network {}", size, self.net_id);
296 }
297 };
298
299 // Any of the actions in the select could require us to send packets to the peer
300 self.flush_tx().await?;
301
302 // If the connection has entered draining state (the server is closing the connection),
303 // tell the status watcher not to use the connection. Besides, per Quiche document,
304 // the connection should not be dropped until is_closed() returns true.
305 // This tokio task will become unowned and get dropped when is_closed() returns true.
306 self.handle_draining();
307
308 // If the connection has closed, tear down
309 self.handle_closed()?;
310
311 Ok(self)
312 }
313
flush_tx(&mut self) -> Result<()>314 async fn flush_tx(&mut self) -> Result<()> {
315 let send_buf = self.buffer.as_mut();
316 loop {
317 match self.quiche_conn.send(send_buf) {
318 Err(quiche::Error::Done) => return Ok(()),
319 Err(e) => return Err(e.into()),
320 Ok((valid_len, send_info)) => {
321 self.socket.send_to(&send_buf[..valid_len], send_info.to).await?;
322 debug!("Sent {} bytes on network {}", valid_len, self.net_id);
323 }
324 }
325 }
326 }
327 }
328
329 impl H3Driver {
new(driver: Driver, h3_conn: h3::Connection) -> Self330 fn new(driver: Driver, h3_conn: h3::Connection) -> Self {
331 Self {
332 driver,
333 h3_conn,
334 requests: HashMap::new(),
335 streams: HashMap::new(),
336 buffered_request: None,
337 }
338 }
339
drive(mut self) -> Result<Driver>340 async fn drive(mut self) -> Result<Driver> {
341 let _ = self.driver.status_tx.send(Status::H3);
342 loop {
343 if let Err(e) = self.drive_once().await {
344 let session = self.driver.quiche_conn.session().map(<[_]>::to_vec);
345 let _ = self.driver.status_tx.send(Status::Dead { session });
346 return Err(e);
347 }
348 }
349 }
350
drive_once(&mut self) -> Result<()>351 async fn drive_once(&mut self) -> Result<()> {
352 // We can't call self.driver.drive_once at the same time as
353 // self.driver.request_rx.recv() due to ownership
354 let timer = optional_timeout(self.driver.quiche_conn.timeout(), self.driver.net_id);
355 // If we've buffered a request (due to the connection being full)
356 // try to resend that first
357 if let Some(request) = self.buffered_request.take() {
358 self.handle_request(request)?;
359 self.driver.flush_tx().await?;
360 }
361 select! {
362 // Only attempt to enqueue new requests if we have no buffered request and aren't
363 // closing. Maybe limit the number of in-flight queries if the handshake
364 // still hasn't finished.
365 msg = self.driver.request_rx.recv(), if !self.driver.closing && self.buffered_request.is_none() => {
366 match msg {
367 Some(request) => self.handle_request(request)?,
368 None => self.shutdown(true, b"DONE").await?,
369 }
370 },
371 // If a quiche timer would fire, call their callback
372 _ = timer => {
373 info!("H3Driver: Timer expired on network {}", self.driver.net_id);
374 self.driver.quiche_conn.on_timeout()
375 }
376 // If we got packets from our peer, pass them to quiche
377 Ok((size, from)) = self.driver.socket.recv_from(self.driver.buffer.as_mut()) => {
378 let local = self.driver.socket.local_addr()?;
379 self.driver.quiche_conn.recv(&mut self.driver.buffer[..size], quiche::RecvInfo { from, to: local }).map(|_| ())?;
380
381 debug!("Received {} bytes on network {}", size, self.driver.net_id);
382 }
383 };
384
385 // Any of the actions in the select could require us to send packets to the peer
386 self.driver.flush_tx().await?;
387
388 // Process any incoming HTTP/3 events
389 self.flush_h3().await?;
390
391 // If the connection has entered draining state (the server is closing the connection),
392 // tell the status watcher not to use the connection. Besides, per Quiche document,
393 // the connection should not be dropped until is_closed() returns true.
394 // This tokio task will become unowned and get dropped when is_closed() returns true.
395 self.driver.handle_draining();
396
397 // If the connection has closed, tear down
398 self.driver.handle_closed()
399 }
400
handle_request(&mut self, request: Request) -> Result<()>401 fn handle_request(&mut self, request: Request) -> Result<()> {
402 info!("Handling DNS request on network {}, is_in_early_data={}, stats=[{:?}], peer_streams_left_bidi={}, peer_streams_left_uni={}",
403 self.driver.net_id, self.driver.quiche_conn.is_in_early_data(), self.driver.quiche_conn.stats(), self.driver.quiche_conn.peer_streams_left_bidi(), self.driver.quiche_conn.peer_streams_left_uni());
404 // If the request has already timed out, don't issue it to the server.
405 if let Some(expiry) = request.expiry {
406 if BootTime::now() > expiry {
407 warn!("Abandoning expired DNS request");
408 return Ok(());
409 }
410 }
411 let stream_id =
412 // If h3_conn says the stream is blocked, this error is recoverable just by trying
413 // again once the stream has made progress. Buffer the request for a later retry.
414 match self.h3_conn.send_request(&mut self.driver.quiche_conn, &request.headers, true) {
415 Err(h3::Error::StreamBlocked) | Err(h3::Error::TransportError(quiche::Error::StreamLimit)) => {
416 // We only call handle_request on a value that has just come out of
417 // buffered_request, or when buffered_request is empty. This assert just
418 // validates that we don't break that assumption later, as it could result in
419 // requests being dropped on the floor under high load.
420 info!("Stream has become blocked, buffering one request.");
421 assert!(self.buffered_request.is_none());
422 self.buffered_request = Some(request);
423 return Ok(())
424 }
425 result => result?,
426 };
427 info!(
428 "Handled DNS request: stream ID {}, network {}, stream_capacity={:?}",
429 stream_id,
430 self.driver.net_id,
431 self.driver.quiche_conn.stream_capacity(stream_id)
432 );
433 self.requests.insert(stream_id, request);
434 Ok(())
435 }
436
recv_body(&mut self, stream_id: u64) -> Result<()>437 async fn recv_body(&mut self, stream_id: u64) -> Result<()> {
438 const STREAM_READ_CHUNK: usize = 4096;
439 if let Some(stream) = self.streams.get_mut(&stream_id) {
440 loop {
441 let base_len = stream.data.len();
442 stream.data.resize(base_len + STREAM_READ_CHUNK, 0);
443 match self.h3_conn.recv_body(
444 &mut self.driver.quiche_conn,
445 stream_id,
446 &mut stream.data[base_len..],
447 ) {
448 Err(h3::Error::Done) => {
449 stream.data.truncate(base_len);
450 return Ok(());
451 }
452 Err(e) => {
453 info!("recv_body: Error={:?}", e);
454 stream.data.truncate(base_len);
455 return Err(e.into());
456 }
457 Ok(recvd) => {
458 stream.data.truncate(base_len + recvd);
459 info!(
460 "Got {} bytes of response data from stream ID {} on network {}",
461 recvd, stream_id, self.driver.net_id
462 );
463 }
464 }
465 }
466 } else {
467 warn!("Received body for untracked stream ID {}", stream_id);
468 }
469 Ok(())
470 }
471
discard_datagram(&mut self, _flow_id: u64) -> Result<()>472 fn discard_datagram(&mut self, _flow_id: u64) -> Result<()> {
473 loop {
474 match self.h3_conn.recv_dgram(&mut self.driver.quiche_conn, self.driver.buffer.as_mut())
475 {
476 Err(h3::Error::Done) => return Ok(()),
477 Err(e) => return Err(e.into()),
478 _ => (),
479 }
480 }
481 }
482
flush_h3(&mut self) -> Result<()>483 async fn flush_h3(&mut self) -> Result<()> {
484 loop {
485 match self.h3_conn.poll(&mut self.driver.quiche_conn) {
486 Err(h3::Error::Done) => return Ok(()),
487 Err(e) => return Err(e.into()),
488 Ok((stream_id, event)) => self.process_h3_event(stream_id, event).await?,
489 }
490 }
491 }
492
process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()>493 async fn process_h3_event(&mut self, stream_id: u64, event: h3::Event) -> Result<()> {
494 if !self.requests.contains_key(&stream_id) {
495 warn!("Received event {:?} for stream_id {} without a request.", event, stream_id);
496 }
497 match event {
498 h3::Event::Headers { list, has_body } => {
499 debug!(
500 "process_h3_event: h3::Event::Headers on stream ID {}, network {}",
501 stream_id, self.driver.net_id
502 );
503 let stream = Stream::new(list);
504 if self.streams.insert(stream_id, stream).is_some() {
505 warn!("Re-using stream ID {} before it was completed.", stream_id)
506 }
507 if !has_body {
508 self.respond(stream_id);
509 }
510 }
511 h3::Event::Data => {
512 debug!(
513 "process_h3_event: h3::Event::Data on stream ID {}, network {}",
514 stream_id, self.driver.net_id
515 );
516 self.recv_body(stream_id).await?;
517 }
518 h3::Event::Finished => {
519 debug!(
520 "process_h3_event: h3::Event::Finished on stream ID {}, network {}",
521 stream_id, self.driver.net_id
522 );
523 self.respond(stream_id)
524 }
525 h3::Event::Reset(e) => {
526 warn!(
527 "process_h3_event: h3::Event::Reset with error code {} on stream ID {}, network {}",
528 e, stream_id, self.driver.net_id
529 );
530 if let Some(stream) = self.streams.get_mut(&stream_id) {
531 stream.error = Some(e)
532 }
533 self.respond(stream_id);
534 }
535 h3::Event::Datagram => {
536 warn!("Unexpected Datagram received");
537 // We don't care if something went wrong with the datagram, we didn't
538 // want it anyways.
539 let _ = self.discard_datagram(stream_id);
540 }
541 h3::Event::PriorityUpdate => {
542 debug!(
543 "process_h3_event: h3::Event::PriorityUpdate on stream ID {}, network {}",
544 stream_id, self.driver.net_id
545 );
546 // It tells us that PRIORITY_UPDATE frame is received, but we are not
547 // using it in our code currently. No-op should be fine.
548 }
549 h3::Event::GoAway => self.shutdown(false, b"SERVER GOAWAY").await?,
550 }
551 Ok(())
552 }
553
shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()>554 async fn shutdown(&mut self, send_goaway: bool, msg: &[u8]) -> Result<()> {
555 info!(
556 "Closing connection {} on network {} with msg {:?}",
557 self.driver.quiche_conn.trace_id(),
558 self.driver.net_id,
559 msg
560 );
561 self.driver.request_rx.close();
562 while self.driver.request_rx.recv().await.is_some() {}
563 self.driver.closing = true;
564 if send_goaway {
565 self.h3_conn.send_goaway(&mut self.driver.quiche_conn, 0)?;
566 }
567 if self.driver.quiche_conn.close(true, 0, msg).is_err() {
568 warn!("Trying to close already closed QUIC connection");
569 }
570 Ok(())
571 }
572
respond(&mut self, stream_id: u64)573 fn respond(&mut self, stream_id: u64) {
574 match (self.streams.remove(&stream_id), self.requests.remove(&stream_id)) {
575 (Some(stream), Some(request)) => {
576 debug!(
577 "Sending answer back to resolv, stream ID: {}, network {}",
578 stream_id, self.driver.net_id
579 );
580 // We don't care about the error, because it means the requestor has left.
581 let _ = request.response_tx.send(stream);
582 }
583 (None, _) => warn!("Tried to deliver untracked stream {}", stream_id),
584 (_, None) => warn!("Tried to deliver stream {} to untracked requestor", stream_id),
585 }
586 }
587 }
588