1 use std::time::Duration;
2 
3 use log::{trace, warn};
4 use tokio::{
5     sync::mpsc::{self, error::TrySendError},
6     time::timeout,
7 };
8 
9 use crate::{
10     gatt::ids::AttHandle,
11     packets::{AttAttributeDataChild, AttChild, AttHandleValueIndicationBuilder, Serializable},
12     utils::packet::build_att_data,
13 };
14 
15 use super::{
16     att_database::{AttDatabase, StableAttDatabase},
17     att_server_bearer::SendError,
18 };
19 
/// Reasons why [`IndicationHandler::send`] can fail.
#[derive(Debug)]
pub enum IndicationError {
    /// The value to indicate is larger than the MTU allows
    DataExceedsMtu {
        /// The largest value size (in bytes) that would have been accepted,
        /// i.e. ATT_MTU - 3, since 3 bytes are needed for the header
        mtu: usize,
    },
    /// No attribute with the requested handle exists in the database
    AttributeNotFound,
    /// The attribute exists, but its permissions do not allow indications
    IndicationsNotSupported,
    /// The outgoing indication packet could not be serialized or sent
    SendError(SendError),
    /// No confirmation arrived within the timeout (30s)
    ConfirmationTimeout,
    /// The confirmation channel was closed (the connection was dropped) while
    /// a confirmation was still awaited
    ConnectionDroppedWhileWaitingForConfirmation,
}
40 
/// Sends ATT handle-value indications and waits for each one to be confirmed.
///
/// Constructed through `IndicationHandler::new`, which also hands back the
/// `ConfirmationWatcher` holding the sending end of `pending_confirmation`.
pub struct IndicationHandler<T> {
    /// Attribute database used to validate the handle being indicated
    db: T,
    /// Receiving end of the confirmation channel; yields `()` for every
    /// confirmation reported by the paired `ConfirmationWatcher`
    pending_confirmation: mpsc::Receiver<()>,
}
45 
46 impl<T: AttDatabase> IndicationHandler<T> {
new(db: T) -> (Self, ConfirmationWatcher)47     pub fn new(db: T) -> (Self, ConfirmationWatcher) {
48         let (tx, rx) = mpsc::channel(1);
49         (Self { db, pending_confirmation: rx }, ConfirmationWatcher(tx))
50     }
51 
send( &mut self, handle: AttHandle, data: AttAttributeDataChild, mtu: usize, send_packet: impl FnOnce(AttChild) -> Result<(), SendError>, ) -> Result<(), IndicationError>52     pub async fn send(
53         &mut self,
54         handle: AttHandle,
55         data: AttAttributeDataChild,
56         mtu: usize,
57         send_packet: impl FnOnce(AttChild) -> Result<(), SendError>,
58     ) -> Result<(), IndicationError> {
59         let data_size = data
60             .size_in_bits()
61             .map_err(SendError::SerializeError)
62             .map_err(IndicationError::SendError)?;
63         // As per Core Spec 5.3 Vol 3F 3.4.7.2, the indicated value must be at most
64         // ATT_MTU-3
65         if data_size > (mtu - 3) * 8 {
66             return Err(IndicationError::DataExceedsMtu { mtu: mtu - 3 });
67         }
68 
69         if !self
70             .db
71             .snapshot()
72             .find_attribute(handle)
73             .ok_or(IndicationError::AttributeNotFound)?
74             .permissions
75             .indicate()
76         {
77             warn!("cannot send indication for {handle:?} since it does not support indications");
78             return Err(IndicationError::IndicationsNotSupported);
79         }
80 
81         // flushing any confirmations that arrived before we sent the next indication
82         let _ = self.pending_confirmation.try_recv();
83 
84         send_packet(
85             AttHandleValueIndicationBuilder { handle: handle.into(), value: build_att_data(data) }
86                 .into(),
87         )
88         .map_err(IndicationError::SendError)?;
89 
90         match timeout(Duration::from_secs(30), self.pending_confirmation.recv()).await {
91             Ok(Some(())) => Ok(()),
92             Ok(None) => {
93                 warn!("connection dropped while waiting for indication confirmation");
94                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
95             }
96             Err(_) => {
97                 warn!("Sent indication but received no response for 30s");
98                 Err(IndicationError::ConfirmationTimeout)
99             }
100         }
101     }
102 }
103 
104 pub struct ConfirmationWatcher(mpsc::Sender<()>);
105 
106 impl ConfirmationWatcher {
on_confirmation(&self)107     pub fn on_confirmation(&self) {
108         match self.0.try_send(()) {
109             Ok(_) => {
110                 trace!("Got AttHandleValueConfirmation")
111             }
112             Err(TrySendError::Full(_)) => {
113                 warn!("Got a second AttHandleValueConfirmation before the first was processed, dropping it")
114             }
115             Err(TrySendError::Closed(_)) => {
116                 warn!("Got an AttHandleValueConfirmation while no indications are outstanding, dropping it")
117             }
118         }
119     }
120 }
121 
122 #[cfg(test)]
123 mod test {
124     use tokio::{sync::oneshot, task::spawn_local, time::Instant};
125 
126     use crate::{
127         core::uuid::Uuid,
128         gatt::server::{
129             att_database::AttAttribute, gatt_database::AttPermissions,
130             test::test_att_db::TestAttDatabase,
131         },
132         utils::task::block_on_locally,
133     };
134 
135     use super::*;
136 
    /// Handle registered in the test database with the INDICATE permission
    const HANDLE: AttHandle = AttHandle(1);
    /// Handle that is deliberately absent from the test database
    const NONEXISTENT_HANDLE: AttHandle = AttHandle(2);
    /// Handle registered in the test database with only the READABLE permission
    const NON_INDICATE_HANDLE: AttHandle = AttHandle(3);
    /// ATT_MTU passed to `send` by tests that do not exercise the size limit
    const MTU: usize = 32;
141 
get_data() -> AttAttributeDataChild142     fn get_data() -> AttAttributeDataChild {
143         AttAttributeDataChild::RawData([1, 2, 3].into())
144     }
145 
get_att_database() -> TestAttDatabase146     fn get_att_database() -> TestAttDatabase {
147         TestAttDatabase::new(vec![
148             (
149                 AttAttribute {
150                     handle: HANDLE,
151                     type_: Uuid::new(123),
152                     permissions: AttPermissions::INDICATE,
153                 },
154                 vec![],
155             ),
156             (
157                 AttAttribute {
158                     handle: NON_INDICATE_HANDLE,
159                     type_: Uuid::new(123),
160                     permissions: AttPermissions::READABLE,
161                 },
162                 vec![],
163             ),
164         ])
165     }
166 
167     #[test]
test_indication_sent()168     fn test_indication_sent() {
169         block_on_locally(async move {
170             // arrange
171             let (mut indication_handler, _confirmation_watcher) =
172                 IndicationHandler::new(get_att_database());
173             let (tx, rx) = oneshot::channel();
174 
175             // act: send an indication
176             spawn_local(async move {
177                 indication_handler
178                     .send(HANDLE, get_data(), MTU, move |packet| {
179                         tx.send(packet).unwrap();
180                         Ok(())
181                     })
182                     .await
183             });
184 
185             // assert: that an AttHandleValueIndication was sent on the channel
186             let AttChild::AttHandleValueIndication(indication) = rx.await.unwrap() else {
187                 unreachable!()
188             };
189             assert_eq!(
190                 indication,
191                 AttHandleValueIndicationBuilder {
192                     handle: HANDLE.into(),
193                     value: build_att_data(get_data()),
194                 }
195             );
196         });
197     }
198 
199     #[test]
test_invalid_handle()200     fn test_invalid_handle() {
201         block_on_locally(async move {
202             // arrange
203             let (mut indication_handler, _confirmation_watcher) =
204                 IndicationHandler::new(get_att_database());
205 
206             // act: send an indication on a nonexistent handle
207             let ret = indication_handler
208                 .send(NONEXISTENT_HANDLE, get_data(), MTU, move |_| unreachable!())
209                 .await;
210 
211             // assert: that we failed with IndicationError::AttributeNotFound
212             assert!(matches!(ret, Err(IndicationError::AttributeNotFound)));
213         });
214     }
215 
216     #[test]
test_unsupported_permission()217     fn test_unsupported_permission() {
218         block_on_locally(async move {
219             // arrange
220             let (mut indication_handler, _confirmation_watcher) =
221                 IndicationHandler::new(get_att_database());
222 
223             // act: send an indication on an attribute that does not support indications
224             let ret = indication_handler
225                 .send(NON_INDICATE_HANDLE, get_data(), MTU, move |_| unreachable!())
226                 .await;
227 
228             // assert: that we failed with IndicationError::IndicationsNotSupported
229             assert!(matches!(ret, Err(IndicationError::IndicationsNotSupported)));
230         });
231     }
232 
233     #[test]
test_confirmation_handled()234     fn test_confirmation_handled() {
235         block_on_locally(async move {
236             // arrange
237             let (mut indication_handler, confirmation_watcher) =
238                 IndicationHandler::new(get_att_database());
239             let (tx, rx) = oneshot::channel();
240 
241             // act: send an indication
242             let pending_result = spawn_local(async move {
243                 indication_handler
244                     .send(HANDLE, get_data(), MTU, move |packet| {
245                         tx.send(packet).unwrap();
246                         Ok(())
247                     })
248                     .await
249             });
250             // when the indication is sent, send a confirmation in response
251             rx.await.unwrap();
252             confirmation_watcher.on_confirmation();
253 
254             // assert: the indication was successfully sent
255             assert!(matches!(pending_result.await.unwrap(), Ok(())));
256         });
257     }
258 
259     #[test]
test_unblock_on_disconnect()260     fn test_unblock_on_disconnect() {
261         block_on_locally(async move {
262             // arrange
263             let (mut indication_handler, confirmation_watcher) =
264                 IndicationHandler::new(get_att_database());
265             let (tx, rx) = oneshot::channel();
266 
267             // act: send an indication
268             let pending_result = spawn_local(async move {
269                 indication_handler
270                     .send(HANDLE, get_data(), MTU, move |packet| {
271                         tx.send(packet).unwrap();
272                         Ok(())
273                     })
274                     .await
275             });
276             // when the indication is sent, drop the confirmation watcher (as would happen
277             // upon a disconnection)
278             rx.await.unwrap();
279             drop(confirmation_watcher);
280 
281             // assert: we get the appropriate error
282             assert!(matches!(
283                 pending_result.await.unwrap(),
284                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
285             ));
286         });
287     }
288 
289     #[test]
test_spurious_confirmations()290     fn test_spurious_confirmations() {
291         block_on_locally(async move {
292             // arrange: send a few confirmations in advance
293             let (mut indication_handler, confirmation_watcher) =
294                 IndicationHandler::new(get_att_database());
295             let (tx, rx) = oneshot::channel();
296             confirmation_watcher.on_confirmation();
297             confirmation_watcher.on_confirmation();
298 
299             // act: send an indication
300             let pending_result = spawn_local(async move {
301                 indication_handler
302                     .send(HANDLE, get_data(), MTU, move |packet| {
303                         tx.send(packet).unwrap();
304                         Ok(())
305                     })
306                     .await
307             });
308             // when the indication is sent, drop the confirmation watcher (so we won't block
309             // forever)
310             rx.await.unwrap();
311             drop(confirmation_watcher);
312 
313             // assert: we get the appropriate error, rather than an Ok(())
314             // (which would have been the case if we had processed the spurious
315             // confirmations)
316             assert!(matches!(
317                 pending_result.await.unwrap(),
318                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
319             ));
320         });
321     }
322 
323     #[test]
test_indication_timeout()324     fn test_indication_timeout() {
325         block_on_locally(async move {
326             // arrange: send a few confirmations in advance
327             let (mut indication_handler, confirmation_watcher) =
328                 IndicationHandler::new(get_att_database());
329             let (tx, rx) = oneshot::channel();
330             confirmation_watcher.on_confirmation();
331             confirmation_watcher.on_confirmation();
332 
333             // act: send an indication
334             let time_sent = Instant::now();
335             let pending_result = spawn_local(async move {
336                 indication_handler
337                     .send(HANDLE, get_data(), MTU, move |packet| {
338                         tx.send(packet).unwrap();
339                         Ok(())
340                     })
341                     .await
342             });
343             // after it is sent, wait for the timer to fire
344             rx.await.unwrap();
345 
346             // assert: we get the appropriate error
347             assert!(matches!(
348                 pending_result.await.unwrap(),
349                 Err(IndicationError::ConfirmationTimeout)
350             ));
351             // after the appropriate interval
352             // note: this is not really timing-dependent, since we are using a simulated
353             // clock TODO(aryarahul) - why is this not exactly 30s?
354             let time_slept = Instant::now().duration_since(time_sent);
355             assert!(time_slept > Duration::from_secs(29));
356             assert!(time_slept < Duration::from_secs(31));
357         });
358     }
359 
360     #[test]
test_mtu_exceeds()361     fn test_mtu_exceeds() {
362         block_on_locally(async move {
363             // arrange
364             let (mut indication_handler, _confirmation_watcher) =
365                 IndicationHandler::new(get_att_database());
366 
367             // act: send an indication with an ATT_MTU of 4 and data length of 3
368             let res = indication_handler
369                 .send(
370                     HANDLE,
371                     AttAttributeDataChild::RawData([1, 2, 3].into()),
372                     4,
373                     move |_| unreachable!(),
374                 )
375                 .await;
376 
377             // assert: that we got the expected error, indicating the max data size (not the
378             // ATT_MTU, but ATT_MTU-3)
379             assert!(matches!(res, Err(IndicationError::DataExceedsMtu { mtu: 1 })));
380         });
381     }
382 }
383