1#
2#   Copyright 2021 - The Android Open Source Project
3#
4#   Licensed under the Apache License, Version 2.0 (the "License");
5#   you may not use this file except in compliance with the License.
6#   You may obtain a copy of the License at
7#
8#       http://www.apache.org/licenses/LICENSE-2.0
9#
10#   Unless required by applicable law or agreed to in writing, software
11#   distributed under the License is distributed on an "AS IS" BASIS,
12#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#   See the License for the specific language governing permissions and
14#   limitations under the License.
15
16from blueberry.tests.gd.cert.closable import safeClose
17from blueberry.tests.gd.cert.matchers import IsoMatchers
18from blueberry.tests.gd.cert.metadata import metadata
19from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
20from blueberry.tests.gd.cert.py_le_iso import PyLeIso
21from blueberry.tests.gd.cert.py_le_iso import CisTestParameters
22from blueberry.tests.gd.cert.truth import assertThat
23from blueberry.tests.gd.cert import gd_base_test
24from blueberry.tests.gd.iso.cert_le_iso import CertLeIso
25from blueberry.facade import common_pb2 as common
26from blueberry.facade.hci import controller_facade_pb2 as controller_facade
27from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
28from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
29from mobly import asserts
30from mobly import test_runner
31import hci_packets as hci
32
33
class LeIsoTest(gd_base_test.GdBaseTestClass):
    """LE isochronous (CIS) tests in which the cert device sets up the stream.

    In every test the DUT advertises, the cert device connects to it, configures
    a CIG and creates one CIS on top of the LE ACL link.  The DUT then sends one
    SDU over its end of the stream and the test checks that the same bytes show
    up on the cert end.
    """

    # Payload sent by the DUT in every test: 80 bytes, which is below the
    # max_sdu_s_to_m=100 that each test configures.
    _TEST_SDU = b'abcdefgh' * 10

    def setup_class(self):
        """Start both devices with the HCI_INTERFACES module."""
        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI_INTERFACES')

    def setup_test(self):
        """Create the ACL managers, set static initiator addresses and create the ISO helpers."""
        super().setup_test()

        self.dut_le_acl_manager = PyLeAclManager(self.dut)
        self.cert_le_acl_manager = PyLeAclManager(self.cert)

        self.dut_address = common.BluetoothAddressWithType(
            address=common.BluetoothAddress(address=b'D0:05:04:03:02:01'), type=common.RANDOM_DEVICE_ADDRESS)
        self.cert_address = common.BluetoothAddressWithType(
            address=common.BluetoothAddress(address=b'C0:11:FF:AA:33:22'), type=common.RANDOM_DEVICE_ADDRESS)

        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            self._static_address_privacy_policy(self.dut_address))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            self._static_address_privacy_policy(self.cert_address))

        self.dut_iso = PyLeIso(self.dut)
        self.cert_iso = CertLeIso(self.cert)

    @staticmethod
    def _static_address_privacy_policy(address_with_type):
        """Build a USE_STATIC_ADDRESS privacy policy for the given address.

        The IRK is all zeroes and both rotation times are 0, exactly as both
        devices were configured before this helper was factored out.
        """
        return le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=address_with_type,
            rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
            minimum_rotation_time=0,
            maximum_rotation_time=0)

    def teardown_test(self):
        """Close everything setup_test created, then run the base-class teardown.

        setup_test may have failed before some attributes were assigned, so each
        one is looked up defensively.  Every resource goes through safeClose (the
        helper this file already used for the ACL managers) so that a problem
        with one of them does not stop the remaining cleanup from running.
        """
        for attribute in ('dut_iso', 'cert_iso', 'dut_le_acl_manager', 'cert_le_acl_manager'):
            resource = getattr(self, attribute, None)
            if resource is not None:
                safeClose(resource)
        super().teardown_test()

    # cert becomes central of connection, dut peripheral
    def _setup_link_from_cert(self):
        """Make the DUT advertise and have the cert device connect to it.

        Stores the resulting connections in self.dut_le_acl and self.cert_le_acl.
        """
        # DUT Advertises
        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(b'Im_The_DUT'))
        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        # The response is not needed by any test, so it is not kept.
        self.dut.hci_le_advertising_manager.CreateAdvertiser(request)
        self.dut_le_acl = self.dut_le_acl_manager.listen_for_incoming_connections()
        self.cert_le_acl = self.cert_le_acl_manager.connect_to_remote(self.dut_address)

    def _setup_cis_from_cert(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy,
                             packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id,
                             max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m):
        """Configure a CIG on the cert device and create one CIS over the existing ACL link.

        All arguments are forwarded unchanged, in the same order, to
        CertLeIso.le_set_cig_parameters (presumably the fields of the HCI
        LE Set CIG Parameters command - confirm against that helper).
        _setup_link_from_cert must have run first, because the handle of
        self.cert_le_acl is used to create the CIS.

        Returns:
            A (dut_cis_stream, cert_cis_stream) tuple with the established
            stream as reported by each side.
        """
        self.cert_iso.le_set_cig_parameters(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m,
                                            peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s,
                                            max_transport_latency_s_to_m, cis_id, max_sdu_m_to_s, max_sdu_s_to_m,
                                            phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m)

        cis_handles = self.cert_iso.wait_le_set_cig_parameters_complete()

        # A single cis_id was configured above, so only the first handle is used.
        cis_handle = cis_handles[0]

        acl_connection_handle = self.cert_le_acl.handle
        self.cert_iso.le_cretate_cis([(cis_handle, acl_connection_handle)])
        dut_cis_stream = self.dut_iso.wait_le_cis_established()
        cert_cis_stream = self.cert_iso.wait_le_cis_established()
        return (dut_cis_stream, cert_cis_stream)

    def _send_sdu_from_dut_and_verify_on_cert(self, **cis_parameters):
        """Set up link and CIS, send _TEST_SDU from the DUT and expect it on the cert stream.

        Args:
            **cis_parameters: keyword arguments for _setup_cis_from_cert; all of
                its parameters must be supplied.
        """
        self._setup_link_from_cert()
        dut_cis_stream, cert_cis_stream = self._setup_cis_from_cert(**cis_parameters)
        dut_cis_stream.send(self._TEST_SDU)
        assertThat(cert_cis_stream).emits(IsoMatchers.Data(self._TEST_SDU))

    def skip_if_iso_not_supported(self):
        """Skip the running test when the DUT controller lacks LE_SET_CIG_PARAMETERS."""
        response = self.dut.hci_controller.IsSupportedCommand(
            controller_facade.OpCodeMsg(op_code=int(hci.OpCode.LE_SET_CIG_PARAMETERS)))
        if not response.supported:
            asserts.skip("Skipping this test.  The chip doesn't support LE ISO")

    @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-01-C",
              pts_test_name="connected isochronous stream, unframed data, peripheral role")
    def test_iso_cis_unf_sla_bv_01_c(self):
        """
            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
        """
        self.skip_if_iso_not_supported()
        self._send_sdu_from_dut_and_verify_on_cert(
            cig_id=0x01,
            sdu_interval_m_to_s=0,
            sdu_interval_s_to_m=0x186a,  # 6250
            peripherals_clock_accuracy=0,
            packing=0,
            framing=0,
            max_transport_latency_m_to_s=0,
            max_transport_latency_s_to_m=7,
            cis_id=0x01,
            max_sdu_m_to_s=0,
            max_sdu_s_to_m=100,
            phy_m_to_s=0x02,
            phy_s_to_m=0x02,
            bn_m_to_s=0,
            bn_s_to_m=2)

    @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-25-C",
              pts_test_name="connected isochronous stream, unframed data, peripheral role")
    def test_iso_cis_unf_sla_bv_25_c(self):
        """
            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
        """
        self.skip_if_iso_not_supported()
        self._send_sdu_from_dut_and_verify_on_cert(
            cig_id=0x01,
            sdu_interval_m_to_s=0x7530,  # 30000
            sdu_interval_s_to_m=0x7530,  # 30000
            peripherals_clock_accuracy=0,
            packing=0,
            framing=0,
            max_transport_latency_m_to_s=30,
            max_transport_latency_s_to_m=30,
            cis_id=0x01,
            max_sdu_m_to_s=100,
            max_sdu_s_to_m=100,
            phy_m_to_s=0x02,
            phy_s_to_m=0x02,
            bn_m_to_s=3,
            bn_s_to_m=1)

    @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-03-C",
              pts_test_name="connected isochronous stream, framed data, peripheral role")
    def test_iso_cis_fra_sla_bv_03_c(self):
        """
            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
        """
        self.skip_if_iso_not_supported()
        self._send_sdu_from_dut_and_verify_on_cert(
            cig_id=0x01,
            sdu_interval_m_to_s=0x0000,
            sdu_interval_s_to_m=0x4e30,  # 20016
            peripherals_clock_accuracy=0,
            packing=0,
            framing=1,
            max_transport_latency_m_to_s=0,
            max_transport_latency_s_to_m=21,
            cis_id=0x01,
            max_sdu_m_to_s=0,
            max_sdu_s_to_m=100,
            phy_m_to_s=0x02,
            phy_s_to_m=0x02,
            bn_m_to_s=0,
            bn_s_to_m=2)

    @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-26-C",
              pts_test_name="connected isochronous stream, framed data, peripheral role")
    def test_iso_cis_fra_sla_bv_26_c(self):
        """
            Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length.
        """
        self.skip_if_iso_not_supported()
        self._send_sdu_from_dut_and_verify_on_cert(
            cig_id=0x01,
            sdu_interval_m_to_s=0x14D5,  # 5333
            sdu_interval_s_to_m=0x14D5,  # 5333
            peripherals_clock_accuracy=0,
            packing=0,
            framing=1,
            max_transport_latency_m_to_s=6,
            max_transport_latency_s_to_m=6,
            cis_id=0x01,
            max_sdu_m_to_s=100,
            max_sdu_s_to_m=100,
            phy_m_to_s=0x02,
            phy_s_to_m=0x02,
            bn_m_to_s=1,
            bn_s_to_m=1)
249
250
# Script entry point: when this file is executed directly rather than imported,
# hand control to Mobly's test runner.
if __name__ == '__main__':
    test_runner.main()
253