1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
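//! Provides the backing task that implements a network: it owns the connection to a server,
//! serves DNS queries over it, and runs health probes against it.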
18
19 use crate::boot_time::{timeout, BootTime, Duration};
20 use crate::config::Config;
21 use crate::connection::driver::Cause;
22 use crate::connection::Connection;
23 use crate::dispatcher::{QueryError, Response};
24 use crate::encoding;
25 use anyhow::{anyhow, bail, Result};
26 use log::{debug, info};
27 use std::sync::Arc;
28 use tokio::sync::{mpsc, watch};
29 use tokio::task;
30
31 use super::{Query, ServerInfo, SocketTagger, ValidationReporter};
32
/// State owned by the task that serves a single network.
///
/// Created by [`Driver::new`] and run by [`Driver::drive`].
pub struct Driver {
    /// Server this driver connects to.
    info: ServerInfo,
    /// Configuration obtained (via `Config::take`) each time a connection is built.
    config: Config,
    /// Current connection; replaced whenever the driver reconnects.
    connection: Connection,
    /// Source of the queries and probes this driver handles.
    command_rx: mpsc::Receiver<Command>,
    /// Publishes the network's current [`Status`].
    status_tx: watch::Sender<Status>,
    /// Callback told whether the network validated after a probe.
    validation: ValidationReporter,
    /// Socket tagger forwarded to every connection attempt.
    tag_socket: SocketTagger,
}
42
43 #[derive(Debug)]
44 /// Requests the network can handle
45 pub enum Command {
46 /// Send a DNS query to the network
47 Query(Query),
48 /// Run a probe to check the health of the network. Argument is timeout.
49 Probe(Duration),
50 }
51
52 #[derive(Clone, Debug)]
53 /// Current Network Status
54 ///
55 /// (Unprobed or Failed) can go to (Live or Failed) via Probe.
56 /// Currently, there is no way to go from Live to Failed - probing a live network will short-circuit to returning valid, and query failures do not declare the network failed.
57 pub enum Status {
58 /// Network has not been probed, it may or may not work
59 Unprobed,
60 /// Network is believed to be working
61 Live,
62 /// Network is broken, reason as argument
63 Failed(Arc<anyhow::Error>),
64 }
65
66 impl Status {
is_live(&self) -> bool67 pub fn is_live(&self) -> bool {
68 matches!(self, Self::Live)
69 }
is_failed(&self) -> bool70 pub fn is_failed(&self) -> bool {
71 matches!(self, Self::Failed(_))
72 }
73 }
74
build_connection( info: &ServerInfo, tag_socket: &SocketTagger, config: &mut Config, session: Option<Vec<u8>>, cause: Cause, ) -> Result<Connection>75 async fn build_connection(
76 info: &ServerInfo,
77 tag_socket: &SocketTagger,
78 config: &mut Config,
79 session: Option<Vec<u8>>,
80 cause: Cause,
81 ) -> Result<Connection> {
82 use std::ops::DerefMut;
83 Ok(Connection::new(info, tag_socket, config.take().await.deref_mut(), session, cause).await?)
84 }
85
86 impl Driver {
87 const MAX_BUFFERED_COMMANDS: usize = 50;
88
new( info: ServerInfo, mut config: Config, validation: ValidationReporter, tag_socket: SocketTagger, ) -> Result<(Self, mpsc::Sender<Command>, watch::Receiver<Status>)>89 pub async fn new(
90 info: ServerInfo,
91 mut config: Config,
92 validation: ValidationReporter,
93 tag_socket: SocketTagger,
94 ) -> Result<(Self, mpsc::Sender<Command>, watch::Receiver<Status>)> {
95 let (command_tx, command_rx) = mpsc::channel(Self::MAX_BUFFERED_COMMANDS);
96 let (status_tx, status_rx) = watch::channel(Status::Unprobed);
97 let connection =
98 build_connection(&info, &tag_socket, &mut config, None, Cause::Probe).await?;
99 Ok((
100 Self { info, config, connection, status_tx, command_rx, validation, tag_socket },
101 command_tx,
102 status_rx,
103 ))
104 }
105
drive(mut self) -> Result<()>106 pub async fn drive(mut self) -> Result<()> {
107 while let Some(cmd) = self.command_rx.recv().await {
108 match cmd {
109 Command::Probe(duration) => {
110 if let Err(e) = self.probe(duration).await {
111 self.status_tx.send(Status::Failed(Arc::new(e)))?
112 }
113 }
114 Command::Query(query) => {
115 if let Err(e) = self.send_query(query).await {
116 info!("Unable to send query: {:?}", e)
117 }
118 }
119 };
120 }
121 Ok(())
122 }
123
probe(&mut self, probe_timeout: Duration) -> Result<()>124 async fn probe(&mut self, probe_timeout: Duration) -> Result<()> {
125 if self.status_tx.borrow().is_failed() {
126 debug!("Network is currently failed, reconnecting");
127 // If our network is currently failed, it may be due to issues with the connection.
128 // Re-establish before re-probing
129 self.connection = build_connection(
130 &self.info,
131 &self.tag_socket,
132 &mut self.config,
133 None,
134 Cause::Retry,
135 )
136 .await?;
137 self.status_tx.send(Status::Unprobed)?;
138 }
139 if self.status_tx.borrow().is_live() {
140 // If we're already validated, short circuit
141 (self.validation)(&self.info, true).await;
142 return Ok(());
143 }
144 self.force_probe(probe_timeout).await
145 }
146
force_probe(&mut self, probe_timeout: Duration) -> Result<()>147 async fn force_probe(&mut self, probe_timeout: Duration) -> Result<()> {
148 info!("Sending probe to server {} on Network {}", self.info.peer_addr, self.info.net_id);
149 let probe = encoding::probe_query()?;
150 let dns_request = encoding::dns_request(&probe, &self.info.url)?;
151 let expiry = BootTime::now().checked_add(probe_timeout);
152 let request = async {
153 match self.connection.query(dns_request, expiry).await {
154 Err(e) => self.status_tx.send(Status::Failed(Arc::new(anyhow!(e)))),
155 Ok(rsp) => {
156 if let Some(_stream) = rsp.await {
157 // TODO verify stream contents
158 self.status_tx.send(Status::Live)
159 } else {
160 self.status_tx.send(Status::Failed(Arc::new(anyhow!("Empty response"))))
161 }
162 }
163 }
164 };
165 match timeout(probe_timeout, request).await {
166 // Timed out
167 Err(time) => self.status_tx.send(Status::Failed(Arc::new(anyhow!(
168 "Probe timed out after {:?} (timeout={:?})",
169 time,
170 probe_timeout
171 )))),
172 // Query completed
173 Ok(r) => r,
174 }?;
175 let valid = self.status_tx.borrow().is_live();
176 (self.validation)(&self.info, valid).await;
177 Ok(())
178 }
179
send_query(&mut self, query: Query) -> Result<()>180 async fn send_query(&mut self, query: Query) -> Result<()> {
181 // If the associated receiver has been closed, meaning that the request has already
182 // timed out, just drop it. This check helps drain the channel quickly in the case
183 // where the network is stalled.
184 if query.response.is_closed() {
185 bail!("Abandoning expired DNS request")
186 }
187
188 if !self.connection.wait_for_live().await {
189 let session =
190 if self.info.use_session_resumption { self.connection.session() } else { None };
191 // Try reconnecting
192 self.connection = build_connection(
193 &self.info,
194 &self.tag_socket,
195 &mut self.config,
196 session,
197 Cause::Reconnect,
198 )
199 .await?;
200 }
201 let request = encoding::dns_request(&query.query, &self.info.url)?;
202 let stream_fut = self.connection.query(request, Some(query.expiry)).await?;
203 task::spawn(async move {
204 let stream = match stream_fut.await {
205 Some(stream) => stream,
206 None => {
207 info!("Connection died while processing request");
208 // We don't care if the response is gone
209 let _ =
210 query.response.send(Response::Error { error: QueryError::ConnectionError });
211 return;
212 }
213 };
214 // We don't care if the response is gone.
215 let _ = if let Some(err) = stream.error {
216 query.response.send(Response::Error { error: QueryError::Reset(err) })
217 } else {
218 query.response.send(Response::Success { answer: stream.data })
219 };
220 });
221 Ok(())
222 }
223 }
224