1 //! This module handles an individual connection on the ATT fixed channel.
2 //! It handles ATT transactions and unacknowledged operations, backed by an
3 //! AttDatabase (that may in turn be backed by an upper-layer protocol)
4 
5 use std::{cell::Cell, future::Future};
6 
7 use anyhow::Result;
8 use log::{error, trace, warn};
9 use tokio::task::spawn_local;
10 
11 use crate::{
12     core::{
13         shared_box::{WeakBox, WeakBoxRef},
14         shared_mutex::SharedMutex,
15     },
16     gatt::{
17         ids::AttHandle,
18         mtu::{AttMtu, MtuEvent},
19         opcode_types::{classify_opcode, OperationType},
20     },
21     packets::{
22         AttAttributeDataChild, AttBuilder, AttChild, AttErrorCode, AttErrorResponseBuilder,
23         AttView, Packet, SerializeError,
24     },
25     utils::{owned_handle::OwnedHandle, packet::HACK_child_to_opcode},
26 };
27 
28 use super::{
29     att_database::AttDatabase,
30     command_handler::AttCommandHandler,
31     indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler},
32     request_handler::AttRequestHandler,
33 };
34 
35 enum AttRequestState<T: AttDatabase> {
36     Idle(AttRequestHandler<T>),
37     Pending { _task: OwnedHandle<()> },
38     Replacing,
39 }
40 
41 /// The errors that can occur while trying to send a packet
42 #[derive(Debug)]
43 pub enum SendError {
44     /// The packet failed to serialize
45     SerializeError(SerializeError),
46     /// The connection no longer exists
47     ConnectionDropped,
48 }
49 
50 /// This represents a single ATT bearer (currently, always the unenhanced fixed
51 /// channel on LE) The AttRequestState ensures that only one transaction can
52 /// take place at a time
53 pub struct AttServerBearer<T: AttDatabase> {
54     // general
55     send_packet: Box<dyn Fn(AttBuilder) -> Result<(), SerializeError>>,
56     mtu: AttMtu,
57 
58     // request state
59     curr_request: Cell<AttRequestState<T>>,
60 
61     // indication state
62     indication_handler: SharedMutex<IndicationHandler<T>>,
63     pending_confirmation: ConfirmationWatcher,
64 
65     // command handler (across all bearers)
66     command_handler: AttCommandHandler<T>,
67 }
68 
69 impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> {
70     /// Constructor, wrapping an ATT channel (for outgoing packets) and an
71     /// AttDatabase
new( db: T, send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static, ) -> Self72     pub fn new(
73         db: T,
74         send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static,
75     ) -> Self {
76         let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone());
77         Self {
78             send_packet: Box::new(send_packet),
79             mtu: AttMtu::new(),
80 
81             curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(),
82 
83             indication_handler: SharedMutex::new(indication_handler),
84             pending_confirmation,
85 
86             command_handler: AttCommandHandler::new(db),
87         }
88     }
89 
send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError>90     fn send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError> {
91         let child = packet.into();
92         let packet = AttBuilder { opcode: HACK_child_to_opcode(&child), _child_: child };
93         (self.send_packet)(packet)
94     }
95 }
96 
97 impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> {
98     /// Handle an incoming packet, and send outgoing packets as appropriate
99     /// using the owned ATT channel.
handle_packet(&self, packet: AttView<'_>)100     pub fn handle_packet(&self, packet: AttView<'_>) {
101         match classify_opcode(packet.get_opcode()) {
102             OperationType::Command => {
103                 self.command_handler.process_packet(packet);
104             }
105             OperationType::Request => {
106                 self.handle_request(packet);
107             }
108             OperationType::Confirmation => self.pending_confirmation.on_confirmation(),
109             OperationType::Response | OperationType::Notification | OperationType::Indication => {
110                 unreachable!("the arbiter should not let us receive these packet types")
111             }
112         }
113     }
114 
115     /// Send an indication, wait for the peer confirmation, and return the
116     /// appropriate status If multiple calls are outstanding, they are
117     /// executed in FIFO order.
send_indication( &self, handle: AttHandle, data: AttAttributeDataChild, ) -> impl Future<Output = Result<(), IndicationError>>118     pub fn send_indication(
119         &self,
120         handle: AttHandle,
121         data: AttAttributeDataChild,
122     ) -> impl Future<Output = Result<(), IndicationError>> {
123         trace!("sending indication for handle {handle:?}");
124 
125         let locked_indication_handler = self.indication_handler.lock();
126         let pending_mtu = self.mtu.snapshot();
127         let this = self.downgrade();
128 
129         async move {
130             // first wait until we are at the head of the queue and are ready to send
131             // indications
132             let mut indication_handler = locked_indication_handler
133                 .await
134                 .ok_or_else(|| {
135                     warn!("indication for handle {handle:?} cancelled while queued since the connection dropped");
136                     IndicationError::SendError(SendError::ConnectionDropped)
137                 })?;
138             // then, if MTU negotiation is taking place, wait for it to complete
139             let mtu = pending_mtu
140                 .await
141                 .ok_or_else(|| {
142                     warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped");
143                     IndicationError::SendError(SendError::ConnectionDropped)
144                 })?;
145             // finally, send, and wait for a response
146             indication_handler.send(handle, data, mtu, |packet| this.try_send_packet(packet)).await
147         }
148     }
149 
150     /// Handle a snooped MTU event, to update the MTU we use for our various
151     /// operations
handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()>152     pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> {
153         self.mtu.handle_event(mtu_event)
154     }
155 
handle_request(&self, packet: AttView<'_>)156     fn handle_request(&self, packet: AttView<'_>) {
157         let curr_request = self.curr_request.replace(AttRequestState::Replacing);
158         self.curr_request.replace(match curr_request {
159             AttRequestState::Idle(mut request_handler) => {
160                 // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the
161                 // request-time MTU should be used
162                 let mtu = self.mtu.snapshot_or_default();
163                 let packet = packet.to_owned_packet();
164                 let this = self.downgrade();
165                 let task = spawn_local(async move {
166                     trace!("starting ATT transaction");
167                     let reply = request_handler.process_packet(packet.view(), mtu).await;
168                     this.with(|this| {
169                         this.map(|this| {
170                             match this.send_packet(reply) {
171                                 Ok(_) => {
172                                     trace!("reply packet sent")
173                                 }
174                                 Err(err) => {
175                                     error!("serializer failure {err:?}, dropping packet and sending failed reply");
176                                     // if this also fails, we're stuck
177                                     if let Err(err) = this.send_packet(AttErrorResponseBuilder {
178                                         opcode_in_error: packet.view().get_opcode(),
179                                         handle_in_error: AttHandle(0).into(),
180                                         error_code: AttErrorCode::UNLIKELY_ERROR,
181                                     }) {
182                                         panic!("unexpected serialize error for known-good packet {err:?}")
183                                     }
184                                 }
185                             };
186                             // ready for next transaction
187                             this.curr_request.replace(AttRequestState::Idle(request_handler));
188                         })
189                     });
190                 });
191                 AttRequestState::Pending { _task: task.into() }
192             }
193             AttRequestState::Pending { .. } => {
194                 warn!("multiple ATT operations cannot simultaneously take place, dropping one");
195                 // TODO(aryarahul) - disconnect connection here;
196                 curr_request
197             }
198             AttRequestState::Replacing => {
199               panic!("Replacing is an ephemeral state");
200             }
201         });
202     }
203 }
204 
205 impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> {
try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError>206     fn try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError> {
207         self.with(|this| {
208             this.ok_or_else(|| {
209                 warn!("connection dropped before packet sent");
210                 SendError::ConnectionDropped
211             })?
212             .send_packet(packet)
213             .map_err(SendError::SerializeError)
214         })
215     }
216 }
217 
218 #[cfg(test)]
219 mod test {
220     use std::rc::Rc;
221 
222     use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver};
223 
224     use super::*;
225 
226     use crate::{
227         core::{shared_box::SharedBox, uuid::Uuid},
228         gatt::{
229             ffi::AttributeBackingType,
230             ids::TransportIndex,
231             mocks::mock_datastore::{MockDatastore, MockDatastoreEvents},
232             server::{
233                 att_database::{AttAttribute, AttPermissions},
234                 gatt_database::{
235                     GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle,
236                 },
237                 test::test_att_db::TestAttDatabase,
238             },
239         },
240         packets::{
241             AttAttributeDataBuilder, AttAttributeDataChild, AttHandleValueConfirmationBuilder,
242             AttOpcode, AttReadRequestBuilder, AttReadResponseBuilder,
243         },
244         utils::{
245             packet::build_att_view_or_crash,
246             task::{block_on_locally, try_await},
247         },
248     };
249 
250     const VALID_HANDLE: AttHandle = AttHandle(3);
251     const INVALID_HANDLE: AttHandle = AttHandle(4);
252     const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10);
253 
254     const TCB_IDX: TransportIndex = TransportIndex(1);
255 
open_connection( ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>)256     fn open_connection(
257     ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>) {
258         let db = TestAttDatabase::new(vec![
259             (
260                 AttAttribute {
261                     handle: VALID_HANDLE,
262                     type_: Uuid::new(0x1234),
263                     permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
264                 },
265                 vec![5, 6],
266             ),
267             (
268                 AttAttribute {
269                     handle: ANOTHER_VALID_HANDLE,
270                     type_: Uuid::new(0x5678),
271                     permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
272                 },
273                 vec![5, 6],
274             ),
275         ]);
276         let (tx, rx) = unbounded_channel();
277         let conn = AttServerBearer::new(db, move |packet| {
278             tx.send(packet).unwrap();
279             Ok(())
280         })
281         .into();
282         (conn, rx)
283     }
284 
285     #[test]
test_single_transaction()286     fn test_single_transaction() {
287         block_on_locally(async {
288             let (conn, mut rx) = open_connection();
289             conn.as_ref().handle_packet(
290                 build_att_view_or_crash(AttReadRequestBuilder {
291                     attribute_handle: VALID_HANDLE.into(),
292                 })
293                 .view(),
294             );
295             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
296             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
297         });
298     }
299 
300     #[test]
test_sequential_transactions()301     fn test_sequential_transactions() {
302         block_on_locally(async {
303             let (conn, mut rx) = open_connection();
304             conn.as_ref().handle_packet(
305                 build_att_view_or_crash(AttReadRequestBuilder {
306                     attribute_handle: INVALID_HANDLE.into(),
307                 })
308                 .view(),
309             );
310             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::ERROR_RESPONSE);
311             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
312 
313             conn.as_ref().handle_packet(
314                 build_att_view_or_crash(AttReadRequestBuilder {
315                     attribute_handle: VALID_HANDLE.into(),
316                 })
317                 .view(),
318             );
319             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
320             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
321         });
322     }
323 
324     #[test]
test_concurrent_transaction_failure()325     fn test_concurrent_transaction_failure() {
326         // arrange: AttServerBearer linked to a backing datastore and packet queue, with
327         // two characteristics in the database
328         let (datastore, mut data_rx) = MockDatastore::new();
329         let datastore = Rc::new(datastore);
330         let db = SharedBox::new(GattDatabase::new());
331         db.add_service_with_handles(
332             GattServiceWithHandle {
333                 handle: AttHandle(1),
334                 type_: Uuid::new(1),
335                 characteristics: vec![
336                     GattCharacteristicWithHandle {
337                         handle: VALID_HANDLE,
338                         type_: Uuid::new(2),
339                         permissions: AttPermissions::READABLE,
340                         descriptors: vec![],
341                     },
342                     GattCharacteristicWithHandle {
343                         handle: ANOTHER_VALID_HANDLE,
344                         type_: Uuid::new(2),
345                         permissions: AttPermissions::READABLE,
346                         descriptors: vec![],
347                     },
348                 ],
349             },
350             datastore,
351         )
352         .unwrap();
353         let (tx, mut rx) = unbounded_channel();
354         let send_packet = move |packet| {
355             tx.send(packet).unwrap();
356             Ok(())
357         };
358         let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet));
359         let data = [1, 2];
360 
361         // act: send two read requests before replying to either read
362         // first request
363         block_on_locally(async {
364             let req1 = build_att_view_or_crash(AttReadRequestBuilder {
365                 attribute_handle: VALID_HANDLE.into(),
366             });
367             conn.as_ref().handle_packet(req1.view());
368             // second request
369             let req2 = build_att_view_or_crash(AttReadRequestBuilder {
370                 attribute_handle: ANOTHER_VALID_HANDLE.into(),
371             });
372             conn.as_ref().handle_packet(req2.view());
373             // handle first reply
374             let MockDatastoreEvents::Read(
375                 TCB_IDX,
376                 VALID_HANDLE,
377                 AttributeBackingType::Characteristic,
378                 data_resp,
379             ) = data_rx.recv().await.unwrap()
380             else {
381                 unreachable!();
382             };
383             data_resp.send(Ok(data.to_vec())).unwrap();
384             trace!("reply sent from upper tester");
385 
386             // assert: that the first reply was made
387             let resp = rx.recv().await.unwrap();
388             assert_eq!(
389                 resp,
390                 AttBuilder {
391                     opcode: AttOpcode::READ_RESPONSE,
392                     _child_: AttReadResponseBuilder {
393                         value: AttAttributeDataBuilder {
394                             _child_: AttAttributeDataChild::RawData(
395                                 data.to_vec().into_boxed_slice()
396                             )
397                         },
398                     }
399                     .into()
400                 }
401             );
402             // assert no other replies were made
403             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
404             // assert no callbacks are pending
405             assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty);
406         });
407     }
408 
409     #[test]
test_indication_confirmation()410     fn test_indication_confirmation() {
411         block_on_locally(async {
412             // arrange
413             let (conn, mut rx) = open_connection();
414 
415             // act: send an indication
416             let pending_send =
417                 spawn_local(conn.as_ref().send_indication(
418                     VALID_HANDLE,
419                     AttAttributeDataChild::RawData([1, 2, 3].into()),
420                 ));
421             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
422             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
423             // and the confirmation
424             conn.as_ref().handle_packet(
425                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
426             );
427 
428             // assert: the indication was correctly sent
429             assert!(matches!(pending_send.await.unwrap(), Ok(())));
430         });
431     }
432 
433     #[test]
test_sequential_indications()434     fn test_sequential_indications() {
435         block_on_locally(async {
436             // arrange
437             let (conn, mut rx) = open_connection();
438 
439             // act: send the first indication
440             let pending_send1 =
441                 spawn_local(conn.as_ref().send_indication(
442                     VALID_HANDLE,
443                     AttAttributeDataChild::RawData([1, 2, 3].into()),
444                 ));
445             // wait for/capture the outgoing packet
446             let sent1 = rx.recv().await.unwrap();
447             // send the response
448             conn.as_ref().handle_packet(
449                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
450             );
451             // send the second indication
452             let pending_send2 =
453                 spawn_local(conn.as_ref().send_indication(
454                     VALID_HANDLE,
455                     AttAttributeDataChild::RawData([1, 2, 3].into()),
456                 ));
457             // wait for/capture the outgoing packet
458             let sent2 = rx.recv().await.unwrap();
459             // and the response
460             conn.as_ref().handle_packet(
461                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
462             );
463 
464             // assert: exactly two indications were sent
465             assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
466             assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
467             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
468             // and that both got successful responses
469             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
470             assert!(matches!(pending_send2.await.unwrap(), Ok(())));
471         });
472     }
473 
474     #[test]
test_queued_indications_only_one_sent()475     fn test_queued_indications_only_one_sent() {
476         block_on_locally(async {
477             // arrange
478             let (conn, mut rx) = open_connection();
479 
480             // act: send two indications simultaneously
481             let pending_send1 =
482                 spawn_local(conn.as_ref().send_indication(
483                     VALID_HANDLE,
484                     AttAttributeDataChild::RawData([1, 2, 3].into()),
485                 ));
486             let pending_send2 = spawn_local(conn.as_ref().send_indication(
487                 ANOTHER_VALID_HANDLE,
488                 AttAttributeDataChild::RawData([1, 2, 3].into()),
489             ));
490             // assert: only one was initially sent
491             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
492             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
493             // and both are still pending
494             assert!(!pending_send1.is_finished());
495             assert!(!pending_send2.is_finished());
496         });
497     }
498 
499     #[test]
test_queued_indications_dequeue_second()500     fn test_queued_indications_dequeue_second() {
501         block_on_locally(async {
502             // arrange
503             let (conn, mut rx) = open_connection();
504 
505             // act: send two indications simultaneously
506             let pending_send1 =
507                 spawn_local(conn.as_ref().send_indication(
508                     VALID_HANDLE,
509                     AttAttributeDataChild::RawData([1, 2, 3].into()),
510                 ));
511             let pending_send2 = spawn_local(conn.as_ref().send_indication(
512                 ANOTHER_VALID_HANDLE,
513                 AttAttributeDataChild::RawData([1, 2, 3].into()),
514             ));
515             // wait for/capture the outgoing packet
516             let sent1 = rx.recv().await.unwrap();
517             // send response for the first one
518             conn.as_ref().handle_packet(
519                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
520             );
521             // wait for/capture the outgoing packet
522             let sent2 = rx.recv().await.unwrap();
523 
524             // assert: the first future has completed successfully, the second one is
525             // pending
526             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
527             assert!(!pending_send2.is_finished());
528             // and that both indications have been sent
529             assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
530             assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
531             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
532         });
533     }
534 
535     #[test]
test_queued_indications_complete_both()536     fn test_queued_indications_complete_both() {
537         block_on_locally(async {
538             // arrange
539             let (conn, mut rx) = open_connection();
540 
541             // act: send two indications simultaneously
542             let pending_send1 =
543                 spawn_local(conn.as_ref().send_indication(
544                     VALID_HANDLE,
545                     AttAttributeDataChild::RawData([1, 2, 3].into()),
546                 ));
547             let pending_send2 = spawn_local(conn.as_ref().send_indication(
548                 ANOTHER_VALID_HANDLE,
549                 AttAttributeDataChild::RawData([1, 2, 3].into()),
550             ));
551             // wait for/capture the outgoing packet
552             let sent1 = rx.recv().await.unwrap();
553             // send response for the first one
554             conn.as_ref().handle_packet(
555                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
556             );
557             // wait for/capture the outgoing packet
558             let sent2 = rx.recv().await.unwrap();
559             // and now the second
560             conn.as_ref().handle_packet(
561                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
562             );
563 
564             // assert: both futures have completed successfully
565             assert!(matches!(pending_send1.await.unwrap(), Ok(())));
566             assert!(matches!(pending_send2.await.unwrap(), Ok(())));
567             // and both indications have been sent
568             assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
569             assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
570             assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
571         });
572     }
573 
574     #[test]
test_indication_connection_drop()575     fn test_indication_connection_drop() {
576         block_on_locally(async {
577             // arrange: a pending indication
578             let (conn, mut rx) = open_connection();
579             let pending_send =
580                 spawn_local(conn.as_ref().send_indication(
581                     VALID_HANDLE,
582                     AttAttributeDataChild::RawData([1, 2, 3].into()),
583                 ));
584 
585             // act: drop the connection after the indication is sent
586             rx.recv().await.unwrap();
587             drop(conn);
588 
589             // assert: the pending indication fails with the appropriate error
590             assert!(matches!(
591                 pending_send.await.unwrap(),
592                 Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
593             ));
594         });
595     }
596 
597     #[test]
test_single_indication_pending_mtu()598     fn test_single_indication_pending_mtu() {
599         block_on_locally(async {
600             // arrange: pending MTU negotiation
601             let (conn, mut rx) = open_connection();
602             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
603 
604             // act: try to send an indication with a large payload size
605             let _ =
606                 try_await(conn.as_ref().send_indication(
607                     VALID_HANDLE,
608                     AttAttributeDataChild::RawData((1..50).collect()),
609                 ))
610                 .await;
611             // then resolve the MTU negotiation with a large MTU
612             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap();
613 
614             // assert: the indication was sent
615             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
616         });
617     }
618 
619     #[test]
test_single_indication_pending_mtu_fail()620     fn test_single_indication_pending_mtu_fail() {
621         block_on_locally(async {
622             // arrange: pending MTU negotiation
623             let (conn, _) = open_connection();
624             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
625 
626             // act: try to send an indication with a large payload size
627             let pending_mtu =
628                 try_await(conn.as_ref().send_indication(
629                     VALID_HANDLE,
630                     AttAttributeDataChild::RawData((1..50).collect()),
631                 ))
632                 .await
633                 .unwrap_err();
634             // then resolve the MTU negotiation with a small MTU
635             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap();
636 
637             // assert: the indication failed to send
638             assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. })));
639         });
640     }
641 
642     #[test]
test_server_transaction_pending_mtu()643     fn test_server_transaction_pending_mtu() {
644         block_on_locally(async {
645             // arrange: pending MTU negotiation
646             let (conn, mut rx) = open_connection();
647             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
648 
649             // act: send server packet
650             conn.as_ref().handle_packet(
651                 build_att_view_or_crash(AttReadRequestBuilder {
652                     attribute_handle: VALID_HANDLE.into(),
653                 })
654                 .view(),
655             );
656 
657             // assert: that we reply even while the MTU req is outstanding
658             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
659         });
660     }
661 
662     #[test]
test_queued_indication_pending_mtu_uses_mtu_on_dequeue()663     fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() {
664         block_on_locally(async {
665             // arrange: an outstanding indication
666             let (conn, mut rx) = open_connection();
667             let _ =
668                 try_await(conn.as_ref().send_indication(
669                     VALID_HANDLE,
670                     AttAttributeDataChild::RawData([1, 2, 3].into()),
671                 ))
672                 .await;
673             rx.recv().await.unwrap(); // flush rx_queue
674 
675             // act: enqueue an indication with a large payload
676             let _ =
677                 try_await(conn.as_ref().send_indication(
678                     VALID_HANDLE,
679                     AttAttributeDataChild::RawData((1..50).collect()),
680                 ))
681                 .await;
682             // then perform MTU negotiation to upgrade to a large MTU
683             conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
684             conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap();
685             // finally resolve the first indication, so the second indication can be sent
686             conn.as_ref().handle_packet(
687                 build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
688             );
689 
690             // assert: the second indication successfully sent (so it used the new MTU)
691             assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
692         });
693     }
694 }
695