//! This module handles an individual connection on the ATT fixed channel.
//! It handles ATT transactions and unacknowledged operations, backed by an
//! AttDatabase (that may in turn be backed by an upper-layer protocol).

use std::{cell::Cell, future::Future};

use anyhow::Result;
use log::{error, trace, warn};
use tokio::task::spawn_local;

use crate::{
    core::{
        shared_box::{WeakBox, WeakBoxRef},
        shared_mutex::SharedMutex,
    },
    gatt::{
        ids::AttHandle,
        mtu::{AttMtu, MtuEvent},
        opcode_types::{classify_opcode, OperationType},
    },
    packets::{
        AttAttributeDataChild, AttBuilder, AttChild, AttErrorCode, AttErrorResponseBuilder,
        AttView, Packet, SerializeError,
    },
    utils::{owned_handle::OwnedHandle, packet::HACK_child_to_opcode},
};

use super::{
    att_database::AttDatabase,
    command_handler::AttCommandHandler,
    indication_handler::{ConfirmationWatcher, IndicationError, IndicationHandler},
    request_handler::AttRequestHandler,
};

/// State of the single request/response transaction slot of a bearer.
///
/// The state lives in a `Cell`, so it is always moved out and back in as a
/// whole value (see `handle_request`).
enum AttRequestState<T: AttDatabase> {
    /// No transaction in flight; holds the handler that will process the next
    /// request.
    Idle(AttRequestHandler<T>),
    /// A transaction is in flight; holds the handle of the task processing it.
    /// NOTE(review): presumably dropping the `OwnedHandle` cancels the task -
    /// confirm against `OwnedHandle`.
    Pending { _task: OwnedHandle<()> },
    /// Placeholder stored in the `Cell` while the real state is temporarily
    /// taken out. It must never be observed by a later call.
    Replacing,
}

/// The errors that can occur while trying to send a packet
#[derive(Debug)]
pub enum SendError {
    /// The packet failed to serialize
    SerializeError(SerializeError),
    /// The connection no longer exists
    ConnectionDropped,
}

/// This represents a single ATT bearer (currently, always the unenhanced fixed
/// channel on LE). The AttRequestState ensures that only one transaction can
/// take place at a time.
pub struct AttServerBearer<T: AttDatabase> {
    // general
    // callback used to emit every outgoing packet; may fail to serialize
    send_packet: Box<dyn Fn(AttBuilder) -> Result<(), SerializeError>>,
    mtu: AttMtu,

    // request state
    curr_request: Cell<AttRequestState<T>>,

    // indication state
    indication_handler: SharedMutex<IndicationHandler<T>>,
    pending_confirmation: ConfirmationWatcher,

    // command handler (across all bearers)
    command_handler: AttCommandHandler<T>,
}

impl<T: AttDatabase + Clone + 'static> AttServerBearer<T> {
    /// Constructor, wrapping an ATT channel (for outgoing packets) and an
    /// AttDatabase.
    ///
    /// `db` is cloned so that the indication handler, the request handler and
    /// the command handler each get their own copy. `send_packet` is the
    /// callback invoked for every outgoing packet.
    pub fn new(
        db: T,
        send_packet: impl Fn(AttBuilder) -> Result<(), SerializeError> + 'static,
    ) -> Self {
        let (indication_handler, pending_confirmation) = IndicationHandler::new(db.clone());
        Self {
            send_packet: Box::new(send_packet),
            mtu: AttMtu::new(),

            // the bearer starts out idle, ready for the first request
            curr_request: AttRequestState::Idle(AttRequestHandler::new(db.clone())).into(),

            indication_handler: SharedMutex::new(indication_handler),
            pending_confirmation,

            command_handler: AttCommandHandler::new(db),
        }
    }

    /// Wrap `packet` in an `AttBuilder` (with the opcode derived from the
    /// child) and hand it to the outgoing-packet callback.
    ///
    /// Returns whatever the callback returns, i.e. an error if the packet
    /// failed to serialize.
    fn send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SerializeError> {
        let child = packet.into();
        let packet = AttBuilder { opcode: HACK_child_to_opcode(&child), _child_: child };
        (self.send_packet)(packet)
    }
}

impl<T: AttDatabase + Clone + 'static> WeakBoxRef<'_, AttServerBearer<T>> {
    /// Handle an incoming packet, and send outgoing packets as appropriate
    /// using the owned ATT channel.
    ///
    /// Commands go to the command handler, requests start a transaction (see
    /// `handle_request`), and confirmations are forwarded to the confirmation
    /// watcher.
    ///
    /// # Panics
    ///
    /// Panics if the packet is a response, notification or indication; the
    /// caller is expected to have filtered those out.
    pub fn handle_packet(&self, packet: AttView<'_>) {
        match classify_opcode(packet.get_opcode()) {
            OperationType::Command => {
                self.command_handler.process_packet(packet);
            }
            OperationType::Request => {
                self.handle_request(packet);
            }
            OperationType::Confirmation => self.pending_confirmation.on_confirmation(),
            OperationType::Response | OperationType::Notification | OperationType::Indication => {
                unreachable!("the arbiter should not let us receive these packet types")
            }
        }
    }

    /// Send an indication, wait for the peer confirmation, and return the
    /// appropriate status. If multiple calls are outstanding, they are
    /// executed in FIFO order.
    ///
    /// The returned future resolves to
    /// `Err(IndicationError::SendError(SendError::ConnectionDropped))` if the
    /// connection goes away while the indication is queued or while waiting
    /// for an MTU exchange; otherwise it resolves to the result of
    /// `IndicationHandler::send`.
    pub fn send_indication(
        &self,
        handle: AttHandle,
        data: AttAttributeDataChild,
    ) -> impl Future<Output = Result<(), IndicationError>> {
        trace!("sending indication for handle {handle:?}");

        // These are created eagerly, at call time, rather than when the
        // returned future is first polled.
        // NOTE(review): presumably this is what fixes the FIFO position of the
        // call - confirm against `SharedMutex::lock`.
        let locked_indication_handler = self.indication_handler.lock();
        let pending_mtu = self.mtu.snapshot();
        // only a weak reference is captured by the future, so a pending
        // indication does not hold a strong reference to the bearer
        let this = self.downgrade();

        async move {
            // first wait until we are at the head of the queue and are ready to send
            // indications
            let mut indication_handler = locked_indication_handler
                .await
                .ok_or_else(|| {
                    warn!("indication for handle {handle:?} cancelled while queued since the connection dropped");
                    IndicationError::SendError(SendError::ConnectionDropped)
                })?;
            // then, if MTU negotiation is taking place, wait for it to complete
            let mtu = pending_mtu
                .await
                .ok_or_else(|| {
                    warn!("indication for handle {handle:?} cancelled while waiting for MTU exchange to complete since the connection dropped");
                    IndicationError::SendError(SendError::ConnectionDropped)
                })?;
            // finally, send, and wait for a response
            indication_handler.send(handle, data, mtu, |packet| this.try_send_packet(packet)).await
        }
    }

    /// Handle a snooped MTU event, to update the MTU we use for our various
    /// operations.
    ///
    /// Returns the result of `AttMtu::handle_event` unchanged.
    pub fn handle_mtu_event(&self, mtu_event: MtuEvent) -> Result<()> {
        self.mtu.handle_event(mtu_event)
    }

    /// Start an ATT transaction for an incoming request.
    ///
    /// If the bearer is idle, the request handler is moved into a locally
    /// spawned task that processes the packet, sends the reply, and then puts
    /// the handler back so the bearer becomes idle again. If the reply fails
    /// to serialize, an error response with `UNLIKELY_ERROR` is sent instead.
    ///
    /// If a transaction is already pending, the new request is dropped with a
    /// warning and no reply is sent for it.
    ///
    /// # Panics
    ///
    /// Panics if the state is observed as `Replacing`, or (inside the spawned
    /// task) if the fallback error response also fails to serialize.
    fn handle_request(&self, packet: AttView<'_>) {
        // Take the current state out of the Cell, leaving the `Replacing`
        // placeholder behind; the match below computes the state to put back.
        let curr_request = self.curr_request.replace(AttRequestState::Replacing);
        self.curr_request.replace(match curr_request {
            AttRequestState::Idle(mut request_handler) => {
                // even if the MTU is updated afterwards, 5.3 3F 3.4.2.2 states that the
                // request-time MTU should be used
                let mtu = self.mtu.snapshot_or_default();
                // the task outlives this call, so it needs its own copy of the packet
                let packet = packet.to_owned_packet();
                let this = self.downgrade();
                let task = spawn_local(async move {
                    trace!("starting ATT transaction");
                    let reply = request_handler.process_packet(packet.view(), mtu).await;
                    // if the bearer no longer exists by now, `this` is None and
                    // the reply is silently discarded
                    this.with(|this| {
                        this.map(|this| {
                            match this.send_packet(reply) {
                                Ok(_) => {
                                    trace!("reply packet sent")
                                }
                                Err(err) => {
                                    error!("serializer failure {err:?}, dropping packet and sending failed reply");
                                    // if this also fails, we're stuck
                                    if let Err(err) = this.send_packet(AttErrorResponseBuilder {
                                        opcode_in_error: packet.view().get_opcode(),
                                        handle_in_error: AttHandle(0).into(),
                                        error_code: AttErrorCode::UNLIKELY_ERROR,
                                    }) {
                                        panic!("unexpected serialize error for known-good packet {err:?}")
                                    }
                                }
                            };
                            // ready for next transaction
                            this.curr_request.replace(AttRequestState::Idle(request_handler));
                        })
                    });
                });
                AttRequestState::Pending { _task: task.into() }
            }
            AttRequestState::Pending { .. } => {
                warn!("multiple ATT operations cannot simultaneously take place, dropping one");
                // TODO(aryarahul) - disconnect connection here;
                // keep the in-flight transaction untouched
                curr_request
            }
            AttRequestState::Replacing => {
                panic!("Replacing is an ephemeral state");
            }
        });
    }
}

impl<T: AttDatabase + Clone + 'static> WeakBox<AttServerBearer<T>> {
    /// Send `packet` on the bearer if it still exists.
    ///
    /// Returns `SendError::ConnectionDropped` if the bearer is gone, or
    /// `SendError::SerializeError` if the packet failed to serialize.
    fn try_send_packet(&self, packet: impl Into<AttChild>) -> Result<(), SendError> {
        self.with(|this| {
            this.ok_or_else(|| {
                warn!("connection dropped before packet sent");
                SendError::ConnectionDropped
            })?
            .send_packet(packet)
            .map_err(SendError::SerializeError)
        })
    }
}

#[cfg(test)]
mod test {
    use std::rc::Rc;

    use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel, UnboundedReceiver};

    use super::*;

    use crate::{
        core::{shared_box::SharedBox, uuid::Uuid},
        gatt::{
            ffi::AttributeBackingType,
            ids::TransportIndex,
            mocks::mock_datastore::{MockDatastore, MockDatastoreEvents},
            server::{
                att_database::{AttAttribute, AttPermissions},
                gatt_database::{
                    GattCharacteristicWithHandle, GattDatabase, GattServiceWithHandle,
                },
                test::test_att_db::TestAttDatabase,
            },
        },
        packets::{
            AttAttributeDataBuilder, AttAttributeDataChild, AttHandleValueConfirmationBuilder,
            AttOpcode, AttReadRequestBuilder, AttReadResponseBuilder,
        },
        utils::{
            packet::build_att_view_or_crash,
            task::{block_on_locally, try_await},
        },
    };

    // Handles 3 and 10 are populated by the fixtures below; handle 4 is not.
    const VALID_HANDLE: AttHandle = AttHandle(3);
    const INVALID_HANDLE: AttHandle = AttHandle(4);
    const ANOTHER_VALID_HANDLE: AttHandle = AttHandle(10);

    const TCB_IDX: TransportIndex = TransportIndex(1);

    /// Build a bearer over a `TestAttDatabase` holding two readable,
    /// indicatable attributes, together with the receiving end of a channel
    /// that captures every outgoing packet.
    ///
    /// The send callback unwraps the channel send, so it panics if the
    /// receiver has been dropped.
    fn open_connection(
    ) -> (SharedBox<AttServerBearer<TestAttDatabase>>, UnboundedReceiver<AttBuilder>) {
        let db = TestAttDatabase::new(vec![
            (
                AttAttribute {
                    handle: VALID_HANDLE,
                    type_: Uuid::new(0x1234),
                    permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
                },
                vec![5, 6],
            ),
            (
                AttAttribute {
                    handle: ANOTHER_VALID_HANDLE,
                    type_: Uuid::new(0x5678),
                    permissions: AttPermissions::READABLE | AttPermissions::INDICATE,
                },
                vec![5, 6],
            ),
        ]);
        let (tx, rx) = unbounded_channel();
        let conn = AttServerBearer::new(db, move |packet| {
            tx.send(packet).unwrap();
            Ok(())
        })
        .into();
        (conn, rx)
    }

    // A single read request yields exactly one read response.
    #[test]
    fn test_single_transaction() {
        block_on_locally(async {
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttReadRequestBuilder {
                    attribute_handle: VALID_HANDLE.into(),
                })
                .view(),
            );
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    // After one transaction completes (here with an error response), the
    // bearer accepts and answers the next request.
    #[test]
    fn test_sequential_transactions() {
        block_on_locally(async {
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttReadRequestBuilder {
                    attribute_handle: INVALID_HANDLE.into(),
                })
                .view(),
            );
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::ERROR_RESPONSE);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttReadRequestBuilder {
                    attribute_handle: VALID_HANDLE.into(),
                })
                .view(),
            );
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    // A second request arriving while the first is still pending is dropped:
    // only the first one reaches the datastore and gets a reply.
    #[test]
    fn test_concurrent_transaction_failure() {
        // arrange: AttServerBearer linked to a backing datastore and packet queue, with
        // two characteristics in the database
        let (datastore, mut data_rx) = MockDatastore::new();
        let datastore = Rc::new(datastore);
        let db = SharedBox::new(GattDatabase::new());
        db.add_service_with_handles(
            GattServiceWithHandle {
                handle: AttHandle(1),
                type_: Uuid::new(1),
                characteristics: vec![
                    GattCharacteristicWithHandle {
                        handle: VALID_HANDLE,
                        type_: Uuid::new(2),
                        permissions: AttPermissions::READABLE,
                        descriptors: vec![],
                    },
                    GattCharacteristicWithHandle {
                        handle: ANOTHER_VALID_HANDLE,
                        type_: Uuid::new(2),
                        permissions: AttPermissions::READABLE,
                        descriptors: vec![],
                    },
                ],
            },
            datastore,
        )
        .unwrap();
        let (tx, mut rx) = unbounded_channel();
        let send_packet = move |packet| {
            tx.send(packet).unwrap();
            Ok(())
        };
        let conn = SharedBox::new(AttServerBearer::new(db.get_att_database(TCB_IDX), send_packet));
        let data = [1, 2];

        // act: send two read requests before replying to either read
        // first request
        block_on_locally(async {
            let req1 = build_att_view_or_crash(AttReadRequestBuilder {
                attribute_handle: VALID_HANDLE.into(),
            });
            conn.as_ref().handle_packet(req1.view());
            // second request
            let req2 = build_att_view_or_crash(AttReadRequestBuilder {
                attribute_handle: ANOTHER_VALID_HANDLE.into(),
            });
            conn.as_ref().handle_packet(req2.view());
            // handle first reply
            let MockDatastoreEvents::Read(
                TCB_IDX,
                VALID_HANDLE,
                AttributeBackingType::Characteristic,
                data_resp,
            ) = data_rx.recv().await.unwrap()
            else {
                unreachable!();
            };
            data_resp.send(Ok(data.to_vec())).unwrap();
            trace!("reply sent from upper tester");

            // assert: that the first reply was made
            let resp = rx.recv().await.unwrap();
            assert_eq!(
                resp,
                AttBuilder {
                    opcode: AttOpcode::READ_RESPONSE,
                    _child_: AttReadResponseBuilder {
                        value: AttAttributeDataBuilder {
                            _child_: AttAttributeDataChild::RawData(
                                data.to_vec().into_boxed_slice()
                            )
                        },
                    }
                    .into()
                }
            );
            // assert no other replies were made
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // assert no callbacks are pending
            assert_eq!(data_rx.try_recv().unwrap_err(), TryRecvError::Empty);
        });
    }

    // An indication completes successfully once the peer confirms it.
    #[test]
    fn test_indication_confirmation() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send an indication
            let pending_send =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // and the confirmation
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );

            // assert: the indication was correctly sent
            assert!(matches!(pending_send.await.unwrap(), Ok(())));
        });
    }

    // Two indications sent one after the other (each confirmed before the
    // next is issued) both succeed.
    #[test]
    fn test_sequential_indications() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send the first indication
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));
            // wait for/capture the outgoing packet
            let sent1 = rx.recv().await.unwrap();
            // send the response
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );
            // send the second indication
            let pending_send2 =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));
            // wait for/capture the outgoing packet
            let sent2 = rx.recv().await.unwrap();
            // and the response
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );

            // assert: exactly two indications were sent
            assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // and that both got successful responses
            assert!(matches!(pending_send1.await.unwrap(), Ok(())));
            assert!(matches!(pending_send2.await.unwrap(), Ok(())));
        });
    }

    // With two indications outstanding and no confirmation yet, only the
    // first one goes out on the wire.
    #[test]
    fn test_queued_indications_only_one_sent() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send two indications simultaneously
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));
            let pending_send2 = spawn_local(conn.as_ref().send_indication(
                ANOTHER_VALID_HANDLE,
                AttAttributeDataChild::RawData([1, 2, 3].into()),
            ));
            // assert: only one was initially sent
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
            // and both are still pending
            assert!(!pending_send1.is_finished());
            assert!(!pending_send2.is_finished());
        });
    }

    // Confirming the first of two queued indications releases the second one.
    #[test]
    fn test_queued_indications_dequeue_second() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send two indications simultaneously
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));
            let pending_send2 = spawn_local(conn.as_ref().send_indication(
                ANOTHER_VALID_HANDLE,
                AttAttributeDataChild::RawData([1, 2, 3].into()),
            ));
            // wait for/capture the outgoing packet
            let sent1 = rx.recv().await.unwrap();
            // send response for the first one
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );
            // wait for/capture the outgoing packet
            let sent2 = rx.recv().await.unwrap();

            // assert: the first future has completed successfully, the second one is
            // pending
            assert!(matches!(pending_send1.await.unwrap(), Ok(())));
            assert!(!pending_send2.is_finished());
            // and that both indications have been sent
            assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    // Two queued indications both complete once each has been confirmed.
    #[test]
    fn test_queued_indications_complete_both() {
        block_on_locally(async {
            // arrange
            let (conn, mut rx) = open_connection();

            // act: send two indications simultaneously
            let pending_send1 =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));
            let pending_send2 = spawn_local(conn.as_ref().send_indication(
                ANOTHER_VALID_HANDLE,
                AttAttributeDataChild::RawData([1, 2, 3].into()),
            ));
            // wait for/capture the outgoing packet
            let sent1 = rx.recv().await.unwrap();
            // send response for the first one
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );
            // wait for/capture the outgoing packet
            let sent2 = rx.recv().await.unwrap();
            // and now the second
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );

            // assert: both futures have completed successfully
            assert!(matches!(pending_send1.await.unwrap(), Ok(())));
            assert!(matches!(pending_send2.await.unwrap(), Ok(())));
            // and both indications have been sent
            assert_eq!(sent1.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(sent2.opcode, AttOpcode::HANDLE_VALUE_INDICATION);
            assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        });
    }

    // Dropping the bearer while an indication awaits its confirmation fails
    // that indication with a dedicated error.
    #[test]
    fn test_indication_connection_drop() {
        block_on_locally(async {
            // arrange: a pending indication
            let (conn, mut rx) = open_connection();
            let pending_send =
                spawn_local(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ));

            // act: drop the connection after the indication is sent
            rx.recv().await.unwrap();
            drop(conn);

            // assert: the pending indication fails with the appropriate error
            assert!(matches!(
                pending_send.await.unwrap(),
                Err(IndicationError::ConnectionDroppedWhileWaitingForConfirmation)
            ));
        });
    }

    // An indication issued during MTU negotiation is sent after the
    // negotiation resolves with an MTU large enough for the payload.
    #[test]
    fn test_single_indication_pending_mtu() {
        block_on_locally(async {
            // arrange: pending MTU negotiation
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();

            // act: try to send an indication with a large payload size
            let _ =
                try_await(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData((1..50).collect()),
                ))
                .await;
            // then resolve the MTU negotiation with a large MTU
            conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(100)).unwrap();

            // assert: the indication was sent
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
        });
    }

    // An indication issued during MTU negotiation fails if the negotiated MTU
    // turns out to be too small for the payload.
    #[test]
    fn test_single_indication_pending_mtu_fail() {
        block_on_locally(async {
            // arrange: pending MTU negotiation
            // NOTE(review): `_` does not bind the receiver, so it is dropped at the
            // end of this statement; if a packet were ever sent in this test, the
            // `tx.send(..).unwrap()` in open_connection would panic.
            let (conn, _) = open_connection();
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();

            // act: try to send an indication with a large payload size
            let pending_mtu =
                try_await(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData((1..50).collect()),
                ))
                .await
                .unwrap_err();
            // then resolve the MTU negotiation with a small MTU
            conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(32)).unwrap();

            // assert: the indication failed to send
            assert!(matches!(pending_mtu.await, Err(IndicationError::DataExceedsMtu { .. })));
        });
    }

    // Requests are answered immediately even while an MTU exchange is
    // outstanding (they do not wait for it to complete).
    #[test]
    fn test_server_transaction_pending_mtu() {
        block_on_locally(async {
            // arrange: pending MTU negotiation
            let (conn, mut rx) = open_connection();
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();

            // act: send server packet
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttReadRequestBuilder {
                    attribute_handle: VALID_HANDLE.into(),
                })
                .view(),
            );

            // assert: that we reply even while the MTU req is outstanding
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::READ_RESPONSE);
        });
    }

    // A queued indication is checked against the MTU in effect when it is
    // dequeued, not the MTU in effect when it was enqueued.
    #[test]
    fn test_queued_indication_pending_mtu_uses_mtu_on_dequeue() {
        block_on_locally(async {
            // arrange: an outstanding indication
            let (conn, mut rx) = open_connection();
            let _ =
                try_await(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData([1, 2, 3].into()),
                ))
                .await;
            rx.recv().await.unwrap(); // flush rx_queue

            // act: enqueue an indication with a large payload
            let _ =
                try_await(conn.as_ref().send_indication(
                    VALID_HANDLE,
                    AttAttributeDataChild::RawData((1..50).collect()),
                ))
                .await;
            // then perform MTU negotiation to upgrade to a large MTU
            conn.as_ref().handle_mtu_event(MtuEvent::OutgoingRequest).unwrap();
            conn.as_ref().handle_mtu_event(MtuEvent::IncomingResponse(512)).unwrap();
            // finally resolve the first indication, so the second indication can be sent
            conn.as_ref().handle_packet(
                build_att_view_or_crash(AttHandleValueConfirmationBuilder {}).view(),
            );

            // assert: the second indication successfully sent (so it used the new MTU)
            assert_eq!(rx.recv().await.unwrap().opcode, AttOpcode::HANDLE_VALUE_INDICATION);
        });
    }
}