1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import enum
18import grpc
19import logging
20import numpy as np
21
22from avatar import BumblePandoraDevice, PandoraDevice, PandoraDevices, asynchronous
23from bumble import pandora as bumble_server
24from bumble.gatt import GATT_ASHA_SERVICE
25from bumble.pairing import PairingDelegate
26from bumble_experimental.asha import AshaGattService, AshaService
27from mobly import base_test, signals, test_runner
28from mobly.asserts import assert_equal  # type: ignore
29from mobly.asserts import assert_false  # type: ignore
30from mobly.asserts import assert_in  # type: ignore
31from mobly.asserts import assert_is_not_none  # type: ignore
32from mobly.asserts import assert_not_equal  # type: ignore
33from mobly.asserts import assert_true  # type: ignore
34from pandora._utils import AioStream
35from pandora.host_pb2 import PUBLIC, RANDOM, AdvertiseResponse, Connection, DataTypes, OwnAddressType, ScanningResponse
36from pandora.security_pb2 import LE_LEVEL3
37from pandora_experimental.asha_grpc_aio import Asha as AioAsha, add_AshaServicer_to_server
38from pandora_experimental.asha_pb2 import PlaybackAudioRequest
39from typing import AsyncIterator, ByteString, List, Optional, Tuple
40
# String form of the ASHA GATT service UUID; used below both as the advertised
# service UUID entry and as the key into the scanned service data.
ASHA_UUID = GATT_ASHA_SERVICE.to_hex_str('-')
# Identifier passed as `hisyncid` when the ASHA service is registered on a Ref
# device; its first four bytes are expected back in the advertised service data.
HISYCNID: List[int] = [0x01, 0x02, 0x03, 0x04, 0x5, 0x6, 0x7, 0x8]
# Local name advertised by the Ref devices.
COMPLETE_LOCAL_NAME: str = "Bumble"
# Parameters of the sine tone built by `AshaTest.generate_sine`.
AUDIO_SIGNAL_AMPLITUDE = 0.8  # scale factor applied before multiplying by 32767
AUDIO_SIGNAL_SAMPLING_RATE = 44100  # samples per second (presumably Hz)
SINE_FREQUENCY = 440  # same unit as AUDIO_SIGNAL_SAMPLING_RATE
SINE_DURATION = 0.1  # length of one generated chunk; sample count = rate * duration
48
49
class Ear(enum.IntEnum):
    """Side of a reference device.

    The integer value is passed as the ASHA `capability` when the service is
    registered, and is formatted as the second byte of the expected advertised
    service data.
    """

    LEFT = 0
    RIGHT = 1
55
56
class AshaTest(base_test.BaseTestClass):  # type: ignore[misc]
    """Mobly test class running ASHA scenarios between a DUT and two Bumble reference devices."""

    # Handle on all devices of the test bed; stays None until `setup_class` has run.
    devices: Optional[PandoraDevices] = None

    # pandora devices.
    # Device under test; `setup_class` aborts the class when it is a Bumble device.
    dut: PandoraDevice
    # Reference devices; `setup_class` requires both to be Bumble devices.
    ref_left: BumblePandoraDevice
    ref_right: BumblePandoraDevice
64
65    def setup_class(self) -> None:
66        # Register experimental bumble servicers hook.
67        bumble_server.register_servicer_hook(
68            lambda bumble, _, server: add_AshaServicer_to_server(AshaService(bumble.device), server)
69        )
70
71        self.devices = PandoraDevices(self)
72        self.dut, ref_left, ref_right, *_ = self.devices
73
74        if isinstance(self.dut, BumblePandoraDevice):
75            raise signals.TestAbortClass('DUT Bumble does not support Asha source')
76        if not isinstance(ref_left, BumblePandoraDevice):
77            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
78        if not isinstance(ref_right, BumblePandoraDevice):
79            raise signals.TestAbortClass('Test require Bumble as reference device(s)')
80
81        self.ref_left, self.ref_right = ref_left, ref_right
82
83    def teardown_class(self) -> None:
84        if self.devices:
85            self.devices.stop_all()
86
87    @avatar.asynchronous
88    async def setup_test(self) -> None:
89        await asyncio.gather(self.dut.reset(), self.ref_left.reset(), self.ref_right.reset())
90
91        # ASHA hearing aid's IO capability is NO_OUTPUT_NO_INPUT
92        self.ref_left.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT
93        self.ref_right.server_config.io_capability = PairingDelegate.NO_OUTPUT_NO_INPUT
94
95    async def ref_advertise_asha(
96        self, ref_device: PandoraDevice, ref_address_type: OwnAddressType, ear: Ear
97    ) -> AioStream[AdvertiseResponse]:
98        """
99        Ref device starts to advertise with service data in advertisement data.
100        :return: Ref device's advertise stream
101        """
102        # Ref starts advertising with ASHA service data
103        asha = AioAsha(ref_device.aio.channel)
104        await asha.Register(capability=ear, hisyncid=HISYCNID)
105        return ref_device.aio.host.Advertise(
106            legacy=True,
107            connectable=True,
108            own_address_type=ref_address_type,
109            data=DataTypes(
110                complete_local_name=COMPLETE_LOCAL_NAME,
111                incomplete_service_class_uuids16=[ASHA_UUID],
112            ),
113        )
114
115    async def dut_scan_for_asha(self, dut_address_type: OwnAddressType, ear: Ear) -> ScanningResponse:
116        """
117        DUT starts to scan for the Ref device.
118        :return: ScanningResponse for ASHA
119        """
120        dut_scan = self.dut.aio.host.Scan(own_address_type=dut_address_type)
121        expected_advertisement_data = self.get_expected_advertisement_data(ear)
122        ref = await anext(
123            (
124                x
125                async for x in dut_scan
126                if (
127                    ASHA_UUID in x.data.incomplete_service_class_uuids16
128                    and expected_advertisement_data == (x.data.service_data_uuid16[ASHA_UUID]).hex()
129                )
130            )
131        )
132        dut_scan.cancel()
133        return ref
134
135    async def dut_connect_to_ref(
136        self, advertisement: AioStream[AdvertiseResponse], ref: ScanningResponse, dut_address_type: OwnAddressType
137    ) -> Tuple[Connection, Connection]:
138        """
139        Helper method for Dut connects to Ref
140        :return: a Tuple (DUT to REF connection, REF to DUT connection)
141        """
142        (dut_ref_res, ref_dut_res) = await asyncio.gather(
143            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()),
144            anext(aiter(advertisement)),  # pytype: disable=name-error
145        )
146        assert_equal(dut_ref_res.result_variant(), 'connection')
147        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
148        assert_is_not_none(dut_ref)
149        assert dut_ref
150        advertisement.cancel()
151        return dut_ref, ref_dut
152
153    async def is_device_connected(self, device: PandoraDevice, connection: Connection, timeout: float) -> bool:
154        try:
155            await device.aio.host.WaitDisconnection(connection=connection, timeout=timeout)
156            return False
157        except grpc.RpcError as e:
158            assert_equal(e.code(), grpc.StatusCode.DEADLINE_EXCEEDED)  # type: ignore
159            return True
160
161    def get_expected_advertisement_data(self, ear: Ear) -> str:
162        protocol_version = 0x01
163        truncated_hisyncid = HISYCNID[:4]
164        return (
165            "{:02x}".format(protocol_version)
166            + "{:02x}".format(ear)
167            + "".join([("{:02x}".format(x)) for x in truncated_hisyncid])
168        )
169
170    def get_le_psm_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[int]:
171        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
172        le_psm_future = asyncio.get_running_loop().create_future()
173
174        def le_psm_handler(connection: Connection, data: int) -> None:
175            le_psm_future.set_result(data)
176
177        asha_service.on('le_psm_out', le_psm_handler)
178        return le_psm_future
179
180    def get_read_only_properties_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[bytes]:
181        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
182        read_only_properties_future = asyncio.get_running_loop().create_future()
183
184        def read_only_properties_handler(connection: Connection, data: bytes) -> None:
185            read_only_properties_future.set_result(data)
186
187        asha_service.on('read_only_properties', read_only_properties_handler)
188        return read_only_properties_future
189
190    def get_start_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[dict[str, int]]:
191        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
192        start_future = asyncio.get_running_loop().create_future()
193
194        def start_command_handler(connection: Connection, data: dict[str, int]) -> None:
195            start_future.set_result(data)
196
197        asha_service.on('start', start_command_handler)
198        return start_future
199
200    def get_stop_future(self, ref_device: BumblePandoraDevice) -> asyncio.Future[Connection]:
201        asha_service = next((x for x in ref_device.device.gatt_server.attributes if isinstance(x, AshaGattService)))
202        stop_future = asyncio.get_running_loop().create_future()
203
204        def stop_command_handler(connection: Connection) -> None:
205            stop_future.set_result(connection)
206
207        asha_service.on('stop', stop_command_handler)
208        return stop_future
209
210    async def get_audio_data(self, ref_asha: AioAsha, connection: Connection, timeout: int) -> ByteString:
211        audio_data = bytearray()
212        try:
213            captured_data = ref_asha.CaptureAudio(connection=connection, timeout=timeout)
214            async for data in captured_data:
215                audio_data.extend(data.data)
216
217        except grpc.aio.AioRpcError as e:
218            if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
219                pass
220            else:
221                raise
222
223        return audio_data
224
225    async def generate_sine(self, connection: Connection) -> AsyncIterator[PlaybackAudioRequest]:
226        # generate sine wave audio
227        sine = AUDIO_SIGNAL_AMPLITUDE * np.sin(
228            2
229            * np.pi
230            * np.arange(AUDIO_SIGNAL_SAMPLING_RATE * SINE_DURATION)
231            * (SINE_FREQUENCY / AUDIO_SIGNAL_SAMPLING_RATE)
232        )
233        s16le = (sine * 32767).astype('<i2')
234
235        # Interleaved audio.
236        stereo = np.zeros(s16le.size * 2, dtype=sine.dtype)
237        stereo[0::2] = s16le
238
239        # Send 4 second of audio.
240        for _ in range(0, int(4 / SINE_DURATION)):
241            yield PlaybackAudioRequest(connection=connection, data=stereo.tobytes())
242
243    @avatar.parameterized(
244        (RANDOM, Ear.LEFT),
245        (RANDOM, Ear.RIGHT),
246    )  # type: ignore[misc]
247    @asynchronous
248    async def test_advertising_advertisement_data(
249        self,
250        ref_address_type: OwnAddressType,
251        ear: Ear,
252    ) -> None:
253        """
254        Ref starts ASHA advertisements with service data in advertisement data.
255        DUT starts a service discovery.
256        Verify Ref is correctly discovered by DUT as a hearing aid device.
257        """
258        advertisement = await self.ref_advertise_asha(self.ref_left, ref_address_type, ear)
259
260        # DUT starts a service discovery
261        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
262        advertisement.cancel()
263
264        # Verify Ref is correctly discovered by DUT as a hearing aid device
265        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
266        assert_equal(type(scan_result.data.complete_local_name), str)
267        expected_advertisement_data = self.get_expected_advertisement_data(ear)
268        assert_equal(
269            expected_advertisement_data,
270            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
271        )
272
273    @asynchronous
274    async def test_advertising_scan_response(self) -> None:
275        """
276        Ref starts ASHA advertisements with service data in scan response data.
277        DUT starts a service discovery.
278        Verify Ref is correctly discovered by DUT as a hearing aid device.
279        """
280        asha = AioAsha(self.ref_left.aio.channel)
281        await asha.Register(capability=Ear.LEFT, hisyncid=HISYCNID)
282
283        # advertise with ASHA service data in scan response
284        advertisement = self.ref_left.aio.host.Advertise(
285            legacy=True,
286            scan_response_data=DataTypes(
287                complete_local_name=COMPLETE_LOCAL_NAME,
288                complete_service_class_uuids16=[ASHA_UUID],
289            ),
290        )
291
292        scan_result = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)
293        advertisement.cancel()
294
295        # Verify Ref is correctly discovered by DUT as a hearing aid device.
296        assert_in(ASHA_UUID, scan_result.data.service_data_uuid16)
297        expected_advertisement_data = self.get_expected_advertisement_data(Ear.LEFT)
298        assert_equal(
299            expected_advertisement_data,
300            (scan_result.data.service_data_uuid16[ASHA_UUID]).hex(),
301        )
302
303    @avatar.parameterized(
304        (RANDOM, PUBLIC),
305        (RANDOM, RANDOM),
306    )  # type: ignore[misc]
307    @asynchronous
308    async def test_pairing(
309        self,
310        dut_address_type: OwnAddressType,
311        ref_address_type: OwnAddressType,
312    ) -> None:
313        """
314        DUT discovers Ref.
315        DUT initiates connection to Ref.
316        Verify that DUT and Ref are bonded and connected.
317        """
318        advertisement = await self.ref_advertise_asha(
319            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
320        )
321
322        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
323
324        # DUT initiates connection to Ref.
325        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
326
327        # DUT starts pairing with the Ref.
328        (secure, wait_security) = await asyncio.gather(
329            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
330            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
331        )
332
333        assert_equal(secure.result_variant(), 'success')
334        assert_equal(wait_security.result_variant(), 'success')
335
336    @avatar.parameterized(
337        (RANDOM, PUBLIC),
338        (RANDOM, RANDOM),
339    )  # type: ignore[misc]
340    @asynchronous
341    async def test_pairing_dual_device(
342        self,
343        dut_address_type: OwnAddressType,
344        ref_address_type: OwnAddressType,
345    ) -> None:
346        """
347        DUT discovers Ref.
348        DUT initiates connection to Ref.
349        Verify that DUT and Ref are bonded and connected.
350        """
351
352        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
353            advertisement = await self.ref_advertise_asha(
354                ref_device=ref_device, ref_address_type=ref_address_type, ear=ear
355            )
356            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear)
357            # DUT initiates connection to ref_device.
358            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
359            advertisement.cancel()
360
361            return dut_ref, ref_dut
362
363        ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather(
364            ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT)
365        )
366
367        # DUT starts pairing with the ref_left
368        (secure_left, wait_security_left) = await asyncio.gather(
369            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
370            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
371        )
372
373        assert_equal(secure_left.result_variant(), 'success')
374        assert_equal(wait_security_left.result_variant(), 'success')
375
376        # DUT starts pairing with the ref_right
377        (secure_right, wait_security_right) = await asyncio.gather(
378            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
379            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
380        )
381
382        assert_equal(secure_right.result_variant(), 'success')
383        assert_equal(wait_security_right.result_variant(), 'success')
384
385        await asyncio.gather(
386            self.ref_left.aio.host.Disconnect(connection=ref_left_dut),
387            self.dut.aio.host.WaitDisconnection(connection=dut_ref_left),
388        )
389        await asyncio.gather(
390            self.ref_right.aio.host.Disconnect(connection=ref_right_dut),
391            self.dut.aio.host.WaitDisconnection(connection=dut_ref_right),
392        )
393
394    @avatar.parameterized(
395        (RANDOM, PUBLIC),
396        (RANDOM, RANDOM),
397    )  # type: ignore[misc]
398    @asynchronous
399    async def test_unbonding(
400        self,
401        dut_address_type: OwnAddressType,
402        ref_address_type: OwnAddressType,
403    ) -> None:
404        """
405        DUT removes bond with Ref.
406        Verify that DUT and Ref are disconnected and unbonded.
407        """
408        raise signals.TestSkip("TODO: update rootcanal to retry")
409
410        advertisement = await self.ref_advertise_asha(
411            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
412        )
413        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
414
415        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
416
417        secure = self.dut.security.Secure(connection=dut_ref, le=LE_LEVEL3)
418
419        assert_equal(secure.WhichOneof("result"), "success")
420        await self.dut.aio.host.Disconnect(dut_ref)
421        await self.ref_left.aio.host.WaitDisconnection(ref_dut)
422
423        # delete the bond
424        await self.dut.aio.security_storage.DeleteBond(random=self.ref_left.random_address)
425
426        # DUT connect to REF again
427        dut_ref = (
428            await self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict())
429        ).connection
430        # TODO very likely there is a bug in android here
431        logging.debug("result should come out")
432
433        advertisement.cancel()
434        assert_is_not_none(dut_ref)
435
436        secure = await self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3)
437
438        assert_equal(secure.WhichOneof("result"), "success")
439
440    @avatar.parameterized(
441        (RANDOM, RANDOM),
442        (RANDOM, PUBLIC),
443    )  # type: ignore[misc]
444    @asynchronous
445    async def test_connection(self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType) -> None:
446        """
447        DUT discovers Ref.
448        DUT initiates connection to Ref.
449        Verify that DUT and Ref are connected.
450        """
451        advertisement = await self.ref_advertise_asha(
452            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
453        )
454        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
455
456        _, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
457
458    @avatar.parameterized(
459        (RANDOM, RANDOM),
460        (RANDOM, PUBLIC),
461    )  # type: ignore[misc]
462    @asynchronous
463    async def test_disconnect_initiator(
464        self,
465        dut_address_type: OwnAddressType,
466        ref_address_type: OwnAddressType,
467    ) -> None:
468        """
469        DUT initiates disconnection to Ref.
470        Verify that DUT and Ref are disconnected.
471        """
472        advertisement = await self.ref_advertise_asha(
473            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
474        )
475        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
476
477        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
478        advertisement.cancel()
479
480        await self.dut.aio.host.Disconnect(connection=dut_ref)
481        assert_false(await self.is_device_connected(self.ref_left, ref_dut, 5), "Should be disconnected")
482
483    @avatar.parameterized(
484        (RANDOM, RANDOM),
485        (RANDOM, PUBLIC),
486    )  # type: ignore[misc]
487    @asynchronous
488    async def test_disconnect_initiator_dual_device(
489        self,
490        dut_address_type: OwnAddressType,
491        ref_address_type: OwnAddressType,
492    ) -> None:
493        """
494        DUT initiates disconnection to Ref.
495        Verify that DUT and Ref are disconnected.
496        """
497
498        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
499            advertisement = await self.ref_advertise_asha(
500                ref_device=ref_device, ref_address_type=ref_address_type, ear=ear
501            )
502            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=ear)
503            # DUT initiates connection to ref_device.
504            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
505            advertisement.cancel()
506
507            return dut_ref, ref_dut
508
509        ((dut_ref_left, ref_left_dut), (dut_ref_right, ref_right_dut)) = await asyncio.gather(
510            ref_device_connect(self.ref_left, Ear.LEFT), ref_device_connect(self.ref_right, Ear.RIGHT)
511        )
512
513        # Disconnect from DUT
514        await asyncio.gather(
515            self.dut.aio.host.Disconnect(connection=dut_ref_left),
516            self.dut.aio.host.Disconnect(connection=dut_ref_right),
517        )
518
519        # Verify the Refs are disconnected
520        assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
521        assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
522
523    @avatar.parameterized(
524        (RANDOM, RANDOM),
525        (RANDOM, PUBLIC),
526    )  # type: ignore[misc]
527    @asynchronous
528    async def test_disconnect_acceptor(
529        self,
530        dut_address_type: OwnAddressType,
531        ref_address_type: OwnAddressType,
532    ) -> None:
533        """
534        Ref initiates disconnection to DUT (typically when put back in its box).
535        Verify that Ref is disconnected.
536        """
537        advertisement = await self.ref_advertise_asha(
538            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
539        )
540        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
541
542        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
543        advertisement.cancel()
544
545        await self.ref_left.aio.host.Disconnect(connection=ref_dut)
546        assert_false(await self.is_device_connected(self.dut, dut_ref, 5), "Should be disconnected")
547
548    @avatar.parameterized(
549        (RANDOM, RANDOM, 0),
550        (RANDOM, RANDOM, 0.5),
551        (RANDOM, RANDOM, 1),
552        (RANDOM, RANDOM, 5),
553    )  # type: ignore[misc]
554    @asynchronous
555    async def test_reconnection(
556        self,
557        dut_address_type: OwnAddressType,
558        ref_address_type: OwnAddressType,
559        reconnection_gap: float,
560    ) -> None:
561        """
562        DUT initiates disconnection to the Ref.
563        Verify that DUT and Ref are disconnected.
564        DUT reconnects to Ref after various certain time.
565        Verify that DUT and Ref are connected.
566        """
567
568        async def connect_and_disconnect() -> None:
569            advertisement = await self.ref_advertise_asha(
570                ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
571            )
572            ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
573            dut_ref, _ = await self.dut_connect_to_ref(advertisement, ref, dut_address_type)
574            await self.dut.aio.host.Disconnect(connection=dut_ref)
575
576        await connect_and_disconnect()
577        # simulating reconnect interval
578        await asyncio.sleep(reconnection_gap)
579        await connect_and_disconnect()
580
581    @avatar.parameterized(
582        (RANDOM, RANDOM),
583        (RANDOM, PUBLIC),
584    )  # type: ignore[misc]
585    @asynchronous
586    async def test_auto_connection(
587        self,
588        dut_address_type: OwnAddressType,
589        ref_address_type: OwnAddressType,
590    ) -> None:
591        """
592        Ref initiates disconnection to DUT.
593        Ref starts sending ASHA advertisements.
594        Verify that DUT auto-connects to Ref.
595        """
596        advertisement = await self.ref_advertise_asha(
597            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
598        )
599        ref = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
600
601        # manually connect and not cancel advertisement
602        dut_ref_res, ref_dut_res = await asyncio.gather(
603            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref.address_asdict()),
604            anext(aiter(advertisement)),  # pytype: disable=name-error
605        )
606        assert_equal(dut_ref_res.result_variant(), 'connection')
607        dut_ref, ref_dut = dut_ref_res.connection, ref_dut_res.connection
608        assert_is_not_none(dut_ref)
609        assert dut_ref
610
611        # Pairing
612        (secure, wait_security) = await asyncio.gather(
613            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
614            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
615        )
616        assert_equal(secure.result_variant(), 'success')
617        assert_equal(wait_security.result_variant(), 'success')
618
619        await self.ref_left.aio.host.Disconnect(connection=ref_dut)
620
621        ref_dut = (await anext(aiter(advertisement))).connection
622        advertisement.cancel()
623
624    @avatar.parameterized(
625        (RANDOM, RANDOM, Ear.LEFT),
626        (RANDOM, PUBLIC, Ear.RIGHT),
627    )  # type: ignore[misc]
628    @asynchronous
629    async def test_disconnect_acceptor_dual_device(
630        self,
631        dut_address_type: OwnAddressType,
632        ref_address_type: OwnAddressType,
633        disconnect_device: Ear,
634    ) -> None:
635        """
636        Prerequisites: DUT and Ref are connected and bonded.
637        Description:
638           1. One peripheral of Ref initiates disconnection to DUT.
639           2. Verify that it is disconnected and that the other peripheral is still connected.
640        """
641
642        advertisement_left = await self.ref_advertise_asha(
643            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
644        )
645        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
646        _, ref_left_dut = await self.dut_connect_to_ref(
647            advertisement=advertisement_left, ref=ref_left, dut_address_type=dut_address_type
648        )
649        advertisement_left.cancel()
650
651        advertisement_right = await self.ref_advertise_asha(
652            ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
653        )
654        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT)
655        _, ref_right_dut = await self.dut_connect_to_ref(
656            advertisement=advertisement_right, ref=ref_right, dut_address_type=dut_address_type
657        )
658        advertisement_right.cancel()
659
660        if disconnect_device == Ear.LEFT:
661            await self.ref_left.aio.host.Disconnect(connection=ref_left_dut)
662            assert_true(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
663            assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
664        else:
665            await self.ref_right.aio.host.Disconnect(connection=ref_right_dut)
666            assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
667            assert_true(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
668
669    @avatar.parameterized(
670        (RANDOM, RANDOM, Ear.LEFT),
671        (RANDOM, RANDOM, Ear.RIGHT),
672        (RANDOM, PUBLIC, Ear.LEFT),
673        (RANDOM, PUBLIC, Ear.RIGHT),
674    )  # type: ignore[misc]
675    @asynchronous
676    async def test_auto_connection_dual_device(
677        self, dut_address_type: OwnAddressType, ref_address_type: OwnAddressType, tested_device: Ear
678    ) -> None:
679        """
680        Prerequisites: DUT and Ref are connected and bonded. Ref is a dual device.
681        Description:
682           1. One peripheral of Ref initiates disconnection to DUT.
683           2. The disconnected peripheral starts sending ASHA advertisements.
684           3. Verify that DUT auto-connects to the peripheral.
685        """
686
687        advertisement_left = await self.ref_advertise_asha(
688            ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
689        )
690        ref_left = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.LEFT)
691        (dut_ref_left_res, ref_left_dut_res) = await asyncio.gather(
692            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_left.address_asdict()),
693            anext(aiter(advertisement_left)),  # pytype: disable=name-error
694        )
695        assert_equal(dut_ref_left_res.result_variant(), 'connection')
696        dut_ref_left, ref_left_dut = dut_ref_left_res.connection, ref_left_dut_res.connection
697        assert_is_not_none(dut_ref_left)
698        assert dut_ref_left
699        advertisement_left.cancel()
700
701        advertisement_right = await self.ref_advertise_asha(
702            ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
703        )
704        ref_right = await self.dut_scan_for_asha(dut_address_type=dut_address_type, ear=Ear.RIGHT)
705        (dut_ref_right_res, ref_right_dut_res) = await asyncio.gather(
706            self.dut.aio.host.ConnectLE(own_address_type=dut_address_type, **ref_right.address_asdict()),
707            anext(aiter(advertisement_right)),  # pytype: disable=name-error
708        )
709        assert_equal(dut_ref_right_res.result_variant(), 'connection')
710        dut_ref_right, ref_right_dut = dut_ref_right_res.connection, ref_right_dut_res.connection
711        assert_is_not_none(dut_ref_right)
712        assert dut_ref_right
713        advertisement_right.cancel()
714
715        # Pairing
716        (secure_left, wait_security_left) = await asyncio.gather(
717            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
718            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
719        )
720        assert_equal(secure_left.result_variant(), 'success')
721        assert_equal(wait_security_left.result_variant(), 'success')
722
723        (secure_right, wait_security_right) = await asyncio.gather(
724            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
725            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
726        )
727        assert_equal(secure_right.result_variant(), 'success')
728        assert_equal(wait_security_right.result_variant(), 'success')
729
730        if tested_device == Ear.LEFT:
731            await asyncio.gather(
732                self.ref_left.aio.host.Disconnect(connection=ref_left_dut),
733                self.dut.aio.host.WaitDisconnection(connection=dut_ref_left),
734            )
735            assert_false(await self.is_device_connected(self.ref_left, ref_left_dut, 5), "Should be disconnected")
736
737            advertisement_left = await self.ref_advertise_asha(
738                ref_device=self.ref_left, ref_address_type=ref_address_type, ear=Ear.LEFT
739            )
740            ref_left_dut = (await anext(aiter(advertisement_left))).connection
741            advertisement_left.cancel()
742        else:
743            await asyncio.gather(
744                self.ref_right.aio.host.Disconnect(connection=ref_right_dut),
745                self.dut.aio.host.WaitDisconnection(connection=dut_ref_right),
746            )
747            assert_false(await self.is_device_connected(self.ref_right, ref_right_dut, 5), "Should be disconnected")
748
749            advertisement_right = await self.ref_advertise_asha(
750                ref_device=self.ref_right, ref_address_type=ref_address_type, ear=Ear.RIGHT
751            )
752            ref_right_dut = (await anext(aiter(advertisement_right))).connection
753            advertisement_right.cancel()
754
755    @asynchronous
756    async def test_music_start(self) -> None:
757        """
758        DUT discovers Ref.
759        DUT initiates connection to Ref.
760        Verify that DUT and Ref are bonded and connected.
761        DUT starts media streaming.
762        Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1,
763        audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>).
764        """
765
766        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
767            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
768            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
769            # DUT initiates connection to ref_device.
770            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
771            advertisement.cancel()
772
773            return dut_ref, ref_dut
774
775        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
776        le_psm_future = self.get_le_psm_future(self.ref_left)
777        read_only_properties_future = self.get_read_only_properties_future(self.ref_left)
778
779        # DUT starts pairing with the ref_left
780        (secure, wait_security) = await asyncio.gather(
781            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
782            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
783        )
784
785        assert_equal(secure.result_variant(), 'success')
786        assert_equal(wait_security.result_variant(), 'success')
787
788        le_psm_out_result = await asyncio.wait_for(le_psm_future, timeout=3.0)
789        assert_is_not_none(le_psm_out_result)
790
791        read_only_properties_result = await asyncio.wait_for(read_only_properties_future, timeout=3.0)
792        assert_is_not_none(read_only_properties_result)
793
794        dut_asha = AioAsha(self.dut.aio.channel)
795        start_future = self.get_start_future(self.ref_left)
796
797        logging.info("send start")
798        await dut_asha.WaitPeripheral(connection=dut_ref)
799        _, start_result = await asyncio.gather(
800            dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0)
801        )
802
803        logging.info(f"start_result:{start_result}")
804        assert_is_not_none(start_result)
805        assert_equal(start_result['codec'], 1)
806        assert_equal(start_result['audiotype'], 0)
807        assert_is_not_none(start_result['volume'])
808        assert_equal(start_result['otherstate'], 0)
809
810    @asynchronous
811    async def test_set_volume(self) -> None:
812        """
813        DUT discovers Ref.
814        DUT initiates connection to Ref.
815        Verify that DUT and Ref are bonded and connected.
816        DUT is streaming media to Ref.
817        Change volume on DUT.
818        Verify DUT writes the correct value to ASHA `Volume` characteristic.
819        """
820        raise signals.TestSkip("TODO: update bt test interface for SetVolume to retry")
821
822        advertisement = await self.ref_advertise_asha(ref_device=self.ref_left, ref_address_type=RANDOM, ear=Ear.LEFT)
823
824        ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=Ear.LEFT)
825
826        # DUT initiates connection to Ref.
827        dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
828
829        # DUT starts pairing with the ref_left
830        (secure, wait_security) = await asyncio.gather(
831            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
832            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
833        )
834
835        assert_equal(secure.result_variant(), 'success')
836        assert_equal(wait_security.result_variant(), 'success')
837
838        asha_service = next((x for x in self.ref_left.device.gatt_server.attributes if isinstance(x, AshaGattService)))
839        dut_asha = AioAsha(self.dut.aio.channel)
840
841        volume_future = asyncio.get_running_loop().create_future()
842
843        def volume_command_handler(connection: Connection, data: int):
844            volume_future.set_result(data)
845
846        asha_service.on('volume', volume_command_handler)
847
848        await dut_asha.WaitPeripheral(connection=dut_ref)
849        await dut_asha.Start(connection=dut_ref)
850        # set volume to max volume
851        _, volume_result = await asyncio.gather(dut_asha.SetVolume(1), asyncio.wait_for(volume_future, timeout=3.0))
852
853        logging.info(f"start_result:{volume_result}")
854        assert_is_not_none(volume_result)
855        assert_equal(volume_result, 0)
856
857    @asynchronous
858    async def test_music_stop(self) -> None:
859        """
860        DUT discovers Ref.
861        DUT initiates connection to Ref.
862        Verify that DUT and Ref are bonded and connected.
863        DUT is streaming media to Ref.
864        DUT stops media streaming on Ref.
865        Verify that DUT sends a correct AudioControlPoint `Stop` command.
866        """
867
868        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
869            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
870            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
871            # DUT initiates connection to ref_device.
872            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
873            advertisement.cancel()
874
875            return dut_ref, ref_dut
876
877        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
878
879        # DUT starts pairing with the ref_left
880        (secure, wait_security) = await asyncio.gather(
881            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
882            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
883        )
884
885        assert_equal(secure.result_variant(), 'success')
886        assert_equal(wait_security.result_variant(), 'success')
887
888        dut_asha = AioAsha(self.dut.aio.channel)
889
890        stop_future = self.get_stop_future(self.ref_left)
891
892        await dut_asha.WaitPeripheral(connection=dut_ref)
893        await dut_asha.Start(connection=dut_ref)
894        logging.info("send stop")
895        _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0))
896
897        logging.info(f"stop_result:{stop_result}")
898        assert_is_not_none(stop_result)
899
900        # Sleep 0.5 second to mitigate flaky test first.
901        await asyncio.sleep(0.5)
902
903        audio_data = await self.get_audio_data(
904            ref_asha=AioAsha(self.ref_left.aio.channel), connection=ref_dut, timeout=10
905        )
906        assert_equal(len(audio_data), 0)
907
908    @asynchronous
909    async def test_music_restart(self) -> None:
910        """
911        DUT discovers Ref.
912        DUT initiates connection to Ref.
913        Verify that DUT and Ref are bonded and connected.
914        DUT starts media streaming.
915        DUT stops media streaming.
916        Verify that DUT sends a correct AudioControlPoint `Stop` command.
917        DUT starts media streaming again.
918        Verify that DUT sends a correct AudioControlPoint `Start` command.
919        """
920
921        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
922            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
923            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
924            # DUT initiates connection to ref_device.
925            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
926            advertisement.cancel()
927
928            return dut_ref, ref_dut
929
930        dut_ref, ref_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
931
932        # DUT starts pairing with the ref_left
933        (secure, wait_security) = await asyncio.gather(
934            self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
935            self.ref_left.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
936        )
937
938        assert_equal(secure.result_variant(), 'success')
939        assert_equal(wait_security.result_variant(), 'success')
940
941        dut_asha = AioAsha(self.dut.aio.channel)
942
943        stop_future = self.get_stop_future(self.ref_left)
944
945        await dut_asha.WaitPeripheral(connection=dut_ref)
946        await dut_asha.Start(connection=dut_ref)
947        _, stop_result = await asyncio.gather(dut_asha.Stop(), asyncio.wait_for(stop_future, timeout=10.0))
948
949        logging.info(f"stop_result:{stop_result}")
950        assert_is_not_none(stop_result)
951
952        # restart music streaming
953        logging.info("restart music streaming")
954
955        start_future = self.get_start_future(self.ref_left)
956
957        await dut_asha.WaitPeripheral(connection=dut_ref)
958        _, start_result = await asyncio.gather(
959            dut_asha.Start(connection=dut_ref), asyncio.wait_for(start_future, timeout=3.0)
960        )
961
962        logging.info(f"start_result:{start_result}")
963        assert_is_not_none(start_result)
964
965    @asynchronous
966    async def test_music_start_dual_device(self) -> None:
967        """
968        DUT discovers Ref.
969        DUT initiates connection to Ref.
970        Verify that DUT and Ref are bonded and connected.
971        DUT starts media streaming.
972        Verify that DUT sends a correct AudioControlPoint `Start` command (codec=1,
973        audiotype=0, volume=<volume set on DUT>, otherstate=<state of Ref aux if dual devices>).
974        """
975
976        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
977            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
978            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
979            # DUT initiates connection to ref_device.
980            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
981            advertisement.cancel()
982
983            return dut_ref, ref_dut
984
985        # connect ref_left
986        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
987        le_psm_future_left = self.get_le_psm_future(self.ref_left)
988        read_only_properties_future_left = self.get_read_only_properties_future(self.ref_left)
989
990        # DUT starts pairing with the ref_left
991        (secure_left, wait_security_left) = await asyncio.gather(
992            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
993            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
994        )
995
996        assert_equal(secure_left.result_variant(), 'success')
997        assert_equal(wait_security_left.result_variant(), 'success')
998
999        le_psm_out_result_left = await asyncio.wait_for(le_psm_future_left, timeout=3.0)
1000        assert_is_not_none(le_psm_out_result_left)
1001
1002        read_only_properties_result_left = await asyncio.wait_for(read_only_properties_future_left, timeout=3.0)
1003        assert_is_not_none(read_only_properties_result_left)
1004
1005        dut_asha = AioAsha(self.dut.aio.channel)
1006        start_future_left = self.get_start_future(self.ref_left)
1007
1008        logging.info("send start")
1009        await dut_asha.WaitPeripheral(connection=dut_ref_left)
1010        _, start_result_left = await asyncio.gather(
1011            dut_asha.Start(connection=dut_ref_left), asyncio.wait_for(start_future_left, timeout=3.0)
1012        )
1013
1014        logging.info(f"start_result_left:{start_result_left}")
1015        assert_is_not_none(start_result_left)
1016        assert_equal(start_result_left['codec'], 1)
1017        assert_equal(start_result_left['audiotype'], 0)
1018        assert_is_not_none(start_result_left['volume'])
1019        assert_equal(start_result_left['otherstate'], 0)
1020
1021        # Start playing audio before connecting to ref_right
1022        generated_audio = self.generate_sine(connection=dut_ref_left)
1023        dut_asha.PlaybackAudio(generated_audio)
1024
1025        # connect ref_right
1026        dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT)
1027        le_psm_future_right = self.get_le_psm_future(self.ref_right)
1028        read_only_properties_future_right = self.get_read_only_properties_future(self.ref_right)
1029
1030        # DUT starts pairing with the ref_right
1031        (secure_right, wait_security_right) = await asyncio.gather(
1032            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
1033            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
1034        )
1035
1036        assert_equal(secure_right.result_variant(), 'success')
1037        assert_equal(wait_security_right.result_variant(), 'success')
1038
1039        le_psm_out_result_right = await asyncio.wait_for(le_psm_future_right, timeout=3.0)
1040        assert_is_not_none(le_psm_out_result_right)
1041
1042        read_only_properties_result_right = await asyncio.wait_for(read_only_properties_future_right, timeout=3.0)
1043        assert_is_not_none(read_only_properties_result_right)
1044
1045        start_future_right = self.get_start_future(self.ref_right)
1046
1047        logging.info("send start_right")
1048        await dut_asha.WaitPeripheral(connection=dut_ref_right)
1049        start_result_right = await asyncio.wait_for(start_future_right, timeout=10.0)
1050
1051        logging.info(f"start_result_right:{start_result_right}")
1052        assert_is_not_none(start_result_right)
1053        assert_equal(start_result_right['codec'], 1)
1054        assert_equal(start_result_right['audiotype'], 0)
1055        assert_is_not_none(start_result_right['volume'])
1056        # ref_left already connected, otherstate = 1
1057        assert_equal(start_result_right['otherstate'], 1)
1058
1059    @asynchronous
1060    async def test_music_stop_dual_device(self) -> None:
1061        """
1062        DUT discovers Refs.
1063        DUT initiates connection to Refs.
1064        Verify that DUT and Refs are bonded and connected.
1065        DUT is streaming media to Refs.
1066        DUT stops media streaming on Refs.
1067        Verify that DUT sends a correct AudioControlPoint `Stop` command.
1068        Verify Refs cannot recevice audio data after DUT stops media streaming.
1069        """
1070
1071        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
1072            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
1073            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
1074            # DUT initiates connection to ref_device.
1075            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
1076            advertisement.cancel()
1077
1078            return dut_ref, ref_dut
1079
1080        # DUT starts connecting, pairing with the ref_left
1081        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
1082        (secure_left, wait_security_left) = await asyncio.gather(
1083            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
1084            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
1085        )
1086        assert_equal(secure_left.result_variant(), 'success')
1087        assert_equal(wait_security_left.result_variant(), 'success')
1088
1089        # DUT starts connecting, pairing with the ref_right
1090        dut_ref_right, ref_right_dut = await ref_device_connect(self.ref_right, Ear.RIGHT)
1091        (secure_right, wait_security_right) = await asyncio.gather(
1092            self.dut.aio.security.Secure(connection=dut_ref_right, le=LE_LEVEL3),
1093            self.ref_right.aio.security.WaitSecurity(connection=ref_right_dut, le=LE_LEVEL3),
1094        )
1095        assert_equal(secure_right.result_variant(), 'success')
1096        assert_equal(wait_security_right.result_variant(), 'success')
1097
1098        dut_asha = AioAsha(self.dut.aio.channel)
1099        ref_left_asha = AioAsha(self.ref_left.aio.channel)
1100        ref_right_asha = AioAsha(self.ref_right.aio.channel)
1101
1102        await asyncio.gather(
1103            dut_asha.WaitPeripheral(connection=dut_ref_left), dut_asha.WaitPeripheral(connection=dut_ref_right)
1104        )
1105        await dut_asha.Start(connection=dut_ref_left)
1106
1107        # Stop audio and wait until ref_device connections stopped.
1108        stop_future_left = self.get_stop_future(self.ref_left)
1109        stop_future_right = self.get_stop_future(self.ref_right)
1110
1111        logging.info("send stop")
1112        _, stop_result_left, stop_result_right = await asyncio.gather(
1113            dut_asha.Stop(),
1114            asyncio.wait_for(stop_future_left, timeout=10.0),
1115            asyncio.wait_for(stop_future_right, timeout=10.0),
1116        )
1117
1118        logging.info(f"stop_result_left:{stop_result_left}")
1119        logging.info(f"stop_result_right:{stop_result_right}")
1120        assert_is_not_none(stop_result_left)
1121        assert_is_not_none(stop_result_right)
1122
1123        # Sleep 0.5 second to mitigate flaky test first.
1124        await asyncio.sleep(0.5)
1125
1126        (audio_data_left, audio_data_right) = await asyncio.gather(
1127            self.get_audio_data(ref_asha=ref_left_asha, connection=ref_left_dut, timeout=10),
1128            self.get_audio_data(ref_asha=ref_right_asha, connection=ref_right_dut, timeout=10),
1129        )
1130
1131        assert_equal(len(audio_data_left), 0)
1132        assert_equal(len(audio_data_right), 0)
1133
1134    @asynchronous
1135    async def test_music_audio_playback(self) -> None:
1136        """
1137        DUT discovers Ref.
1138        DUT initiates connection to Ref.
1139        Verify that DUT and Ref are bonded and connected.
1140        DUT is streaming media to Ref using playback API.
1141        Verify that Ref has received audio data.
1142        """
1143
1144        async def ref_device_connect(ref_device: BumblePandoraDevice, ear: Ear) -> Tuple[Connection, Connection]:
1145            advertisement = await self.ref_advertise_asha(ref_device=ref_device, ref_address_type=RANDOM, ear=ear)
1146            ref = await self.dut_scan_for_asha(dut_address_type=RANDOM, ear=ear)
1147            # DUT initiates connection to ref_device.
1148            dut_ref, ref_dut = await self.dut_connect_to_ref(advertisement, ref, RANDOM)
1149            advertisement.cancel()
1150
1151            return dut_ref, ref_dut
1152
1153        dut_ref_left, ref_left_dut = await ref_device_connect(self.ref_left, Ear.LEFT)
1154
1155        # DUT starts pairing with the ref_left
1156        (secure_left, wait_security_left) = await asyncio.gather(
1157            self.dut.aio.security.Secure(connection=dut_ref_left, le=LE_LEVEL3),
1158            self.ref_left.aio.security.WaitSecurity(connection=ref_left_dut, le=LE_LEVEL3),
1159        )
1160
1161        assert_equal(secure_left.result_variant(), 'success')
1162        assert_equal(wait_security_left.result_variant(), 'success')
1163
1164        dut_asha = AioAsha(self.dut.aio.channel)
1165        ref_asha = AioAsha(self.ref_left.aio.channel)
1166
1167        await dut_asha.WaitPeripheral(connection=dut_ref_left)
1168        await dut_asha.Start(connection=dut_ref_left)
1169
1170        # Clear audio data before start audio playback testing
1171        await self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10)
1172
1173        generated_audio = self.generate_sine(connection=dut_ref_left)
1174
1175        _, audio_data = await asyncio.gather(
1176            dut_asha.PlaybackAudio(generated_audio),
1177            self.get_audio_data(ref_asha=ref_asha, connection=ref_left_dut, timeout=10),
1178        )
1179
1180        assert_not_equal(len(audio_data), 0)
1181        # TODO(duoho): decode audio_data and verify the content
1182
1183
if __name__ == "__main__":
    # Script entry point: enable DEBUG-level logging, then hand control to the
    # Mobly test runner.
    logging.basicConfig(level=logging.DEBUG)
    test_runner.main()  # type: ignore
1187