1# 2# Copyright 2021 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from blueberry.tests.gd.cert.closable import safeClose 17from blueberry.tests.gd.cert.matchers import IsoMatchers 18from blueberry.tests.gd.cert.metadata import metadata 19from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager 20from blueberry.tests.gd.cert.py_le_iso import PyLeIso 21from blueberry.tests.gd.cert.py_le_iso import CisTestParameters 22from blueberry.tests.gd.cert.truth import assertThat 23from blueberry.tests.gd.cert import gd_base_test 24from blueberry.tests.gd.iso.cert_le_iso import CertLeIso 25from blueberry.facade import common_pb2 as common 26from blueberry.facade.hci import controller_facade_pb2 as controller_facade 27from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade 28from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade 29from mobly import asserts 30from mobly import test_runner 31import hci_packets as hci 32 33 34class LeIsoTest(gd_base_test.GdBaseTestClass): 35 36 def setup_class(self): 37 gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HCI_INTERFACES', cert_module='HCI_INTERFACES') 38 39 def setup_test(self): 40 gd_base_test.GdBaseTestClass.setup_test(self) 41 42 self.dut_le_acl_manager = PyLeAclManager(self.dut) 43 self.cert_le_acl_manager = PyLeAclManager(self.cert) 44 45 self.dut_address = common.BluetoothAddressWithType( 46 address=common.BluetoothAddress(address=bytes(b'D0:05:04:03:02:01')), type=common.RANDOM_DEVICE_ADDRESS) 47 self.cert_address = common.BluetoothAddressWithType( 48 address=common.BluetoothAddress(address=bytes(b'C0:11:FF:AA:33:22')), type=common.RANDOM_DEVICE_ADDRESS) 49 dut_privacy_policy = le_initiator_address_facade.PrivacyPolicy( 50 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 51 address_with_type=self.dut_address, 52 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 53 minimum_rotation_time=0, 54 maximum_rotation_time=0) 55 self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(dut_privacy_policy) 56 privacy_policy = le_initiator_address_facade.PrivacyPolicy( 57 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 58 address_with_type=self.cert_address, 59 rotation_irk=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', 60 minimum_rotation_time=0, 61 maximum_rotation_time=0) 62 self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy) 63 64 self.dut_iso = PyLeIso(self.dut) 65 self.cert_iso = CertLeIso(self.cert) 66 67 def teardown_test(self): 68 self.dut_iso.close() 69 self.cert_iso.close() 70 71 safeClose(self.dut_le_acl_manager) 72 safeClose(self.cert_le_acl_manager) 73 gd_base_test.GdBaseTestClass.teardown_test(self) 74 75 #cert becomes central of connection, dut peripheral 76 def _setup_link_from_cert(self): 77 # DUT Advertises 78 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(bytes(b'Im_The_DUT'))) 79 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 80 config = le_advertising_facade.AdvertisingConfig( 81 advertisement=[gap_data], 82 interval_min=512, 83 interval_max=768, 84 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 85 own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, 86 channel_map=7, 87 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 88 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 89 create_response = self.dut.hci_le_advertising_manager.CreateAdvertiser(request) 90 self.dut_le_acl = self.dut_le_acl_manager.listen_for_incoming_connections() 91 self.cert_le_acl = self.cert_le_acl_manager.connect_to_remote(self.dut_address) 92 93 def _setup_cis_from_cert(self, cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, peripherals_clock_accuracy, 94 packing, framing, max_transport_latency_m_to_s, max_transport_latency_s_to_m, cis_id, 95 max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m): 96 97 self.cert_iso.le_set_cig_parameters(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 98 peripherals_clock_accuracy, packing, framing, max_transport_latency_m_to_s, 99 max_transport_latency_s_to_m, cis_id, max_sdu_m_to_s, max_sdu_s_to_m, 100 phy_m_to_s, phy_s_to_m, bn_m_to_s, bn_s_to_m) 101 102 cis_handles = self.cert_iso.wait_le_set_cig_parameters_complete() 103 104 cis_handle = cis_handles[0] 105 106 acl_connection_handle = self.cert_le_acl.handle 107 self.cert_iso.le_cretate_cis([(cis_handle, acl_connection_handle)]) 108 dut_cis_stream = self.dut_iso.wait_le_cis_established() 109 cert_cis_stream = self.cert_iso.wait_le_cis_established() 110 return (dut_cis_stream, cert_cis_stream) 111 112 def skip_if_iso_not_supported(self): 113 supported = self.dut.hci_controller.IsSupportedCommand( 114 controller_facade.OpCodeMsg(op_code=int(hci.OpCode.LE_SET_CIG_PARAMETERS))) 115 if (not supported.supported): 116 asserts.skip("Skipping this test. The chip doesn't support LE ISO") 117 118 @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-01-C", 119 pts_test_name="connected isochronous stream, unframed data, peripheral role") 120 def test_iso_cis_unf_sla_bv_01_c(self): 121 self.skip_if_iso_not_supported() 122 """ 123 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 124 """ 125 cig_id = 0x01 126 sdu_interval_m_to_s = 0 127 sdu_interval_s_to_m = 0x186a 128 peripherals_clock_accuracy = 0 129 packing = 0 130 framing = 0 131 max_transport_latency_m_to_s = 0 132 max_transport_latency_s_to_m = 7 133 cis_id = 0x01 134 max_sdu_m_to_s = 0 135 max_sdu_s_to_m = 100 136 phy_m_to_s = 0x02 137 phy_s_to_m = 0x02 138 bn_m_to_s = 0 139 bn_s_to_m = 2 140 141 self._setup_link_from_cert() 142 (dut_cis_stream, 143 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 144 peripherals_clock_accuracy, packing, framing, 145 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 146 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 147 bn_m_to_s, bn_s_to_m) 148 dut_cis_stream.send(b'abcdefgh' * 10) 149 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 150 151 @metadata(pts_test_id="IAL/CIS/UNF/SLA/BV-25-C", 152 pts_test_name="connected isochronous stream, unframed data, peripheral role") 153 def test_iso_cis_unf_sla_bv_25_c(self): 154 self.skip_if_iso_not_supported() 155 """ 156 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 157 """ 158 cig_id = 0x01 159 sdu_interval_m_to_s = 0x7530 160 sdu_interval_s_to_m = 0x7530 161 peripherals_clock_accuracy = 0 162 packing = 0 163 framing = 0 164 max_transport_latency_m_to_s = 30 165 max_transport_latency_s_to_m = 30 166 cis_id = 0x01 167 max_sdu_m_to_s = 100 168 max_sdu_s_to_m = 100 169 phy_m_to_s = 0x02 170 phy_s_to_m = 0x02 171 bn_m_to_s = 3 172 bn_s_to_m = 1 173 174 self._setup_link_from_cert() 175 (dut_cis_stream, 176 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 177 peripherals_clock_accuracy, packing, framing, 178 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 179 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 180 bn_m_to_s, bn_s_to_m) 181 dut_cis_stream.send(b'abcdefgh' * 10) 182 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 183 184 @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-03-C", 185 pts_test_name="connected isochronous stream, framed data, peripheral role") 186 def test_iso_cis_fra_sla_bv_03_c(self): 187 self.skip_if_iso_not_supported() 188 """ 189 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 190 """ 191 cig_id = 0x01 192 sdu_interval_m_to_s = 0x0000 193 sdu_interval_s_to_m = 0x4e30 194 peripherals_clock_accuracy = 0 195 packing = 0 196 framing = 1 197 max_transport_latency_m_to_s = 0 198 max_transport_latency_s_to_m = 21 199 cis_id = 0x01 200 max_sdu_m_to_s = 0 201 max_sdu_s_to_m = 100 202 phy_m_to_s = 0x02 203 phy_s_to_m = 0x02 204 bn_m_to_s = 0 205 bn_s_to_m = 2 206 207 self._setup_link_from_cert() 208 (dut_cis_stream, 209 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 210 peripherals_clock_accuracy, packing, framing, 211 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 212 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 213 bn_m_to_s, bn_s_to_m) 214 dut_cis_stream.send(b'abcdefgh' * 10) 215 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 216 217 @metadata(pts_test_id="IAL/CIS/FRA/SLA/BV-26-C", 218 pts_test_name="connected isochronous stream, framed data, peripheral role") 219 def test_iso_cis_fra_sla_bv_26_c(self): 220 self.skip_if_iso_not_supported() 221 """ 222 Verify that the IUT can send an SDU with length ≤ the Isochronous PDU length. 223 """ 224 cig_id = 0x01 225 sdu_interval_m_to_s = 0x14D5 226 sdu_interval_s_to_m = 0x14D5 227 peripherals_clock_accuracy = 0 228 packing = 0 229 framing = 1 230 max_transport_latency_m_to_s = 6 231 max_transport_latency_s_to_m = 6 232 cis_id = 0x01 233 max_sdu_m_to_s = 100 234 max_sdu_s_to_m = 100 235 phy_m_to_s = 0x02 236 phy_s_to_m = 0x02 237 bn_m_to_s = 1 238 bn_s_to_m = 1 239 240 self._setup_link_from_cert() 241 (dut_cis_stream, 242 cert_cis_stream) = self._setup_cis_from_cert(cig_id, sdu_interval_m_to_s, sdu_interval_s_to_m, 243 peripherals_clock_accuracy, packing, framing, 244 max_transport_latency_m_to_s, max_transport_latency_s_to_m, 245 cis_id, max_sdu_m_to_s, max_sdu_s_to_m, phy_m_to_s, phy_s_to_m, 246 bn_m_to_s, bn_s_to_m) 247 dut_cis_stream.send(b'abcdefgh' * 10) 248 assertThat(cert_cis_stream).emits(IsoMatchers.Data(b'abcdefgh' * 10)) 249 250 251if __name__ == '__main__': 252 test_runner.main() 253