1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import avatar 17import enum 18import grpc 19import logging 20import numpy as np 21 22from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous 23from bumble import pandora as bumble_server 24from bumble.gatt import GATT_ASHA_SERVICE 25from bumble.pairing import PairingDelegate 26from bumble_experimental.asha import AshaGattService, AshaService 27from mobly import base_test, signals, test_runner 28from mobly.asserts import assert_equal # type: ignore 29from mobly.asserts import assert_false # type: ignore 30from mobly.asserts import assert_in # type: ignore 31from mobly.asserts import assert_is_not_none # type: ignore 32from mobly.asserts import assert_not_equal # type: ignore 33from mobly.asserts import assert_true # type: ignore 34from pandora._utils import AioStream 35from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse 36from pandora.security_pb2 import LE_LEVEL3 37from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server 38from pandora_experimental.asha_pb2 import PlaybackAudioRequest 39from typing import AsyncIterator, ByteString, List, Optional, Tuple 40 41ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-') 42HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8] 43COMPLETE_LOCAL_NAME: str = "Bumble" 44AUDIO_SIGNAL_AMPLITUDE = 0.8 45AUDIO_SIGNAL_SAMPLING_RATE = 44100 46SINE_FREQUENCY = 440 47SINE_DURATION = 0.1 48 49 50class Ear(enum.IntEnum): 51 """Reference devices type""" 52 53 LEFT = 0 54 RIGHT = 1 55 56 57class AshaTest(base_test.BaseTestClass): # type: ignore[misc] 58 devices: Optional[PandoraDevices] = None 59 60 # pandora devices. 61 dut: PandoraDevice 62 ref_left: BumblePandoraDevice 63 ref_right: BumblePandoraDevice 64 65 def setup_class(self) -> None: 66 # Register experimental bumble servicers hook. 67 bumble_server.register_servicer_hook( 68 lambda bumble, _, server: add_AshaServicer_to_server(AshaService(bumble.device), server) 69 ) 70 71 self.devices = PandoraDevices(self) 72 self.dut, ref_left, ref_right, *_ = self.devices 73 74 if isinstance(self.dut, BumblePandoraDevice): 75 raise signals.TestAbortClass('DUT Bumble does not support Asha source') 76 if not isinstance(ref_left, BumblePandoraDevice): 77 raise signals.TestAbortClass('Test require Bumble as reference device(s)') 78 if not isinstance(ref_right, BumblePandoraDevice): 79 raise signals.TestAbortClass('Test require Bumble as reference device(s)') 80 81 self.ref_left, self.ref_right = ref_left, ref_right 82 83 def teardown_class(self) -> None: 84 if self.devices: 85 self.devices.stop_all() 86 87 @avatar.asynchronous 88 async def setup_test(self) -> None: 89 await asyncio.gather(self.dut.reset(), self.ref_left.reset(), self.ref_right.reset()) 90 91 # ASHA hearing aid's IO capability is NO_OUTPUT_NO_INPUT 92 self.ref_left.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT 93 self.ref_right.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT 94 95 async def ref_advertise_asha( 96 self, ref_device: PandoraDevice, ref_address_type: OwnAddressType, ear: Ear 97 ) -> AioStream[AdvertiseResponse]: 98 """ 99 Ref device starts to advertise with service data in advertisement data. 100 :return: Ref device's advertise stream 101 """ 102 # Ref starts advertising with ASHA service data 103 asha = AioAsha(ref_device.aio.channel) 104 await asha.Register(capability=ear, hisyncid=HISYCNID) 105 return ref_device.aio.host.Advertise( 106 legacy=True, 107 connectable=True, 108 own_address_type=ref_address_type, 109 data=DataTypes( 110 complete_local_name=COMPLETE_LOCAL_NAME, 111 incomplete_service_class_uuids16=[ASHA_UUID], 112 ), 113 ) 114 115 async def dut_scan_for_asha(self, dut_address_type: OwnAddressType, ear: Ear) -> ScanningResponse: 116 """ 117 DUT starts to scan for the Ref device. 118 :return: ScanningResponse for ASHA 119 """ 120 dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type) 121 expected_advertisement_data = self.get_expected_advertisement_data(ear) 122 ref = await anext( 123 ( 124 x 125 async for x in dut_scan 126 if ( 127 ASHA_UUID in x.data.incomplete_service_class_uuids16 128 and expected_advertisement_data == (x.data.service_data_uuid16[ASHA_UUID]).hex() 129 ) 130 ) 131 ) 132 dut_scan.cancel() 133 return ref 134 135 async def dut_connect_to_ref( 136 self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType 137 ) -> Tuple[Connection, Connection]: 138 """ 139 Helper method for Dut connects to Ref 140 :return: a Tuple (DUT to REF connection, REF to DUT connection) 141 """ 142 (dut_ref_res, ref_dut_res) = await asyncio.gather( 143 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), 144 anext(aiter(advertisement)), # pytype: disable=name-error 145 ) 146 assert_equal(dut_ref_res.result_variant(), 'connection') 147 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 148 assert_is_not_none(dut_ref) 149 assert dut_ref 150 advertisement.cancel() 151 return dut_ref, ref_dut 152 153 async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool: 154 try: 155 await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout) 156 return False 157 except grpc.RpcError as e: 158 assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED) # type: ignore 159 return True 160 161 def get_expected_advertisement_data(self, ear: Ear) -> str: 162 protocol_version = 0x01 163 truncated_hisyncid = HISYCNID[:4] 164 return ( 165 "{:02x}".format(protocol_version) 166 + "{:02x}".format(ear) 167 + "".join([("{:02x}".format(x)) for x in truncated_hisyncid]) 168 ) 169 170 def get_le_psm_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[int]: 171 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 172 le_psm_future = asyncio.get_running_loop().create_future() 173 174 def le_psm_handler(connection: Connection, data: int) -> None: 175 le_psm_future.set_result(data) 176 177 asha_service.on('le_psm_out', le_psm_handler) 178 return le_psm_future 179 180 def get_read_only_properties_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[bytes]: 181 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 182 read_only_properties_future = asyncio.get_running_loop().create_future() 183 184 def read_only_properties_handler(connection: Connection, data: bytes) -> None: 185 read_only_properties_future.set_result(data) 186 187 asha_service.on('read_only_properties', read_only_properties_handler) 188 return read_only_properties_future 189 190 def get_start_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[dict[str, int]]: 191 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 192 start_future = asyncio.get_running_loop().create_future() 193 194 def start_command_handler(connection: Connection, data: dict[str, int]) -> None: 195 start_future.set_result(data) 196 197 asha_service.on('start', start_command_handler) 198 return start_future 199 200 def get_stop_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[Connection]: 201 asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService))) 202 stop_future = asyncio.get_running_loop().create_future() 203 204 def stop_command_handler(connection: Connection) -> None: 205 stop_future.set_result(connection) 206 207 asha_service.on('stop', stop_command_handler) 208 return stop_future 209 210 async def get_audio_data(self, ref_asha: AioAsha, connection: Connection, timeout: int) -> ByteString: 211 audio_data = bytearray() 212 try: 213 captured_data = ref_asha.CaptureAudio(connection=connection, timeout=timeout) 214 async for data in captured_data: 215 audio_data.extend(data.data) 216 217 except grpc.aio.AioRpcError as e: 218 if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: 219 pass 220 else: 221 raise 222 223 return audio_data 224 225 async def generate_sine(self, connection: Connection) -> AsyncIterator[PlaybackAudioRequest]: 226 # generate sine wave audio 227 sine = AUDIO_SIGNAL_AMPLITUDE * np.sin( 228 2 229 * np.pi 230 * np.arange(AUDIO_SIGNAL_SAMPLING_RATE * SINE_DURATION) 231 * (SINE_FREQUENCY / AUDIO_SIGNAL_SAMPLING_RATE) 232 ) 233 s16le = (sine * 32767).astype('<i2') 234 235 # Interleaved audio. 236 stereo = np.zeros(s16le.size * 2, dtype=sine.dtype) 237 stereo[0::2] = s16le 238 239 # Send 4 second of audio. 240 for _ in range(0, int(4 / SINE_DURATION)): 241 yield PlaybackAudioRequest(connection=connection, data=stereo.tobytes()) 242 243 @avatar.parameterized( 244 (RANDOM, Ear.LEFT), 245 (RANDOM, Ear.RIGHT), 246 ) # type: ignore[misc] 247 @asynchronous 248 async def test_advertising_advertisement_data( 249 self, 250 ref_address_type: OwnAddressType, 251 ear: Ear, 252 ) -> None: 253 """ 254 Ref starts ASHA advertisements with service data in advertisement data. 255 DUT starts a service discovery. 256 Verify Ref is correctly discovered by DUT as a hearing aid device. 257 """ 258 advertisement = await self.ref_advertise_asha(self.ref_left, ref_address_type, ear) 259 260 # DUT starts a service discovery 261 scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 262 advertisement.cancel() 263 264 # Verify Ref is correctly discovered by DUT as a hearing aid device 265 assert_in(ASHA_UUID, scan_result.data.service_data_uuid16) 266 assert_equal(type(scan_result.data.complete_local_name), str) 267 expected_advertisement_data = self.get_expected_advertisement_data(ear) 268 assert_equal( 269 expected_advertisement_data, 270 (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(), 271 ) 272 273 @asynchronous 274 async def test_advertising_scan_response(self) -> None: 275 """ 276 Ref starts ASHA advertisements with service data in scan response data. 277 DUT starts a service discovery. 278 Verify Ref is correctly discovered by DUT as a hearing aid device. 279 """ 280 asha = AioAsha(self.ref_left.aio.channel) 281 await asha.Register(capability=Ear.LEFT, hisyncid=HISYCNID) 282 283 # advertise with ASHA service data in scan response 284 advertisement = self.ref_left.aio.host.Advertise( 285 legacy=True, 286 scan_response_data=DataTypes( 287 complete_local_name=COMPLETE_LOCAL_NAME, 288 complete_service_class_uuids16=[ASHA_UUID], 289 ), 290 ) 291 292 scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT) 293 advertisement.cancel() 294 295 # Verify Ref is correctly discovered by DUT as a hearing aid device. 296 assert_in(ASHA_UUID, scan_result.data.service_data_uuid16) 297 expected_advertisement_data = self.get_expected_advertisement_data(Ear.LEFT) 298 assert_equal( 299 expected_advertisement_data, 300 (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(), 301 ) 302 303 @avatar.parameterized( 304 (RANDOM, PUBLIC), 305 (RANDOM, RANDOM), 306 ) # type: ignore[misc] 307 @asynchronous 308 async def test_pairing( 309 self, 310 dut_address_type: OwnAddressType, 311 ref_address_type: OwnAddressType, 312 ) -> None: 313 """ 314 DUT discovers Ref. 315 DUT initiates connection to Ref. 316 Verify that DUT and Ref are bonded and connected. 317 """ 318 advertisement = await self.ref_advertise_asha( 319 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 320 ) 321 322 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 323 324 # DUT initiates connection to Ref. 325 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 326 327 # DUT starts pairing with the Ref. 328 (secure, wait_security) = await asyncio.gather( 329 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 330 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 331 ) 332 333 assert_equal(secure.result_variant(), 'success') 334 assert_equal(wait_security.result_variant(), 'success') 335 336 @avatar.parameterized( 337 (RANDOM, PUBLIC), 338 (RANDOM, RANDOM), 339 ) # type: ignore[misc] 340 @asynchronous 341 async def test_pairing_dual_device( 342 self, 343 dut_address_type: OwnAddressType, 344 ref_address_type: OwnAddressType, 345 ) -> None: 346 """ 347 DUT discovers Ref. 348 DUT initiates connection to Ref. 349 Verify that DUT and Ref are bonded and connected. 350 """ 351 352 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 353 advertisement = await self.ref_advertise_asha( 354 ref_device=ref_device, ref_address_type=ref_address_type, ear=ear 355 ) 356 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear) 357 # DUT initiates connection to ref_device. 358 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 359 advertisement.cancel() 360 361 return dut_ref, ref_dut 362 363 ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather( 364 ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT) 365 ) 366 367 # DUT starts pairing with the ref_left 368 (secure_left, wait_security_left) = await asyncio.gather( 369 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 370 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 371 ) 372 373 assert_equal(secure_left.result_variant(), 'success') 374 assert_equal(wait_security_left.result_variant(), 'success') 375 376 # DUT starts pairing with the ref_right 377 (secure_right, wait_security_right) = await asyncio.gather( 378 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 379 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 380 ) 381 382 assert_equal(secure_right.result_variant(), 'success') 383 assert_equal(wait_security_right.result_variant(), 'success') 384 385 await asyncio.gather( 386 self.ref_left.aio.host.Disconnect(connection=ref_left_dut), 387 self.dut.aio.host.WaitDisconnection(connection=dut_ref_left), 388 ) 389 await asyncio.gather( 390 self.ref_right.aio.host.Disconnect(connection=ref_right_dut), 391 self.dut.aio.host.WaitDisconnection(connection=dut_ref_right), 392 ) 393 394 @avatar.parameterized( 395 (RANDOM, PUBLIC), 396 (RANDOM, RANDOM), 397 ) # type: ignore[misc] 398 @asynchronous 399 async def test_unbonding( 400 self, 401 dut_address_type: OwnAddressType, 402 ref_address_type: OwnAddressType, 403 ) -> None: 404 """ 405 DUT removes bond with Ref. 406 Verify that DUT and Ref are disconnected and unbonded. 407 """ 408 raise signals.TestSkip("TODO: update rootcanal to retry") 409 410 advertisement = await self.ref_advertise_asha( 411 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 412 ) 413 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 414 415 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 416 417 secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3) 418 419 assert_equal(secure.WhichOneof("result"), "success") 420 await self.dut.aio.host.Disconnect(dut_ref) 421 await self.ref_left.aio.host.WaitDisconnection(ref_dut) 422 423 # delete the bond 424 await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address) 425 426 # DUT connect to REF again 427 dut_ref = ( 428 await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()) 429 ).connection 430 # TODO very likely there is a bug in android here 431 logging.debug("result should come out") 432 433 advertisement.cancel() 434 assert_is_not_none(dut_ref) 435 436 secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3) 437 438 assert_equal(secure.WhichOneof("result"), "success") 439 440 @avatar.parameterized( 441 (RANDOM, RANDOM), 442 (RANDOM, PUBLIC), 443 ) # type: ignore[misc] 444 @asynchronous 445 async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None: 446 """ 447 DUT discovers Ref. 448 DUT initiates connection to Ref. 449 Verify that DUT and Ref are connected. 450 """ 451 advertisement = await self.ref_advertise_asha( 452 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 453 ) 454 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 455 456 _, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 457 458 @avatar.parameterized( 459 (RANDOM, RANDOM), 460 (RANDOM, PUBLIC), 461 ) # type: ignore[misc] 462 @asynchronous 463 async def test_disconnect_initiator( 464 self, 465 dut_address_type: OwnAddressType, 466 ref_address_type: OwnAddressType, 467 ) -> None: 468 """ 469 DUT initiates disconnection to Ref. 470 Verify that DUT and Ref are disconnected. 471 """ 472 advertisement = await self.ref_advertise_asha( 473 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 474 ) 475 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 476 477 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 478 advertisement.cancel() 479 480 await self.dut.aio.host.Disconnect(connection=dut_ref) 481 assert_false(await self.is_device_connected(self.ref_left, ref_dut, 5), "Should be disconnected") 482 483 @avatar.parameterized( 484 (RANDOM, RANDOM), 485 (RANDOM, PUBLIC), 486 ) # type: ignore[misc] 487 @asynchronous 488 async def test_disconnect_initiator_dual_device( 489 self, 490 dut_address_type: OwnAddressType, 491 ref_address_type: OwnAddressType, 492 ) -> None: 493 """ 494 DUT initiates disconnection to Ref. 495 Verify that DUT and Ref are disconnected. 496 """ 497 498 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 499 advertisement = await self.ref_advertise_asha( 500 ref_device=ref_device, ref_address_type=ref_address_type, ear=ear 501 ) 502 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear) 503 # DUT initiates connection to ref_device. 504 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 505 advertisement.cancel() 506 507 return dut_ref, ref_dut 508 509 ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather( 510 ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT) 511 ) 512 513 # Disconnect from DUT 514 await asyncio.gather( 515 self.dut.aio.host.Disconnect(connection=dut_ref_left), 516 self.dut.aio.host.Disconnect(connection=dut_ref_right), 517 ) 518 519 # Verify the Refs are disconnected 520 assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 521 assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 522 523 @avatar.parameterized( 524 (RANDOM, RANDOM), 525 (RANDOM, PUBLIC), 526 ) # type: ignore[misc] 527 @asynchronous 528 async def test_disconnect_acceptor( 529 self, 530 dut_address_type: OwnAddressType, 531 ref_address_type: OwnAddressType, 532 ) -> None: 533 """ 534 Ref initiates disconnection to DUT (typically when put back in its box). 535 Verify that Ref is disconnected. 536 """ 537 advertisement = await self.ref_advertise_asha( 538 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 539 ) 540 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 541 542 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 543 advertisement.cancel() 544 545 await self.ref_left.aio.host.Disconnect(connection=ref_dut) 546 assert_false(await self.is_device_connected(self.dut, dut_ref, 5), "Should be disconnected") 547 548 @avatar.parameterized( 549 (RANDOM, RANDOM, 0), 550 (RANDOM, RANDOM, 0.5), 551 (RANDOM, RANDOM, 1), 552 (RANDOM, RANDOM, 5), 553 ) # type: ignore[misc] 554 @asynchronous 555 async def test_reconnection( 556 self, 557 dut_address_type: OwnAddressType, 558 ref_address_type: OwnAddressType, 559 reconnection_gap: float, 560 ) -> None: 561 """ 562 DUT initiates disconnection to the Ref. 563 Verify that DUT and Ref are disconnected. 564 DUT reconnects to Ref after various certain time. 565 Verify that DUT and Ref are connected. 566 """ 567 568 async def connect_and_disconnect() -> None: 569 advertisement = await self.ref_advertise_asha( 570 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 571 ) 572 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 573 dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type) 574 await self.dut.aio.host.Disconnect(connection=dut_ref) 575 576 await connect_and_disconnect() 577 # simulating reconnect interval 578 await asyncio.sleep(reconnection_gap) 579 await connect_and_disconnect() 580 581 @avatar.parameterized( 582 (RANDOM, RANDOM), 583 (RANDOM, PUBLIC), 584 ) # type: ignore[misc] 585 @asynchronous 586 async def test_auto_connection( 587 self, 588 dut_address_type: OwnAddressType, 589 ref_address_type: OwnAddressType, 590 ) -> None: 591 """ 592 Ref initiates disconnection to DUT. 593 Ref starts sending ASHA advertisements. 594 Verify that DUT auto-connects to Ref. 595 """ 596 advertisement = await self.ref_advertise_asha( 597 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 598 ) 599 ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 600 601 # manually connect and not cancel advertisement 602 dut_ref_res, ref_dut_res = await asyncio.gather( 603 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()), 604 anext(aiter(advertisement)), # pytype: disable=name-error 605 ) 606 assert_equal(dut_ref_res.result_variant(), 'connection') 607 dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection 608 assert_is_not_none(dut_ref) 609 assert dut_ref 610 611 # Pairing 612 (secure, wait_security) = await asyncio.gather( 613 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 614 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 615 ) 616 assert_equal(secure.result_variant(), 'success') 617 assert_equal(wait_security.result_variant(), 'success') 618 619 await self.ref_left.aio.host.Disconnect(connection=ref_dut) 620 621 ref_dut = (await anext(aiter(advertisement))).connection 622 advertisement.cancel() 623 624 @avatar.parameterized( 625 (RANDOM, RANDOM, Ear.LEFT), 626 (RANDOM, PUBLIC, Ear.RIGHT), 627 ) # type: ignore[misc] 628 @asynchronous 629 async def test_disconnect_acceptor_dual_device( 630 self, 631 dut_address_type: OwnAddressType, 632 ref_address_type: OwnAddressType, 633 disconnect_device: Ear, 634 ) -> None: 635 """ 636 Prerequisites: DUT and Ref are connected and bonded. 637 Description: 638 1. One peripheral of Ref initiates disconnection to DUT. 639 2. Verify that it is disconnected and that the other peripheral is still connected. 640 """ 641 642 advertisement_left = await self.ref_advertise_asha( 643 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 644 ) 645 ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 646 _, ref_left_dut = await self.dut_connect_to_ref( 647 advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type 648 ) 649 advertisement_left.cancel() 650 651 advertisement_right = await self.ref_advertise_asha( 652 ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT 653 ) 654 ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT) 655 _, ref_right_dut = await self.dut_connect_to_ref( 656 advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type 657 ) 658 advertisement_right.cancel() 659 660 if disconnect_device == Ear.LEFT: 661 await self.ref_left.aio.host.Disconnect(connection=ref_left_dut) 662 assert_true(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 663 assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 664 else: 665 await self.ref_right.aio.host.Disconnect(connection=ref_right_dut) 666 assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 667 assert_true(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 668 669 @avatar.parameterized( 670 (RANDOM, RANDOM, Ear.LEFT), 671 (RANDOM, RANDOM, Ear.RIGHT), 672 (RANDOM, PUBLIC, Ear.LEFT), 673 (RANDOM, PUBLIC, Ear.RIGHT), 674 ) # type: ignore[misc] 675 @asynchronous 676 async def test_auto_connection_dual_device( 677 self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, tested_device: Ear 678 ) -> None: 679 """ 680 Prerequisites: DUT and Ref are connected and bonded. Ref is a dual device. 681 Description: 682 1. One peripheral of Ref initiates disconnection to DUT. 683 2. The disconnected peripheral starts sending ASHA advertisements. 684 3. Verify that DUT auto-connects to the peripheral. 685 """ 686 687 advertisement_left = await self.ref_advertise_asha( 688 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 689 ) 690 ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT) 691 (dut_ref_left_res, ref_left_dut_res) = await asyncio.gather( 692 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_left.address_asdict()), 693 anext(aiter(advertisement_left)), # pytype: disable=name-error 694 ) 695 assert_equal(dut_ref_left_res.result_variant(), 'connection') 696 dut_ref_left, ref_left_dut = dut_ref_left_res.connection, ref_left_dut_res.connection 697 assert_is_not_none(dut_ref_left) 698 assert dut_ref_left 699 advertisement_left.cancel() 700 701 advertisement_right = await self.ref_advertise_asha( 702 ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT 703 ) 704 ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT) 705 (dut_ref_right_res, ref_right_dut_res) = await asyncio.gather( 706 self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_right.address_asdict()), 707 anext(aiter(advertisement_right)), # pytype: disable=name-error 708 ) 709 assert_equal(dut_ref_right_res.result_variant(), 'connection') 710 dut_ref_right, ref_right_dut = dut_ref_right_res.connection, ref_right_dut_res.connection 711 assert_is_not_none(dut_ref_right) 712 assert dut_ref_right 713 advertisement_right.cancel() 714 715 # Pairing 716 (secure_left, wait_security_left) = await asyncio.gather( 717 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 718 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 719 ) 720 assert_equal(secure_left.result_variant(), 'success') 721 assert_equal(wait_security_left.result_variant(), 'success') 722 723 (secure_right, wait_security_right) = await asyncio.gather( 724 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 725 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 726 ) 727 assert_equal(secure_right.result_variant(), 'success') 728 assert_equal(wait_security_right.result_variant(), 'success') 729 730 if tested_device == Ear.LEFT: 731 await asyncio.gather( 732 self.ref_left.aio.host.Disconnect(connection=ref_left_dut), 733 self.dut.aio.host.WaitDisconnection(connection=dut_ref_left), 734 ) 735 assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected") 736 737 advertisement_left = await self.ref_advertise_asha( 738 ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT 739 ) 740 ref_left_dut = (await anext(aiter(advertisement_left))).connection 741 advertisement_left.cancel() 742 else: 743 await asyncio.gather( 744 self.ref_right.aio.host.Disconnect(connection=ref_right_dut), 745 self.dut.aio.host.WaitDisconnection(connection=dut_ref_right), 746 ) 747 assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected") 748 749 advertisement_right = await self.ref_advertise_asha( 750 ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT 751 ) 752 ref_right_dut = (await anext(aiter(advertisement_right))).connection 753 advertisement_right.cancel() 754 755 @asynchronous 756 async def test_music_start(self) -> None: 757 """ 758 DUT discovers Ref. 759 DUT initiates connection to Ref. 760 Verify that DUT and Ref are bonded and connected. 761 DUT starts media streaming. 762 Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1, 763 audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>). 764 """ 765 766 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 767 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 768 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 769 # DUT initiates connection to ref_device. 770 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 771 advertisement.cancel() 772 773 return dut_ref, ref_dut 774 775 dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 776 le_psm_future = self.get_le_psm_future(self.ref_left) 777 read_only_properties_future = self.get_read_only_properties_future(self.ref_left) 778 779 # DUT starts pairing with the ref_left 780 (secure, wait_security) = await asyncio.gather( 781 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 782 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 783 ) 784 785 assert_equal(secure.result_variant(), 'success') 786 assert_equal(wait_security.result_variant(), 'success') 787 788 le_psm_out_result = await asyncio.wait_for(le_psm_future, timeout=3.0) 789 assert_is_not_none(le_psm_out_result) 790 791 read_only_properties_result = await asyncio.wait_for(read_only_properties_future, timeout=3.0) 792 assert_is_not_none(read_only_properties_result) 793 794 dut_asha = AioAsha(self.dut.aio.channel) 795 start_future = self.get_start_future(self.ref_left) 796 797 logging.info("send start") 798 await dut_asha.WaitPeripheral(connection=dut_ref) 799 _, start_result = await asyncio.gather( 800 dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0) 801 ) 802 803 logging.info(f"start_result:{start_result}") 804 assert_is_not_none(start_result) 805 assert_equal(start_result['codec'], 1) 806 assert_equal(start_result['audiotype'], 0) 807 assert_is_not_none(start_result['volume']) 808 assert_equal(start_result['otherstate'], 0) 809 810 @asynchronous 811 async def test_set_volume(self) -> None: 812 """ 813 DUT discovers Ref. 814 DUT initiates connection to Ref. 815 Verify that DUT and Ref are bonded and connected. 816 DUT is streaming media to Ref. 817 Change volume on DUT. 818 Verify DUT writes the correct value to ASHA `Volume` characteristic. 819 """ 820 raise signals.TestSkip("TODO: update bt test interface for SetVolume to retry") 821 822 advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT) 823 824 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT) 825 826 # DUT initiates connection to Ref. 827 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 828 829 # DUT starts pairing with the ref_left 830 (secure, wait_security) = await asyncio.gather( 831 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 832 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 833 ) 834 835 assert_equal(secure.result_variant(), 'success') 836 assert_equal(wait_security.result_variant(), 'success') 837 838 asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService))) 839 dut_asha = AioAsha(self.dut.aio.channel) 840 841 volume_future = asyncio.get_running_loop().create_future() 842 843 def volume_command_handler(connection: Connection, data: int): 844 volume_future.set_result(data) 845 846 asha_service.on('volume', volume_command_handler) 847 848 await dut_asha.WaitPeripheral(connection=dut_ref) 849 await dut_asha.Start(connection=dut_ref) 850 # set volume to max volume 851 _, volume_result = await asyncio.gather(dut_asha.SetVolume(1), asyncio.wait_for(volume_future, timeout=3.0)) 852 853 logging.info(f"start_result:{volume_result}") 854 assert_is_not_none(volume_result) 855 assert_equal(volume_result, 0) 856 857 @asynchronous 858 async def test_music_stop(self) -> None: 859 """ 860 DUT discovers Ref. 861 DUT initiates connection to Ref. 862 Verify that DUT and Ref are bonded and connected. 863 DUT is streaming media to Ref. 864 DUT stops media streaming on Ref. 865 Verify that DUT sends a correct AudioControlPoint `Stop` command. 866 """ 867 868 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 869 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 870 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 871 # DUT initiates connection to ref_device. 872 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 873 advertisement.cancel() 874 875 return dut_ref, ref_dut 876 877 dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 878 879 # DUT starts pairing with the ref_left 880 (secure, wait_security) = await asyncio.gather( 881 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 882 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 883 ) 884 885 assert_equal(secure.result_variant(), 'success') 886 assert_equal(wait_security.result_variant(), 'success') 887 888 dut_asha = AioAsha(self.dut.aio.channel) 889 890 stop_future = self.get_stop_future(self.ref_left) 891 892 await dut_asha.WaitPeripheral(connection=dut_ref) 893 await dut_asha.Start(connection=dut_ref) 894 logging.info("send stop") 895 _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0)) 896 897 logging.info(f"stop_result:{stop_result}") 898 assert_is_not_none(stop_result) 899 900 # Sleep 0.5 second to mitigate flaky test first. 901 await asyncio.sleep(0.5) 902 903 audio_data = await self.get_audio_data( 904 ref_asha=AioAsha(self.ref_left.aio.channel), connection=ref_dut, timeout=10 905 ) 906 assert_equal(len(audio_data), 0) 907 908 @asynchronous 909 async def test_music_restart(self) -> None: 910 """ 911 DUT discovers Ref. 912 DUT initiates connection to Ref. 913 Verify that DUT and Ref are bonded and connected. 914 DUT starts media streaming. 915 DUT stops media streaming. 916 Verify that DUT sends a correct AudioControlPoint `Stop` command. 917 DUT starts media streaming again. 918 Verify that DUT sends a correct AudioControlPoint `Start` command. 919 """ 920 921 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 922 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 923 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 924 # DUT initiates connection to ref_device. 925 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 926 advertisement.cancel() 927 928 return dut_ref, ref_dut 929 930 dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 931 932 # DUT starts pairing with the ref_left 933 (secure, wait_security) = await asyncio.gather( 934 self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3), 935 self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3), 936 ) 937 938 assert_equal(secure.result_variant(), 'success') 939 assert_equal(wait_security.result_variant(), 'success') 940 941 dut_asha = AioAsha(self.dut.aio.channel) 942 943 stop_future = self.get_stop_future(self.ref_left) 944 945 await dut_asha.WaitPeripheral(connection=dut_ref) 946 await dut_asha.Start(connection=dut_ref) 947 _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0)) 948 949 logging.info(f"stop_result:{stop_result}") 950 assert_is_not_none(stop_result) 951 952 # restart music streaming 953 logging.info("restart music streaming") 954 955 start_future = self.get_start_future(self.ref_left) 956 957 await dut_asha.WaitPeripheral(connection=dut_ref) 958 _, start_result = await asyncio.gather( 959 dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0) 960 ) 961 962 logging.info(f"start_result:{start_result}") 963 assert_is_not_none(start_result) 964 965 @asynchronous 966 async def test_music_start_dual_device(self) -> None: 967 """ 968 DUT discovers Ref. 969 DUT initiates connection to Ref. 970 Verify that DUT and Ref are bonded and connected. 971 DUT starts media streaming. 972 Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1, 973 audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>). 974 """ 975 976 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 977 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 978 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 979 # DUT initiates connection to ref_device. 980 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 981 advertisement.cancel() 982 983 return dut_ref, ref_dut 984 985 # connect ref_left 986 dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 987 le_psm_future_left = self.get_le_psm_future(self.ref_left) 988 read_only_properties_future_left = self.get_read_only_properties_future(self.ref_left) 989 990 # DUT starts pairing with the ref_left 991 (secure_left, wait_security_left) = await asyncio.gather( 992 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 993 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 994 ) 995 996 assert_equal(secure_left.result_variant(), 'success') 997 assert_equal(wait_security_left.result_variant(), 'success') 998 999 le_psm_out_result_left = await asyncio.wait_for(le_psm_future_left, timeout=3.0) 1000 assert_is_not_none(le_psm_out_result_left) 1001 1002 read_only_properties_result_left = await asyncio.wait_for(read_only_properties_future_left, timeout=3.0) 1003 assert_is_not_none(read_only_properties_result_left) 1004 1005 dut_asha = AioAsha(self.dut.aio.channel) 1006 start_future_left = self.get_start_future(self.ref_left) 1007 1008 logging.info("send start") 1009 await dut_asha.WaitPeripheral(connection=dut_ref_left) 1010 _, start_result_left = await asyncio.gather( 1011 dut_asha.Start(connection=dut_ref_left), asyncio.wait_for(start_future_left, timeout=3.0) 1012 ) 1013 1014 logging.info(f"start_result_left:{start_result_left}") 1015 assert_is_not_none(start_result_left) 1016 assert_equal(start_result_left['codec'], 1) 1017 assert_equal(start_result_left['audiotype'], 0) 1018 assert_is_not_none(start_result_left['volume']) 1019 assert_equal(start_result_left['otherstate'], 0) 1020 1021 # Start playing audio before connecting to ref_right 1022 generated_audio = self.generate_sine(connection=dut_ref_left) 1023 dut_asha.PlaybackAudio(generated_audio) 1024 1025 # connect ref_right 1026 dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT) 1027 le_psm_future_right = self.get_le_psm_future(self.ref_right) 1028 read_only_properties_future_right = self.get_read_only_properties_future(self.ref_right) 1029 1030 # DUT starts pairing with the ref_right 1031 (secure_right, wait_security_right) = await asyncio.gather( 1032 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 1033 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 1034 ) 1035 1036 assert_equal(secure_right.result_variant(), 'success') 1037 assert_equal(wait_security_right.result_variant(), 'success') 1038 1039 le_psm_out_result_right = await asyncio.wait_for(le_psm_future_right, timeout=3.0) 1040 assert_is_not_none(le_psm_out_result_right) 1041 1042 read_only_properties_result_right = await asyncio.wait_for(read_only_properties_future_right, timeout=3.0) 1043 assert_is_not_none(read_only_properties_result_right) 1044 1045 start_future_right = self.get_start_future(self.ref_right) 1046 1047 logging.info("send start_right") 1048 await dut_asha.WaitPeripheral(connection=dut_ref_right) 1049 start_result_right = await asyncio.wait_for(start_future_right, timeout=10.0) 1050 1051 logging.info(f"start_result_right:{start_result_right}") 1052 assert_is_not_none(start_result_right) 1053 assert_equal(start_result_right['codec'], 1) 1054 assert_equal(start_result_right['audiotype'], 0) 1055 assert_is_not_none(start_result_right['volume']) 1056 # ref_left already connected, otherstate = 1 1057 assert_equal(start_result_right['otherstate'], 1) 1058 1059 @asynchronous 1060 async def test_music_stop_dual_device(self) -> None: 1061 """ 1062 DUT discovers Refs. 1063 DUT initiates connection to Refs. 1064 Verify that DUT and Refs are bonded and connected. 1065 DUT is streaming media to Refs. 1066 DUT stops media streaming on Refs. 1067 Verify that DUT sends a correct AudioControlPoint `Stop` command. 1068 Verify Refs cannot recevice audio data after DUT stops media streaming. 1069 """ 1070 1071 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 1072 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 1073 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 1074 # DUT initiates connection to ref_device. 1075 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 1076 advertisement.cancel() 1077 1078 return dut_ref, ref_dut 1079 1080 # DUT starts connecting, pairing with the ref_left 1081 dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 1082 (secure_left, wait_security_left) = await asyncio.gather( 1083 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 1084 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 1085 ) 1086 assert_equal(secure_left.result_variant(), 'success') 1087 assert_equal(wait_security_left.result_variant(), 'success') 1088 1089 # DUT starts connecting, pairing with the ref_right 1090 dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT) 1091 (secure_right, wait_security_right) = await asyncio.gather( 1092 self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3), 1093 self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3), 1094 ) 1095 assert_equal(secure_right.result_variant(), 'success') 1096 assert_equal(wait_security_right.result_variant(), 'success') 1097 1098 dut_asha = AioAsha(self.dut.aio.channel) 1099 ref_left_asha = AioAsha(self.ref_left.aio.channel) 1100 ref_right_asha = AioAsha(self.ref_right.aio.channel) 1101 1102 await asyncio.gather( 1103 dut_asha.WaitPeripheral(connection=dut_ref_left), dut_asha.WaitPeripheral(connection=dut_ref_right) 1104 ) 1105 await dut_asha.Start(connection=dut_ref_left) 1106 1107 # Stop audio and wait until ref_device connections stopped. 1108 stop_future_left = self.get_stop_future(self.ref_left) 1109 stop_future_right = self.get_stop_future(self.ref_right) 1110 1111 logging.info("send stop") 1112 _, stop_result_left, stop_result_right = await asyncio.gather( 1113 dut_asha.Stop(), 1114 asyncio.wait_for(stop_future_left, timeout=10.0), 1115 asyncio.wait_for(stop_future_right, timeout=10.0), 1116 ) 1117 1118 logging.info(f"stop_result_left:{stop_result_left}") 1119 logging.info(f"stop_result_right:{stop_result_right}") 1120 assert_is_not_none(stop_result_left) 1121 assert_is_not_none(stop_result_right) 1122 1123 # Sleep 0.5 second to mitigate flaky test first. 1124 await asyncio.sleep(0.5) 1125 1126 (audio_data_left, audio_data_right) = await asyncio.gather( 1127 self.get_audio_data(ref_asha=ref_left_asha, connection=ref_left_dut, timeout=10), 1128 self.get_audio_data(ref_asha=ref_right_asha, connection=ref_right_dut, timeout=10), 1129 ) 1130 1131 assert_equal(len(audio_data_left), 0) 1132 assert_equal(len(audio_data_right), 0) 1133 1134 @asynchronous 1135 async def test_music_audio_playback(self) -> None: 1136 """ 1137 DUT discovers Ref. 1138 DUT initiates connection to Ref. 1139 Verify that DUT and Ref are bonded and connected. 1140 DUT is streaming media to Ref using playback API. 1141 Verify that Ref has received audio data. 1142 """ 1143 1144 async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]: 1145 advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear) 1146 ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear) 1147 # DUT initiates connection to ref_device. 1148 dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM) 1149 advertisement.cancel() 1150 1151 return dut_ref, ref_dut 1152 1153 dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT) 1154 1155 # DUT starts pairing with the ref_left 1156 (secure_left, wait_security_left) = await asyncio.gather( 1157 self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3), 1158 self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3), 1159 ) 1160 1161 assert_equal(secure_left.result_variant(), 'success') 1162 assert_equal(wait_security_left.result_variant(), 'success') 1163 1164 dut_asha = AioAsha(self.dut.aio.channel) 1165 ref_asha = AioAsha(self.ref_left.aio.channel) 1166 1167 await dut_asha.WaitPeripheral(connection=dut_ref_left) 1168 await dut_asha.Start(connection=dut_ref_left) 1169 1170 # Clear audio data before start audio playback testing 1171 await self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10) 1172 1173 generated_audio = self.generate_sine(connection=dut_ref_left) 1174 1175 _, audio_data = await asyncio.gather( 1176 dut_asha.PlaybackAudio(generated_audio), 1177 self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10), 1178 ) 1179 1180 assert_not_equal(len(audio_data), 0) 1181 # TODO(duoho): decode audio_data and verify the content 1182 1183 1184if __name__ == "__main__": 1185 logging.basicConfig(level=logging.DEBUG) 1186 test_runner.main() # type: ignore 1187