1 use std::time::Duration; 2 3 use log::{trace, warn}; 4 use tokio::{ 5 sync::mpsc::{self, error::TrySendError}, 6 time::timeout, 7 }; 8 9 use crate::{ 10 gatt::ids::AttHandle, 11 packets::{AttAttributeDataChild, AttChild, AttHandleValueIndicationBuilder, Serializable}, 12 utils::packet::build_att_data, 13 }; 14 15 use super::{ 16 att_database::{AttDatabase, StableAttDatabase}, 17 att_server_bearer::SendError, 18 }; 19 20 #[derive(Debug)] 21 /// Errors that can occur while sending an indication 22 pub enum IndicationError { 23 /// The provided data exceeds the MTU limitations 24 DataExceedsMtu { 25 /// The actual max payload size permitted 26 /// (ATT_MTU - 3, since 3 bytes are needed for the header) 27 mtu: usize, 28 }, 29 /// The indicated attribute handle does not exist 30 AttributeNotFound, 31 /// The indicated attribute does not support indications 32 IndicationsNotSupported, 33 /// Failed to send the outgoing indication packet 34 SendError(SendError), 35 /// Did not receive a confirmation in the given time (30s) 36 ConfirmationTimeout, 37 /// The connection was dropped while waiting for a confirmation 38 ConnectionDroppedWhileWaitingForConfirmation, 39 } 40 41 pub struct IndicationHandler<T> { 42 db: T, 43 pending_confirmation: mpsc::Receiver<()>, 44 } 45 46 impl<T: AttDatabase> IndicationHandler<T> { new(db: T) -> (Self, ConfirmationWatcher)47 pub fn new(db: T) -> (Self, ConfirmationWatcher) { 48 let (tx, rx) = mpsc::channel(1); 49 (Self { db, pending_confirmation: rx }, ConfirmationWatcher(tx)) 50 } 51 send( &mut self, handle: AttHandle, data: AttAttributeDataChild, mtu: usize, send_packet: impl FnOnce(AttChild) -> Result<(), SendError>, ) -> Result<(), IndicationError>52 pub async fn send( 53 &mut self, 54 handle: AttHandle, 55 data: AttAttributeDataChild, 56 mtu: usize, 57 send_packet: impl FnOnce(AttChild) -> Result<(), SendError>, 58 ) -> Result<(), IndicationError> { 59 let data_size = data 60 .size_in_bits() 61 .map_err(SendError::SerializeError) 62 .map_err(IndicationError::SendError)?; 63 // As per Core Spec 5.3 Vol 3F 3.4.7.2, the indicated value must be at most 64 // ATT_MTU-3 65 if data_size > (mtu - 3) * 8 { 66 return Err(IndicationError::DataExceedsMtu { mtu: mtu - 3 }); 67 } 68 69 if !self 70 .db 71 .snapshot() 72 .find_attribute(handle) 73 .ok_or(IndicationError::AttributeNotFound)? 74 .permissions 75 .indicate() 76 { 77 warn!("cannot send indication for {handle:?} since it does not support indications"); 78 return Err(IndicationError::IndicationsNotSupported); 79 } 80 81 // flushing any confirmations that arrived before we sent the next indication 82 let _ = self.pending_confirmation.try_recv(); 83 84 send_packet( 85 AttHandleValueIndicationBuilder { handle: handle.into(), value: build_att_data(data) } 86 .into(), 87 ) 88 .map_err(IndicationError::SendError)?; 89 90 match timeout(Duration::from_secs(30), self.pending_confirmation.recv()).await { 91 Ok(Some(())) => Ok(()), 92 Ok(None) => { 93 warn!("connection dropped while waiting for indication confirmation"); 94 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 95 } 96 Err(_) => { 97 warn!("Sent indication but received no response for 30s"); 98 Err(IndicationError::ConfirmationTimeout) 99 } 100 } 101 } 102 } 103 104 pub struct ConfirmationWatcher(mpsc::Sender<()>); 105 106 impl ConfirmationWatcher { on_confirmation(&self)107 pub fn on_confirmation(&self) { 108 match self.0.try_send(()) { 109 Ok(_) => { 110 trace!("Got AttHandleValueConfirmation") 111 } 112 Err(TrySendError::Full(_)) => { 113 warn!("Got a second AttHandleValueConfirmation before the first was processed, dropping it") 114 } 115 Err(TrySendError::Closed(_)) => { 116 warn!("Got an AttHandleValueConfirmation while no indications are outstanding, dropping it") 117 } 118 } 119 } 120 } 121 122 #[cfg(test)] 123 mod test { 124 use tokio::{sync::oneshot, task::spawn_local, time::Instant}; 125 126 use crate::{ 127 core::uuid::Uuid, 128 gatt::server::{ 129 att_database::AttAttribute, gatt_database::AttPermissions, 130 test::test_att_db::TestAttDatabase, 131 }, 132 utils::task::block_on_locally, 133 }; 134 135 use super::*; 136 137 const HANDLE: AttHandle = AttHandle(1); 138 const NONEXISTENT_HANDLE: AttHandle = AttHandle(2); 139 const NON_INDICATE_HANDLE: AttHandle = AttHandle(3); 140 const MTU: usize = 32; 141 get_data() -> AttAttributeDataChild142 fn get_data() -> AttAttributeDataChild { 143 AttAttributeDataChild::RawData([1, 2, 3].into()) 144 } 145 get_att_database() -> TestAttDatabase146 fn get_att_database() -> TestAttDatabase { 147 TestAttDatabase::new(vec![ 148 ( 149 AttAttribute { 150 handle: HANDLE, 151 type_: Uuid::new(123), 152 permissions: AttPermissions::INDICATE, 153 }, 154 vec![], 155 ), 156 ( 157 AttAttribute { 158 handle: NON_INDICATE_HANDLE, 159 type_: Uuid::new(123), 160 permissions: AttPermissions::READABLE, 161 }, 162 vec![], 163 ), 164 ]) 165 } 166 167 #[test] test_indication_sent()168 fn test_indication_sent() { 169 block_on_locally(async move { 170 // arrange 171 let (mut indication_handler, _confirmation_watcher) = 172 IndicationHandler::new(get_att_database()); 173 let (tx, rx) = oneshot::channel(); 174 175 // act: send an indication 176 spawn_local(async move { 177 indication_handler 178 .send(HANDLE, get_data(), MTU, move |packet| { 179 tx.send(packet).unwrap(); 180 Ok(()) 181 }) 182 .await 183 }); 184 185 // assert: that an AttHandleValueIndication was sent on the channel 186 let AttChild::AttHandleValueIndication(indication) = rx.await.unwrap() else { 187 unreachable!() 188 }; 189 assert_eq!( 190 indication, 191 AttHandleValueIndicationBuilder { 192 handle: HANDLE.into(), 193 value: build_att_data(get_data()), 194 } 195 ); 196 }); 197 } 198 199 #[test] test_invalid_handle()200 fn test_invalid_handle() { 201 block_on_locally(async move { 202 // arrange 203 let (mut indication_handler, _confirmation_watcher) = 204 IndicationHandler::new(get_att_database()); 205 206 // act: send an indication on a nonexistent handle 207 let ret = indication_handler 208 .send(NONEXISTENT_HANDLE, get_data(), MTU, move |_| unreachable!()) 209 .await; 210 211 // assert: that we failed with IndicationError::AttributeNotFound 212 assert!(matches!(ret, Err(IndicationError::AttributeNotFound))); 213 }); 214 } 215 216 #[test] test_unsupported_permission()217 fn test_unsupported_permission() { 218 block_on_locally(async move { 219 // arrange 220 let (mut indication_handler, _confirmation_watcher) = 221 IndicationHandler::new(get_att_database()); 222 223 // act: send an indication on an attribute that does not support indications 224 let ret = indication_handler 225 .send(NON_INDICATE_HANDLE, get_data(), MTU, move |_| unreachable!()) 226 .await; 227 228 // assert: that we failed with IndicationError::IndicationsNotSupported 229 assert!(matches!(ret, Err(IndicationError::IndicationsNotSupported))); 230 }); 231 } 232 233 #[test] test_confirmation_handled()234 fn test_confirmation_handled() { 235 block_on_locally(async move { 236 // arrange 237 let (mut indication_handler, confirmation_watcher) = 238 IndicationHandler::new(get_att_database()); 239 let (tx, rx) = oneshot::channel(); 240 241 // act: send an indication 242 let pending_result = spawn_local(async move { 243 indication_handler 244 .send(HANDLE, get_data(), MTU, move |packet| { 245 tx.send(packet).unwrap(); 246 Ok(()) 247 }) 248 .await 249 }); 250 // when the indication is sent, send a confirmation in response 251 rx.await.unwrap(); 252 confirmation_watcher.on_confirmation(); 253 254 // assert: the indication was successfully sent 255 assert!(matches!(pending_result.await.unwrap(), Ok(()))); 256 }); 257 } 258 259 #[test] test_unblock_on_disconnect()260 fn test_unblock_on_disconnect() { 261 block_on_locally(async move { 262 // arrange 263 let (mut indication_handler, confirmation_watcher) = 264 IndicationHandler::new(get_att_database()); 265 let (tx, rx) = oneshot::channel(); 266 267 // act: send an indication 268 let pending_result = spawn_local(async move { 269 indication_handler 270 .send(HANDLE, get_data(), MTU, move |packet| { 271 tx.send(packet).unwrap(); 272 Ok(()) 273 }) 274 .await 275 }); 276 // when the indication is sent, drop the confirmation watcher (as would happen 277 // upon a disconnection) 278 rx.await.unwrap(); 279 drop(confirmation_watcher); 280 281 // assert: we get the appropriate error 282 assert!(matches!( 283 pending_result.await.unwrap(), 284 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 285 )); 286 }); 287 } 288 289 #[test] test_spurious_confirmations()290 fn test_spurious_confirmations() { 291 block_on_locally(async move { 292 // arrange: send a few confirmations in advance 293 let (mut indication_handler, confirmation_watcher) = 294 IndicationHandler::new(get_att_database()); 295 let (tx, rx) = oneshot::channel(); 296 confirmation_watcher.on_confirmation(); 297 confirmation_watcher.on_confirmation(); 298 299 // act: send an indication 300 let pending_result = spawn_local(async move { 301 indication_handler 302 .send(HANDLE, get_data(), MTU, move |packet| { 303 tx.send(packet).unwrap(); 304 Ok(()) 305 }) 306 .await 307 }); 308 // when the indication is sent, drop the confirmation watcher (so we won't block 309 // forever) 310 rx.await.unwrap(); 311 drop(confirmation_watcher); 312 313 // assert: we get the appropriate error, rather than an Ok(()) 314 // (which would have been the case if we had processed the spurious 315 // confirmations) 316 assert!(matches!( 317 pending_result.await.unwrap(), 318 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation) 319 )); 320 }); 321 } 322 323 #[test] test_indication_timeout()324 fn test_indication_timeout() { 325 block_on_locally(async move { 326 // arrange: send a few confirmations in advance 327 let (mut indication_handler, confirmation_watcher) = 328 IndicationHandler::new(get_att_database()); 329 let (tx, rx) = oneshot::channel(); 330 confirmation_watcher.on_confirmation(); 331 confirmation_watcher.on_confirmation(); 332 333 // act: send an indication 334 let time_sent = Instant::now(); 335 let pending_result = spawn_local(async move { 336 indication_handler 337 .send(HANDLE, get_data(), MTU, move |packet| { 338 tx.send(packet).unwrap(); 339 Ok(()) 340 }) 341 .await 342 }); 343 // after it is sent, wait for the timer to fire 344 rx.await.unwrap(); 345 346 // assert: we get the appropriate error 347 assert!(matches!( 348 pending_result.await.unwrap(), 349 Err(IndicationError::ConfirmationTimeout) 350 )); 351 // after the appropriate interval 352 // note: this is not really timing-dependent, since we are using a simulated 353 // clock TODO(aryarahul) - why is this not exactly 30s? 354 let time_slept = Instant::now().duration_since(time_sent); 355 assert!(time_slept > Duration::from_secs(29)); 356 assert!(time_slept < Duration::from_secs(31)); 357 }); 358 } 359 360 #[test] test_mtu_exceeds()361 fn test_mtu_exceeds() { 362 block_on_locally(async move { 363 // arrange 364 let (mut indication_handler, _confirmation_watcher) = 365 IndicationHandler::new(get_att_database()); 366 367 // act: send an indication with an ATT_MTU of 4 and data length of 3 368 let res = indication_handler 369 .send( 370 HANDLE, 371 AttAttributeDataChild::RawData([1, 2, 3].into()), 372 4, 373 move |_| unreachable!(), 374 ) 375 .await; 376 377 // assert: that we got the expected error, indicating the max data size (not the 378 // ATT_MTU, but ATT_MTU-3) 379 assert!(matches!(res, Err(IndicationError::DataExceedsMtu { mtu: 1 }))); 380 }); 381 } 382 } 383