#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert import gd_base_test
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd.cert.py_hci import PyHci, PyHciAdvertisement
from blueberry.tests.gd.cert.py_le_acl_manager import PyLeAclManager
from blueberry.facade import common_pb2 as common
from blueberry.facade.hci import le_acl_manager_facade_pb2 as le_acl_manager_facade
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade.hci import hci_facade_pb2 as hci_facade
from mobly import test_runner
import hci_packets as hci


class LeAclManagerTest(gd_base_test.GdBaseTestClass):
    """LE ACL manager tests driving a DUT (HCI_INTERFACES module) against a cert device (HCI module)."""

    def setup_class(self):
        """Start the DUT with the HCI_INTERFACES module and the cert device with the HCI module."""
        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HCI_INTERFACES', cert_module='HCI')

    def setup_test(self):
        """Create the cert HCI and DUT LE ACL manager wrappers and record the addresses the tests use."""
        gd_base_test.GdBaseTestClass.setup_test(self)
        self.cert_hci = PyHci(self.cert, acl_streaming=True)
        self.dut_le_acl_manager = PyLeAclManager(self.dut)
        self.cert_public_address = self.cert_hci.read_own_address()
        self.dut_public_address = self.dut.hci_controller.GetMacAddressSimple().decode("utf-8")
        self.dut_random_address = 'd0:05:04:03:02:01'
        self.cert_random_address = 'c0:05:04:03:02:01'

    def teardown_test(self):
        """Close the per-test wrappers, then run the base class teardown.

        Each step sits in a ``finally`` so that a failure while closing one
        wrapper can neither leak the other wrapper nor skip the base class
        teardown. The first exception raised still propagates to the caller.
        """
        try:
            safeClose(self.dut_le_acl_manager)
        finally:
            try:
                self.cert_hci.close()
            finally:
                gd_base_test.GdBaseTestClass.teardown_test(self)

    @staticmethod
    def _address_with_type(address, address_type):
        """Build a BluetoothAddressWithType proto.

        Args:
            address: the address as a str; it is UTF-8 encoded into the proto.
            address_type: an hci.AddressType value; it is converted with int().

        Returns:
            A common.BluetoothAddressWithType message.
        """
        return common.BluetoothAddressWithType(address=common.BluetoothAddress(address=bytes(address, 'utf8')),
                                               type=int(address_type))

    @staticmethod
    def _start_cert_advertisement(py_hci_adv):
        """Set the shared advertising data and scan response on py_hci_adv and start it."""
        py_hci_adv.set_data(b'Im_A_Cert')
        py_hci_adv.set_scan_response(b'Im_A_C')
        py_hci_adv.start()

    def set_privacy_policy_static(self):
        """Set the DUT initiator address policy to USE_STATIC_ADDRESS with self.dut_random_address."""
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(self.dut_random_address, "utf-8")),
                type=common.RANDOM_DEVICE_ADDRESS))
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)

    def register_for_event(self, event_code):
        """Send a RequestEvent call for event_code to the cert HCI facade."""
        msg = hci_facade.EventRequest(code=int(event_code))
        self.cert.hci.RequestEvent(msg)

    def register_for_le_event(self, event_code):
        """Send a RequestLeSubevent call for event_code to the cert HCI facade."""
        msg = hci_facade.EventRequest(code=int(event_code))
        self.cert.hci.RequestLeSubevent(msg)

    def enqueue_hci_command(self, command):
        """Serialize command and send it through the cert HCI facade."""
        cmd_bytes = bytes(command.serialize())
        cmd = common.Data(payload=cmd_bytes)
        self.cert.hci.SendCommand(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, data):
        """Wrap data in an hci.Acl packet and send it through the cert HCI facade.

        Args:
            handle: connection handle placed in the ACL header.
            pb_flag: hci.PacketBoundaryFlag value for the packet.
            b_flag: hci.BroadcastFlag value for the packet.
            data: payload bytes of the ACL packet.
        """
        acl = hci.Acl(handle=handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
        # bytes() mirrors enqueue_hci_command; it is a no-op when serialize() already returns bytes.
        self.cert.hci.SendAcl(common.Data(payload=bytes(acl.serialize())))

    def dut_connects(self):
        """Advertise from the cert's random address and connect to it from the DUT.

        Returns:
            A (dut_le_acl, cert_le_acl) tuple holding both ends of the connection.
        """
        # Cert Advertises
        advertising_handle = 0
        py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)

        self.cert_hci.create_advertisement(
            advertising_handle,
            self.cert_random_address,
            hci.LegacyAdvertisingEventProperties.ADV_IND,
        )

        self._start_cert_advertisement(py_hci_adv)

        dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
            remote_addr=self._address_with_type(self.cert_random_address, hci.AddressType.RANDOM_DEVICE_ADDRESS))

        cert_le_acl = self.cert_hci.incoming_le_connection()
        return dut_le_acl, cert_le_acl

    def cert_advertises_resolvable(self):
        """Add the DUT to the cert's resolving list, then advertise with RESOLVABLE_OR_PUBLIC_ADDRESS."""
        self.cert_hci.add_device_to_resolving_list(hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                                                   self.dut_public_address,
                                                   b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
                                                   b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f')

        # Cert Advertises
        advertising_handle = 0
        py_hci_adv = PyHciAdvertisement(advertising_handle, self.cert_hci)

        self.cert_hci.create_advertisement(advertising_handle,
                                           self.cert_random_address,
                                           hci.LegacyAdvertisingEventProperties.ADV_IND,
                                           own_address_type=hci.OwnAddressType.RESOLVABLE_OR_PUBLIC_ADDRESS,
                                           peer_address=self.dut_public_address,
                                           peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS)

        self._start_cert_advertisement(py_hci_adv)

    def dut_connects_cert_resolvable(self):
        """Add the cert to the DUT's resolving list and connect to the cert's public address.

        Returns:
            A (dut_le_acl, cert_le_acl) tuple holding both ends of the connection.
        """
        self.dut.hci_le_acl_manager.AddDeviceToResolvingList(
            le_acl_manager_facade.IrkMsg(
                peer=self._address_with_type(repr(self.cert_public_address), hci.AddressType.PUBLIC_DEVICE_ADDRESS),
                peer_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f',
                local_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            ))

        dut_le_acl = self.dut_le_acl_manager.connect_to_remote(
            remote_addr=self._address_with_type(repr(self.cert_public_address), hci.AddressType.PUBLIC_DEVICE_ADDRESS))

        cert_le_acl = self.cert_hci.incoming_le_connection()
        return dut_le_acl, cert_le_acl

    def send_receive_and_check(self, dut_le_acl, cert_le_acl):
        """Send ACL data in both directions and assert that each side emits the other's payload."""
        self.enqueue_acl_data(cert_le_acl.handle, hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci.BroadcastFlag.POINT_TO_POINT, b'\x19\x00\x07\x00SomeAclData from the Cert')

        dut_le_acl.send(b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')
        assertThat(cert_le_acl.our_acl_stream).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
        assertThat(dut_le_acl).emits(lambda packet: b'SomeAclData' in packet.payload)

    def test_dut_connects(self):
        """DUT connects using its static random address; the cert must see that address as its peer."""
        self.set_privacy_policy_static()
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_resolvable_address(self):
        """DUT connects with USE_RESOLVABLE_ADDRESS; the cert's peer is neither the DUT public nor static address."""
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            minimum_rotation_time=7 * 60 * 1000,
            maximum_rotation_time=15 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_resolvable_address_public(self):
        """DUT connects with USE_RESOLVABLE_ADDRESS to the cert's public address via the resolving lists."""
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
            minimum_rotation_time=7 * 60 * 1000,
            maximum_rotation_time=15 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        self.cert_advertises_resolvable()
        dut_le_acl, cert_le_acl = self.dut_connects_cert_resolvable()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(repr(self.cert_public_address))
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.PUBLIC_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_non_resolvable_address(self):
        """DUT connects with USE_NON_RESOLVABLE_ADDRESS; the cert's peer is neither known DUT address."""
        privacy_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_NON_RESOLVABLE_ADDRESS,
            rotation_irk=b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f',
            minimum_rotation_time=8 * 60 * 1000,
            maximum_rotation_time=14 * 60 * 1000)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(privacy_policy)
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer).isNotEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_public_address(self):
        """DUT connects with USE_PUBLIC_ADDRESS; the cert must see the DUT public address as its peer."""
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            le_initiator_address_facade.PrivacyPolicy(
                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.PUBLIC_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_dut_connects_public_address_cancelled(self):
        """Same steps as test_dut_connects_public_address; the cancel step is not implemented yet."""
        # TODO (Add cancel)
        self.dut.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(
            le_initiator_address_facade.PrivacyPolicy(
                address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS))
        dut_le_acl, cert_le_acl = self.dut_connects()

        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_public_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.PUBLIC_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_cert_connects(self):
        """DUT advertises from its static random address and the cert initiates the connection."""
        self.set_privacy_policy_static()
        self.dut_le_acl_manager.listen_for_incoming_connections()

        # DUT Advertises
        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(b'Im_The_DUT'))
        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
            peer_address=common.BluetoothAddress(address=b'A6:A5:A4:A3:A2:A1'),
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)

        self.dut.hci_le_advertising_manager.CreateAdvertiser(request)

        # Cert Connects
        self.cert_hci.set_random_le_address(self.cert_random_address)
        self.cert_hci.initiate_le_connection(self.dut_random_address)

        # Cert gets ConnectionComplete with a handle and sends ACL data
        cert_le_acl = self.cert_hci.incoming_le_connection()

        cert_le_acl.send(hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT,
                         b'\x19\x00\x07\x00SomeAclData from the Cert')

        # DUT gets a connection complete event and sends and receives
        dut_le_acl = self.dut_le_acl_manager.complete_incoming_connection()
        assertThat(cert_le_acl.handle).isNotNone()
        assertThat(cert_le_acl.peer).isEqualTo(self.dut_random_address)
        assertThat(cert_le_acl.peer_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        assertThat(dut_le_acl.handle).isNotNone()
        assertThat(dut_le_acl.remote_address).isEqualTo(self.cert_random_address)
        assertThat(dut_le_acl.remote_address_type).isEqualTo(hci.AddressType.RANDOM_DEVICE_ADDRESS)

        self.send_receive_and_check(dut_le_acl, cert_le_acl)

    def test_recombination_l2cap_packet(self):
        """Cert sends a first fragment plus a continuing fragment; the DUT must emit the joined payload."""
        self.set_privacy_policy_static()
        dut_le_acl, cert_le_acl = self.dut_connects()
        cert_handle = cert_le_acl.handle
        self.enqueue_acl_data(cert_handle, hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
                              hci.BroadcastFlag.POINT_TO_POINT, b'\x06\x00\x07\x00Hello')
        self.enqueue_acl_data(cert_handle, hci.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci.BroadcastFlag.POINT_TO_POINT,
                              b'!')

        assertThat(dut_le_acl).emits(lambda packet: b'Hello!' in packet.payload)

    def test_background_connection(self):
        """A background connection to the cert completes after a direct connection to another address fails."""
        self.set_privacy_policy_static()

        # Start background and direct connection
        token_direct = self.dut_le_acl_manager.initiate_connection(
            remote_addr=self._address_with_type('0C:05:04:03:02:02', hci.AddressType.RANDOM_DEVICE_ADDRESS))

        token_background = self.dut_le_acl_manager.initiate_connection(remote_addr=self._address_with_type(
            self.cert_random_address, hci.AddressType.RANDOM_DEVICE_ADDRESS),
                                                                       is_direct=False)

        # Wait for direct connection timeout
        self.dut_le_acl_manager.wait_for_connection_fail(token_direct)

        # Cert Advertises
        advertising_handle = 0

        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci.LegacyAdvertisingEventProperties.ADV_IND, 155, 165)

        self._start_cert_advertisement(py_hci_adv)

        # Check background connection complete
        self.dut_le_acl_manager.complete_outgoing_connection(token_background)

    def skip_flaky_test_multiple_background_connections(self):
        """Complete two background connections one after the other.

        The method name does not start with ``test_``, which keeps this flaky
        scenario out of the default run.
        """
        self.set_privacy_policy_static()

        # Start two background connections
        token_1 = self.dut_le_acl_manager.initiate_connection(remote_addr=self._address_with_type(
            self.cert_random_address, hci.AddressType.RANDOM_DEVICE_ADDRESS),
                                                              is_direct=False)

        token_2 = self.dut_le_acl_manager.initiate_connection(remote_addr=self._address_with_type(
            '0C:05:04:03:02:02', hci.AddressType.RANDOM_DEVICE_ADDRESS),
                                                              is_direct=False)

        # Cert Advertises
        advertising_handle = 0

        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci.LegacyAdvertisingEventProperties.ADV_IND, 155, 165)

        self._start_cert_advertisement(py_hci_adv)

        # First background connection completes
        connection = self.dut_le_acl_manager.complete_outgoing_connection(token_1)
        connection.close()

        # Cert Advertises again
        advertising_handle = 0

        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, '0C:05:04:03:02:02',
                                                        hci.LegacyAdvertisingEventProperties.ADV_IND, 155, 165)

        self._start_cert_advertisement(py_hci_adv)

        # Second background connection completes
        connection = self.dut_le_acl_manager.complete_outgoing_connection(token_2)
        connection.close()

    def test_direct_connection(self):
        """Cert advertises first, then the DUT completes a direct connection to it."""
        self.set_privacy_policy_static()

        advertising_handle = 0
        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci.LegacyAdvertisingEventProperties.ADV_IND, 155, 165)

        self._start_cert_advertisement(py_hci_adv)

        # Start direct connection
        token = self.dut_le_acl_manager.initiate_connection(remote_addr=self._address_with_type(
            self.cert_random_address, hci.AddressType.RANDOM_DEVICE_ADDRESS),
                                                            is_direct=True)
        self.dut_le_acl_manager.complete_outgoing_connection(token)

    def test_background_connection_list(self):
        """The cert address is on the background list after connecting and off it after removal."""
        self.set_privacy_policy_static()

        cert_address = self._address_with_type(self.cert_random_address, hci.AddressType.RANDOM_DEVICE_ADDRESS)

        # Start background connection
        token_background = self.dut_le_acl_manager.initiate_connection(remote_addr=cert_address, is_direct=False)

        # Cert Advertises
        advertising_handle = 0

        py_hci_adv = self.cert_hci.create_advertisement(advertising_handle, self.cert_random_address,
                                                        hci.LegacyAdvertisingEventProperties.ADV_IND, 155, 165)

        self._start_cert_advertisement(py_hci_adv)

        # Check background connection complete
        self.dut_le_acl_manager.complete_outgoing_connection(token_background)

        msg = self.dut_le_acl_manager.is_on_background_list(remote_addr=cert_address)
        assertThat(msg.is_on_background_list).isEqualTo(True)

        self.dut_le_acl_manager.remove_from_background_list(remote_addr=cert_address)

        msg = self.dut_le_acl_manager.is_on_background_list(remote_addr=cert_address)
        assertThat(msg.is_on_background_list).isEqualTo(False)


if __name__ == '__main__':
    test_runner.main()