//! This module manages LE connection requests and active LE connections.
//! In particular, it de-duplicates connection requests, avoids duplicate
//! connections to the same device (even with different RPAs), and retries
//! failed connections.
5
6 use std::{
7 cell::RefCell, collections::HashSet, fmt::Debug, future::Future, hash::Hash, ops::Deref,
8 time::Duration,
9 };
10
11 use crate::{
12 core::{
13 address::AddressWithType,
14 shared_box::{SharedBox, WeakBox, WeakBoxRef},
15 },
16 gatt::ids::ServerId,
17 };
18
19 use self::{
20 acceptlist_manager::{determine_target_state, LeAcceptlistManager},
21 attempt_manager::{ConnectionAttempts, ConnectionMode},
22 le_manager::{ErrorCode, InactiveLeAclManager, LeAclManagerConnectionCallbacks},
23 };
24
25 mod acceptlist_manager;
26 mod attempt_manager;
27 mod ffi;
28 pub mod le_manager;
29 mod mocks;
30
31 pub use ffi::{register_callbacks, LeAclManagerImpl, LeAclManagerShim};
32 use log::info;
33 use scopeguard::ScopeGuard;
34 use tokio::{task::spawn_local, time::timeout};
35
/// Reasons a request to begin a connection attempt can be rejected.
#[derive(Debug, Eq, PartialEq)]
pub enum CreateConnectionFailure {
    /// The requesting client already has an attempt of the same type
    /// pending towards the same address.
    ConnectionAlreadyPending,
}
43
44 /// Errors returned if a connection successfully starts but fails afterwards.
45 #[derive(Debug, PartialEq, Eq)]
46 pub enum ConnectionFailure {
47 /// The connection attempt was cancelled
48 Cancelled,
49 /// The connection completed but with an HCI error code
50 Error(ErrorCode),
51 }
52
/// Reasons a client's request to cancel its connection attempt can fail.
#[derive(Debug, Eq, PartialEq)]
pub enum CancelConnectFailure {
    /// There is no such connection attempt to cancel
    ConnectionNotPending,
}
59
60 /// Unique identifiers for a client of the connection manager
61 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
62 pub enum ConnectionManagerClient {
63 /// A GATT client with given client ID
64 GattClient(u8),
65 /// A GATT server with given server ID
66 GattServer(ServerId),
67 }
68
69 /// An active connection
70 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
71 pub struct LeConnection {
72 /// The address of the peer device, as reported in the connection complete event
73 /// This is guaranteed to be unique across active connections, so we can implement
74 /// PartialEq/Eq on this.
75 pub remote_address: AddressWithType,
76 }
77
/// Responsible for managing the initiator state and the list of
/// devices on the filter accept list.
///
/// All mutable state lives in a single [`RefCell`], so every method takes `&self`
/// and borrows the state for the duration of the call.
#[derive(Debug)]
pub struct ConnectionManager {
    /// Interior-mutable bookkeeping shared by all public entry points and event handlers
    state: RefCell<ConnectionManagerState>,
}
84
/// The mutable bookkeeping held inside [`ConnectionManager`]'s `RefCell`.
#[derive(Debug)]
struct ConnectionManagerState {
    /// All pending connection attempts (unresolved direct + all background)
    attempts: ConnectionAttempts,
    /// The addresses we are currently connected to
    current_connections: HashSet<AddressWithType>,
    /// Tracks the state of the LE connect list, and updates it to drive to a
    /// specified target state
    acceptlist_manager: LeAcceptlistManager,
}
95
/// Adapter implementing [`LeAclManagerConnectionCallbacks`] that forwards connection
/// events to the [`ConnectionManager`] it holds through a weak reference.
struct ConnectionManagerCallbackHandler(WeakBox<ConnectionManager>);
97
/// Time limit applied to a direct connection attempt before it is dropped.
///
/// The value of 29 seconds is an ugly hack to avoid fighting with the le_impl timeout,
/// until that timeout is removed.
const DIRECT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(29);
101
102 impl LeAclManagerConnectionCallbacks for ConnectionManagerCallbackHandler {
on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>)103 fn on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>) {
104 self.with_manager(|manager| manager.on_le_connect(address, result))
105 }
106
on_disconnect(&self, address: AddressWithType)107 fn on_disconnect(&self, address: AddressWithType) {
108 self.with_manager(|manager| manager.on_disconnect(address))
109 }
110 }
111
112 impl ConnectionManagerCallbackHandler {
with_manager(&self, f: impl FnOnce(&ConnectionManager))113 fn with_manager(&self, f: impl FnOnce(&ConnectionManager)) {
114 self.0.with(|manager| f(manager.expect("got connection event after stack died").deref()))
115 }
116 }
117
118 impl ConnectionManager {
119 /// Constructor
new(le_manager: impl InactiveLeAclManager) -> SharedBox<Self>120 pub fn new(le_manager: impl InactiveLeAclManager) -> SharedBox<Self> {
121 SharedBox::new_cyclic(|weak| Self {
122 state: RefCell::new(ConnectionManagerState {
123 attempts: ConnectionAttempts::new(),
124 current_connections: HashSet::new(),
125 acceptlist_manager: LeAcceptlistManager::new(
126 le_manager.register_callbacks(ConnectionManagerCallbackHandler(weak)),
127 ),
128 }),
129 })
130 }
131 }
132
133 /// Make the state of the LeAcceptlistManager consistent with the attempts tracked in ConnectionAttempts
reconcile_state(state: &mut ConnectionManagerState)134 fn reconcile_state(state: &mut ConnectionManagerState) {
135 state
136 .acceptlist_manager
137 .drive_to_state(determine_target_state(&state.attempts.active_attempts()));
138 }
139
140 impl WeakBoxRef<'_, ConnectionManager> {
141 /// Start a direct connection to a peer device from a specified client. If the peer
142 /// is connected, immediately resolve the attempt.
start_direct_connection( &self, client: ConnectionManagerClient, address: AddressWithType, ) -> Result<(), CreateConnectionFailure>143 pub fn start_direct_connection(
144 &self,
145 client: ConnectionManagerClient,
146 address: AddressWithType,
147 ) -> Result<(), CreateConnectionFailure> {
148 spawn_local(timeout(DIRECT_CONNECTION_TIMEOUT, self.direct_connection(client, address)?));
149 Ok(())
150 }
151
152 /// Start a direct connection to a peer device from a specified client.
153 ///
154 /// # Cancellation Safety
155 /// If this future is dropped, the connection attempt will be cancelled. It can also be cancelled
156 /// from the separate API ConnectionManager#cancel_connection.
direct_connection( &self, client: ConnectionManagerClient, address: AddressWithType, ) -> Result< impl Future<Output = Result<LeConnection, ConnectionFailure>>, CreateConnectionFailure, >157 fn direct_connection(
158 &self,
159 client: ConnectionManagerClient,
160 address: AddressWithType,
161 ) -> Result<
162 impl Future<Output = Result<LeConnection, ConnectionFailure>>,
163 CreateConnectionFailure,
164 > {
165 let mut state = self.state.borrow_mut();
166
167 // if connected, this is a no-op
168 let attempt_and_guard = if state.current_connections.contains(&address) {
169 None
170 } else {
171 let pending_attempt = state.attempts.register_direct_connection(client, address)?;
172 let attempt_id = pending_attempt.id;
173 reconcile_state(&mut state);
174 Some((
175 pending_attempt,
176 scopeguard::guard(self.downgrade(), move |this| {
177 // remove the attempt after we are cancelled
178 this.with(|this| {
179 this.map(|this| {
180 info!("Cancelling attempt {attempt_id:?}");
181 let mut state = this.state.borrow_mut();
182 state.attempts.cancel_attempt_with_id(attempt_id);
183 reconcile_state(&mut state);
184 })
185 });
186 }),
187 ))
188 };
189
190 Ok(async move {
191 let Some((attempt, guard)) = attempt_and_guard else {
192 // if we did not make an attempt, the connection must be ready
193 return Ok(LeConnection { remote_address: address });
194 };
195 // otherwise, wait until the attempt resolves
196 let ret = attempt.await;
197 // defuse scopeguard (no need to cancel now)
198 ScopeGuard::into_inner(guard);
199 ret
200 })
201 }
202 }
203
204 impl ConnectionManager {
205 /// Start a background connection to a peer device with given parameters from a specified client.
add_background_connection( &self, client: ConnectionManagerClient, address: AddressWithType, ) -> Result<(), CreateConnectionFailure>206 pub fn add_background_connection(
207 &self,
208 client: ConnectionManagerClient,
209 address: AddressWithType,
210 ) -> Result<(), CreateConnectionFailure> {
211 let mut state = self.state.borrow_mut();
212 state.attempts.register_background_connection(client, address)?;
213 reconcile_state(&mut state);
214 Ok(())
215 }
216
217 /// Cancel connection attempt from this client to the specified address with the specified mode.
cancel_connection( &self, client: ConnectionManagerClient, address: AddressWithType, mode: ConnectionMode, ) -> Result<(), CancelConnectFailure>218 pub fn cancel_connection(
219 &self,
220 client: ConnectionManagerClient,
221 address: AddressWithType,
222 mode: ConnectionMode,
223 ) -> Result<(), CancelConnectFailure> {
224 let mut state = self.state.borrow_mut();
225 state.attempts.cancel_attempt(client, address, mode)?;
226 reconcile_state(&mut state);
227 Ok(())
228 }
229
230 /// Cancel all connection attempts to this address
cancel_unconditionally(&self, address: AddressWithType)231 pub fn cancel_unconditionally(&self, address: AddressWithType) {
232 let mut state = self.state.borrow_mut();
233 state.attempts.remove_unconditionally(address);
234 reconcile_state(&mut state);
235 }
236
237 /// Cancel all connection attempts from this client
remove_client(&self, client: ConnectionManagerClient)238 pub fn remove_client(&self, client: ConnectionManagerClient) {
239 let mut state = self.state.borrow_mut();
240 state.attempts.remove_client(client);
241 reconcile_state(&mut state);
242 }
243
on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>)244 fn on_le_connect(&self, address: AddressWithType, result: Result<LeConnection, ErrorCode>) {
245 let mut state = self.state.borrow_mut();
246 // record this connection while it exists
247 state.current_connections.insert(address);
248 // all completed connections remove the address from the direct list
249 state.acceptlist_manager.on_connect_complete(address);
250 // invoke any pending callbacks, update set of attempts
251 state.attempts.process_connection(address, result);
252 // update the acceptlist
253 reconcile_state(&mut state);
254 }
255
on_disconnect(&self, address: AddressWithType)256 fn on_disconnect(&self, address: AddressWithType) {
257 let mut state = self.state.borrow_mut();
258 state.current_connections.remove(&address);
259 reconcile_state(&mut state);
260 }
261 }
262
#[cfg(test)]
mod test {
    use crate::{core::address::AddressType, utils::task::block_on_locally};

    use super::{mocks::mock_le_manager::MockLeAclManager, *};

    const CLIENT_1: ConnectionManagerClient = ConnectionManagerClient::GattClient(1);
    const CLIENT_2: ConnectionManagerClient = ConnectionManagerClient::GattClient(2);

    const ADDRESS_1: AddressWithType =
        AddressWithType { address: [1, 2, 3, 4, 5, 6], address_type: AddressType::Public };

    const ERROR: ErrorCode = ErrorCode(1);

    /// Create a fresh mock LE ACL manager and a connection manager built on a clone of it.
    fn setup() -> (MockLeAclManager, SharedBox<ConnectionManager>) {
        let mock_le_manager = MockLeAclManager::new();
        let connection_manager = ConnectionManager::new(mock_le_manager.clone());
        (mock_le_manager, connection_manager)
    }

    #[test]
    fn test_single_direct_connection() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: request a direct connection
            manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();

            // assert: a direct connection to exactly that address is pending
            assert_eq!(le_manager.current_connection_mode(), Some(ConnectionMode::Direct));
            assert_eq!(le_manager.current_acceptlist().len(), 1);
            assert!(le_manager.current_acceptlist().contains(&ADDRESS_1));
        });
    }

    #[test]
    fn test_failed_direct_connection() {
        block_on_locally(async {
            // arrange: a direct connection is pending
            let (le_manager, manager) = setup();
            manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();

            // act: the attempt completes with an error
            le_manager.on_le_connect(ADDRESS_1, ERROR);

            // assert: we are no longer trying to connect
            assert_eq!(le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_single_background_connection() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: request a background connection
            manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();

            // assert: a background connection to exactly that address is pending
            assert_eq!(le_manager.current_connection_mode(), Some(ConnectionMode::Background));
            assert_eq!(le_manager.current_acceptlist().len(), 1);
            assert!(le_manager.current_acceptlist().contains(&ADDRESS_1));
        });
    }

    #[test]
    fn test_resolved_connection() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: request a direct connection, which then succeeds
            manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
            le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);

            // assert: nothing is pending any more
            assert_eq!(le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_resolved_background_connection() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: request a background connection, which then succeeds
            manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
            le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);

            // assert: nothing is pending any more
            assert_eq!(le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_resolved_direct_connection_after_disconnect() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: request a direct connection, which succeeds and later disconnects
            manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
            le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
            le_manager.on_le_disconnect(ADDRESS_1);

            // assert: nothing is pending any more
            assert_eq!(le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_resolved_background_connection_after_disconnect() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: request a background connection, which succeeds and later disconnects
            manager.as_ref().add_background_connection(CLIENT_1, ADDRESS_1).unwrap();
            le_manager.on_le_connect(ADDRESS_1, ErrorCode::SUCCESS);
            le_manager.on_le_disconnect(ADDRESS_1);

            // assert: the background connection is being attempted again
            assert_eq!(le_manager.current_connection_mode(), Some(ConnectionMode::Background));
        });
    }

    #[test]
    fn test_direct_connection_timeout() {
        block_on_locally(async {
            // arrange: a direct connection is pending
            let (le_manager, manager) = setup();
            manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();

            // act: wait out the timeout
            tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT).await;
            // advance one more tick so that every timer has fired
            // (time is faked here, so this is not a race condition)
            tokio::time::sleep(Duration::from_millis(1)).await;

            // assert: the attempt was cancelled and we are idle again
            assert_eq!(le_manager.current_connection_mode(), None);
        });
    }

    #[test]
    fn test_stacked_direct_connections_timeout() {
        block_on_locally(async {
            // arrange
            let (le_manager, manager) = setup();

            // act: start a first direct connection
            manager.as_ref().start_direct_connection(CLIENT_1, ADDRESS_1).unwrap();
            tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT * 3 / 4).await;
            // act: some time later, start a second one
            manager.as_ref().start_direct_connection(CLIENT_2, ADDRESS_1).unwrap();
            // act: wait long enough for the first (but not the second) to time out
            tokio::time::sleep(DIRECT_CONNECTION_TIMEOUT * 3 / 4).await;

            // assert: a direct connection is still in progress
            assert_eq!(le_manager.current_connection_mode(), Some(ConnectionMode::Direct));
        });
    }
}
432