1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import queue 18import logging 19 20from google.protobuf import empty_pb2 as empty_proto 21 22import hci_packets as hci 23from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade 24from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade 25from blueberry.facade import common_pb2 as common 26from blueberry.tests.gd.cert.truth import assertThat 27from blueberry.tests.gd_sl4a.lib import gd_sl4a_base_test 28from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys, ble_scan_settings_callback_types 29from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects 30 31from mobly import test_runner 32 33 34class LeAdvancedScanningTest(gd_sl4a_base_test.GdSl4aBaseTestClass): 35 36 def setup_class(self): 37 super().setup_class(cert_module='HCI_INTERFACES') 38 self.default_timeout = 60 # seconds 39 40 def setup_test(self): 41 super().setup_test() 42 43 def teardown_test(self): 44 super().teardown_test() 45 46 def __get_test_irk(self): 47 return bytes( 48 bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10])) 49 50 def _set_cert_privacy_policy_with_random_address(self, random_address): 51 private_policy = le_initiator_address_facade.PrivacyPolicy( 52 address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS, 53 address_with_type=common.BluetoothAddressWithType( 54 address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')), 55 type=common.RANDOM_DEVICE_ADDRESS)) 56 self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) 57 58 def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk): 59 random_address_bytes = "DD:34:02:05:5C:EE".encode() 60 private_policy = le_initiator_address_facade.PrivacyPolicy( 61 address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, 62 address_with_type=common.BluetoothAddressWithType( 63 address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS), 64 rotation_irk=irk) 65 self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) 66 # Bluetooth MAC address must be upper case 67 return random_address_bytes.decode('utf-8').upper() 68 69 def __advertise_rpa_random_policy(self, legacy_pdus, irk): 70 DEVICE_NAME = 'Im_The_CERT!' 71 logging.info("Getting public address") 72 ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk) 73 logging.info("Done %s" % ADDRESS) 74 75 # Setup cert side to advertise 76 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 77 data=list(bytes(DEVICE_NAME, encoding='utf8'))) 78 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 79 config = le_advertising_facade.AdvertisingConfig( 80 advertisement=[gap_data], 81 interval_min=512, 82 interval_max=768, 83 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 84 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 85 channel_map=7, 86 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 87 extended_config = le_advertising_facade.ExtendedAdvertisingConfig( 88 include_tx_power=True, 89 connectable=True, 90 legacy_pdus=legacy_pdus, 91 advertising_config=config, 92 secondary_advertising_phy=ble_scan_settings_phys["1m"]) 93 request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) 94 logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended")) 95 create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) 96 logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended")) 97 return (ADDRESS, create_response) 98 99 def _advertise_rpa_random_legacy_pdu(self, irk): 100 return self.__advertise_rpa_random_policy(True, irk) 101 102 def _advertise_rpa_random_extended_pdu(self, irk): 103 return self.__advertise_rpa_random_policy(False, irk) 104 105 def _set_cert_privacy_policy_with_public_address(self): 106 public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address 107 private_policy = le_initiator_address_facade.PrivacyPolicy( 108 address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS, 109 address_with_type=common.BluetoothAddressWithType( 110 address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS)) 111 self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) 112 # Bluetooth MAC address must be upper case 113 return public_address_bytes.decode('utf-8').upper() 114 115 def _set_cert_privacy_policy_with_public_address_but_advertise_resolvable(self, irk): 116 public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address 117 private_policy = le_initiator_address_facade.PrivacyPolicy( 118 address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS, 119 address_with_type=common.BluetoothAddressWithType( 120 address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS), 121 rotation_irk=irk) 122 self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy) 123 # Bluetooth MAC address must be upper case 124 return public_address_bytes.decode('utf-8').upper() 125 126 def __advertise_rpa_public_policy(self, legacy_pdus, irk): 127 DEVICE_NAME = 'Im_The_CERT!' 128 logging.info("Getting public address") 129 ADDRESS = self._set_cert_privacy_policy_with_public_address_but_advertise_resolvable(irk) 130 logging.info("Done %s" % ADDRESS) 131 132 # Setup cert side to advertise 133 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 134 data=list(bytes(DEVICE_NAME, encoding='utf8'))) 135 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 136 config = le_advertising_facade.AdvertisingConfig( 137 advertisement=[gap_data], 138 interval_min=512, 139 interval_max=768, 140 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 141 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 142 channel_map=7, 143 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 144 extended_config = le_advertising_facade.ExtendedAdvertisingConfig( 145 include_tx_power=True, 146 connectable=True, 147 legacy_pdus=legacy_pdus, 148 advertising_config=config, 149 secondary_advertising_phy=ble_scan_settings_phys["1m"]) 150 request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) 151 logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended")) 152 create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) 153 logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended")) 154 return (ADDRESS, create_response) 155 156 def _advertise_rpa_public_legacy_pdu(self, irk): 157 return self.__advertise_rpa_public_policy(True, irk) 158 159 def _advertise_rpa_public_extended_pdu(self, irk): 160 return self.__advertise_rpa_public_policy(False, irk) 161 162 def _wait_for_scan_result_event(self, expected_event_name): 163 try: 164 # Verify if there is scan result 165 event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout) 166 # Print out scan result 167 mac_address = event_info['data']['Result']['deviceInfo']['address'] 168 logging.info("Filter advertisement with address {}".format(mac_address)) 169 return True 170 except queue.Empty as error: 171 logging.error("Could not find initial advertisement.") 172 return False 173 174 def _stop_advertising(self, advertiser_id): 175 logging.info("Stop advertising") 176 remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id) 177 self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request) 178 logging.info("Stopped advertising") 179 180 def _stop_scanning(self, scan_callback): 181 logging.info("Stop scanning") 182 self.dut.sl4a.bleStopBleScan(scan_callback) 183 logging.info("Stopped scanning") 184 185 def test_scan_filter_device_public_address_with_irk_legacy_pdu(self): 186 """ 187 The cert side will advertise a RRPA generated from the test IRK using Legacy PDU 188 189 DUT will scan for the device using the Identity Address + Address Type + IRK 190 191 Results received via ScanCallback 192 """ 193 PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk()) 194 addr_type = ble_address_types["public"] 195 logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % 196 (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 197 198 # Setup SL4A DUT side to scan 199 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 200 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 201 expected_event_name = scan_result.format(scan_callback) 202 203 # Start scanning on SL4A DUT side 204 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type), 205 self.__get_test_irk().decode("utf-8")) 206 self.dut.sl4a.bleBuildScanFilter(filter_list) 207 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 208 logging.info("Started scanning") 209 210 # Wait for results 211 got_result = self._wait_for_scan_result_event(expected_event_name) 212 213 # Test over 214 self._stop_scanning(scan_callback) 215 self._stop_advertising(create_response.advertiser_id) 216 217 assertThat(got_result).isTrue() 218 219 def test_scan_filter_device_public_address_with_irk_legacy_pdu_pending_intent(self): 220 """ 221 The cert side will advertise a RRPA generated from the test IRK using Legacy PDU 222 223 DUT will scan for the device using the Identity Address + Address Type + IRK 224 225 Results received via PendingIntent 226 """ 227 PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk()) 228 addr_type = ble_address_types["public"] 229 logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % 230 (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 231 232 # Setup SL4A DUT side to scan 233 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 234 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 235 # The event name needs to be set to this otherwise the index iterates from the scancallbacks 236 # being run consecutively. This is a PendingIntent callback but it hooks into the 237 # ScanCallback and uses just the 1 for the index. 238 expected_event_name = "BleScan1onScanResults" 239 240 # Start scanning on SL4A DUT side 241 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type), 242 self.__get_test_irk().decode("utf-8")) 243 self.dut.sl4a.bleBuildScanFilter(filter_list) 244 self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings) 245 logging.info("Started scanning") 246 247 # Wait for results 248 got_result = self._wait_for_scan_result_event(expected_event_name) 249 250 # Test over 251 self._stop_scanning(scan_callback) 252 self._stop_advertising(create_response.advertiser_id) 253 254 assertThat(got_result).isTrue() 255 256 def test_scan_filter_device_public_address_with_irk_extended_pdu(self): 257 """ 258 The cert side will advertise a RRPA generated from the test IRK using Extended PDU 259 260 DUT will scan for the device using the Identity Address + Address Type + IRK 261 262 Results received via PendingIntent 263 """ 264 PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk()) 265 addr_type = ble_address_types["public"] 266 logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % 267 (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 268 269 # Setup SL4A DUT side to scan 270 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 271 self.dut.sl4a.bleSetScanSettingsLegacy(False) 272 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 273 expected_event_name = scan_result.format(scan_callback) 274 275 # Start scanning on SL4A DUT side 276 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type), 277 self.__get_test_irk().decode("utf-8")) 278 self.dut.sl4a.bleBuildScanFilter(filter_list) 279 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 280 logging.info("Started scanning") 281 282 # Wait for results 283 got_result = self._wait_for_scan_result_event(expected_event_name) 284 285 # Test over 286 self._stop_scanning(scan_callback) 287 self._stop_advertising(create_response.advertiser_id) 288 289 assertThat(got_result).isTrue() 290 291 def test_scan_filter_device_name_legacy_pdu(self): 292 """ 293 The cert side will advertise using PUBLIC address and device name data on legacy PDU 294 295 DUT will scan for the device using the Device Name 296 297 Results received via ScanCallback 298 """ 299 # Use public address on cert side 300 logging.info("Setting public address") 301 DEVICE_NAME = 'Im_The_CERT!' 302 public_address = self._set_cert_privacy_policy_with_public_address() 303 logging.info("Set public address") 304 305 # Setup cert side to advertise 306 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 307 data=list(bytes(DEVICE_NAME, encoding='utf8'))) 308 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 309 config = le_advertising_facade.AdvertisingConfig( 310 advertisement=[gap_data], 311 interval_min=512, 312 interval_max=768, 313 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 314 own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, 315 channel_map=7, 316 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES, 317 tx_power=20) 318 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 319 logging.info("Creating advertiser") 320 create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) 321 logging.info("Created advertiser") 322 323 # Setup SL4A DUT side to scan 324 logging.info("Start scanning with public address %s" % public_address) 325 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 326 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 327 expected_event_name = scan_result.format(scan_callback) 328 329 # Start scanning on SL4A DUT side 330 self.dut.sl4a.bleSetScanFilterDeviceName(DEVICE_NAME) 331 self.dut.sl4a.bleBuildScanFilter(filter_list) 332 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 333 logging.info("Started scanning") 334 335 # Wait for results 336 got_result = self._wait_for_scan_result_event(expected_event_name) 337 338 # Test over 339 self._stop_scanning(scan_callback) 340 self._stop_advertising(create_response.advertiser_id) 341 342 assertThat(got_result).isTrue() 343 344 def test_scan_filter_device_random_address_legacy_pdu(self): 345 """ 346 The cert side will advertise using RANDOM STATIC address with legacy PDU 347 348 DUT will scan for the device using the RANDOM STATIC Address of the advertising device 349 350 Results received via ScanCallback 351 """ 352 # Use random address on cert side 353 logging.info("Setting random address") 354 RANDOM_ADDRESS = 'D0:05:04:03:02:01' 355 DEVICE_NAME = 'Im_The_CERT!' 356 self._set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS) 357 logging.info("Set random address") 358 359 # Setup cert side to advertise 360 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 361 data=list(bytes(DEVICE_NAME, encoding='utf8'))) 362 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 363 config = le_advertising_facade.AdvertisingConfig( 364 advertisement=[gap_data], 365 interval_min=512, 366 interval_max=768, 367 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 368 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 369 channel_map=7, 370 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 371 request = le_advertising_facade.CreateAdvertiserRequest(config=config) 372 logging.info("Creating advertiser") 373 create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request) 374 logging.info("Created advertiser") 375 376 # Setup SL4A DUT side to scan 377 addr_type = ble_address_types["random"] 378 logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type)) 379 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 380 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 381 expected_event_name = scan_result.format(scan_callback) 382 383 # Start scanning on SL4A DUT side 384 self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type)) 385 self.dut.sl4a.bleBuildScanFilter(filter_list) 386 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 387 logging.info("Started scanning") 388 389 # Wait for results 390 got_result = self._wait_for_scan_result_event(expected_event_name) 391 392 # Test over 393 self._stop_scanning(scan_callback) 394 self._stop_advertising(create_response.advertiser_id) 395 396 assertThat(got_result).isTrue() 397 398 def test_scan_filter_device_public_address_extended_pdu(self): 399 """ 400 The cert side will advertise using PUBLIC address with Extended PDU 401 402 DUT will scan for the device using the PUBLIC Address of the advertising device 403 404 Results received via ScanCallback 405 """ 406 # Use public address on cert side 407 logging.info("Setting public address") 408 DEVICE_NAME = 'Im_The_CERT!' 409 public_address = self._set_cert_privacy_policy_with_public_address() 410 logging.info("Set public address") 411 412 # Setup cert side to advertise 413 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 414 data=list(bytes(DEVICE_NAME, encoding='utf8'))) 415 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 416 config = le_advertising_facade.AdvertisingConfig( 417 advertisement=[gap_data], 418 interval_min=512, 419 interval_max=768, 420 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 421 own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS, 422 channel_map=7, 423 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 424 extended_config = le_advertising_facade.ExtendedAdvertisingConfig( 425 advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"]) 426 request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) 427 logging.info("Creating advertiser") 428 create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) 429 logging.info("Created advertiser") 430 431 # Setup SL4A DUT side to scan 432 addr_type = ble_address_types["public"] 433 logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type)) 434 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 435 self.dut.sl4a.bleSetScanSettingsLegacy(False) 436 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 437 expected_event_name = scan_result.format(scan_callback) 438 439 # Start scanning on SL4A DUT side 440 self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type)) 441 self.dut.sl4a.bleBuildScanFilter(filter_list) 442 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 443 logging.info("Started scanning") 444 445 # Wait for results 446 got_result = self._wait_for_scan_result_event(expected_event_name) 447 448 # Test over 449 self._stop_scanning(scan_callback) 450 self._stop_advertising(create_response.advertiser_id) 451 452 assertThat(got_result).isTrue() 453 454 def test_scan_filter_device_public_address_with_irk_extended_pdu_pending_intent(self): 455 """ 456 The cert side will advertise using RRPA with Extended PDU 457 458 DUT will scan for the device using the pre-shared PUBLIC ADDRESS of the advertising 459 device + IRK 460 461 Results received via PendingIntent 462 """ 463 PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk()) 464 465 # Setup SL4A DUT side to scan 466 addr_type = ble_address_types["public"] 467 logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" % 468 (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 469 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 470 self.dut.sl4a.bleSetScanSettingsLegacy(False) 471 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 472 # Hard code here since callback index iterates and will cause this to fail if ran 473 # Second as the impl in SL4A sends this since its a single callback for broadcast. 474 expected_event_name = "BleScan1onScanResults" 475 476 # Start scanning on SL4A DUT side 477 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type), 478 self.__get_test_irk().decode("utf-8")) 479 self.dut.sl4a.bleBuildScanFilter(filter_list) 480 self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings) 481 logging.info("Started scanning") 482 483 # Wait for results 484 got_result = self._wait_for_scan_result_event(expected_event_name) 485 486 # Test over 487 self._stop_scanning(scan_callback) 488 self._stop_advertising(create_response.advertiser_id) 489 490 assertThat(got_result).isTrue() 491 492 def test_scan_filter_device_random_address_with_irk_extended_pdu(self): 493 """ 494 The cert side will advertise using RRPA with Extended PDU 495 496 DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising 497 device + IRK 498 499 Results received via ScanCallback 500 """ 501 RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk()) 502 503 # Setup SL4A DUT side to scan 504 addr_type = ble_address_types["random"] 505 logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" % 506 (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 507 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 508 self.dut.sl4a.bleSetScanSettingsLegacy(False) 509 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 510 expected_event_name = scan_result.format(scan_callback) 511 512 # Setup SL4A DUT filter 513 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type), 514 self.__get_test_irk().decode("utf-8")) 515 self.dut.sl4a.bleBuildScanFilter(filter_list) 516 517 # Start scanning on SL4A DUT side 518 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 519 logging.info("Started scanning") 520 521 # Wait for results 522 got_result = self._wait_for_scan_result_event(expected_event_name) 523 524 # Test over 525 self._stop_scanning(scan_callback) 526 self._stop_advertising(create_response.advertiser_id) 527 528 assertThat(got_result).isTrue() 529 530 def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent(self): 531 """ 532 The cert side will advertise using RRPA with Extended PDU 533 534 DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising 535 device + IRK 536 537 Results received via PendingIntent 538 """ 539 RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk()) 540 541 # Setup SL4A DUT side to scan 542 addr_type = ble_address_types["random"] 543 logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" % 544 (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 545 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 546 self.dut.sl4a.bleSetScanSettingsLegacy(False) 547 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 548 # Hard code here since callback index iterates and will cause this to fail if ran 549 # Second as the impl in SL4A sends this since its a single callback for broadcast. 550 expected_event_name = "BleScan1onScanResults" 551 552 # Setup SL4A DUT filter 553 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type), 554 self.__get_test_irk().decode("utf-8")) 555 self.dut.sl4a.bleBuildScanFilter(filter_list) 556 557 # Start scanning on SL4A DUT side 558 self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings) 559 logging.info("Started scanning") 560 561 # Wait for results 562 got_result = self._wait_for_scan_result_event(expected_event_name) 563 564 # Test over 565 self._stop_scanning(scan_callback) 566 self._stop_advertising(create_response.advertiser_id) 567 568 assertThat(got_result).isTrue() 569 570 def test_scan_filter_device_random_address_with_irk_extended_pdu_scan_twice(self): 571 """ 572 The cert side will advertise using RRPA with Extended PDU 573 574 DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising 575 device + IRK 576 577 DUT will stop scanning, then start scanning again for results 578 579 Results received via ScanCallback 580 """ 581 RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk()) 582 583 # Setup SL4A DUT side to scan 584 addr_type = ble_address_types["random"] 585 logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" % 586 (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 587 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 588 self.dut.sl4a.bleSetScanSettingsLegacy(False) 589 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 590 expected_event_name = scan_result.format(scan_callback) 591 592 # Start scanning on SL4A DUT side 593 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type), 594 self.__get_test_irk().decode("utf-8")) 595 self.dut.sl4a.bleBuildScanFilter(filter_list) 596 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 597 logging.info("Started scanning") 598 599 # Wait for results 600 got_result = self._wait_for_scan_result_event(expected_event_name) 601 602 # Test over 603 self._stop_scanning(scan_callback) 604 605 # Start scanning on SL4A DUT side 606 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 607 logging.info("Started scanning...again") 608 609 # Wait for results 610 got_result = self._wait_for_scan_result_event(expected_event_name) 611 612 # Test over 613 self._stop_scanning(scan_callback) 614 self._stop_advertising(create_response.advertiser_id) 615 616 assertThat(got_result).isTrue() 617 618 def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent_128_640(self): 619 """ 620 The CERT side will advertise an RPA derived from the IRK. 621 622 The DUT (SL4A) side will scan for a RPA with matching IRK. 623 624 Adjust the scan intervals to Digital Carkey specific timings. 625 """ 626 DEVICE_NAME = 'Im_The_CERT!' 627 logging.info("Getting public address") 628 RANDOM_ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable( 629 self.__get_test_irk()) 630 logging.info("Done %s" % RANDOM_ADDRESS) 631 632 legacy_pdus = False 633 634 # Setup cert side to advertise 635 gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 636 data=list(bytes(DEVICE_NAME, encoding='utf8'))) 637 gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize()) 638 config = le_advertising_facade.AdvertisingConfig( 639 advertisement=[gap_data], 640 interval_min=128, 641 interval_max=640, 642 advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND, 643 own_address_type=common.USE_RANDOM_DEVICE_ADDRESS, 644 channel_map=7, 645 filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES) 646 extended_config = le_advertising_facade.ExtendedAdvertisingConfig( 647 include_tx_power=True, 648 connectable=True, 649 legacy_pdus=legacy_pdus, 650 advertising_config=config, 651 secondary_advertising_phy=ble_scan_settings_phys["1m"]) 652 request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config) 653 logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended")) 654 create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request) 655 logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended")) 656 657 # Setup SL4A DUT side to scan 658 addr_type = ble_address_types["random"] 659 logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" % 660 (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 661 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['ambient_discovery']) 662 self.dut.sl4a.bleSetScanSettingsLegacy(False) 663 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 664 # Hard code here since callback index iterates and will cause this to fail if ran 665 # Second as the impl in SL4A sends this since its a single callback for broadcast. 666 expected_event_name = "BleScan1onScanResults" 667 668 # Start scanning on SL4A DUT side 669 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type), 670 self.__get_test_irk().decode("utf-8")) 671 self.dut.sl4a.bleBuildScanFilter(filter_list) 672 self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings) 673 logging.info("Started scanning") 674 675 # Wait for results 676 got_result = self._wait_for_scan_result_event(expected_event_name) 677 678 # Test over 679 self._stop_scanning(scan_callback) 680 self._stop_advertising(create_response.advertiser_id) 681 682 assertThat(got_result).isTrue() 683 684 def test_scan_filter_lost_random_address_with_irk(self): 685 RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk()) 686 687 # Setup SL4A DUT side to scan 688 addr_type = ble_address_types["random"] 689 logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" % 690 (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8"))) 691 self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 692 self.dut.sl4a.bleSetScanSettingsLegacy(False) 693 self.dut.sl4a.bleSetScanSettingsCallbackType(ble_scan_settings_callback_types['found_and_lost']) 694 filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a) 695 expected_event_name = scan_result.format(scan_callback) 696 697 # Start scanning on SL4A DUT side 698 self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type), 699 self.__get_test_irk().decode("utf-8")) 700 self.dut.sl4a.bleBuildScanFilter(filter_list) 701 self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 702 logging.info("Started scanning") 703 704 # Wait for found event to ensure scanning is started before stopping the advertiser 705 # to trigger lost event, else the lost event might not be caught by the test 706 got_found_result = self._wait_for_scan_result_event(expected_event_name) 707 assertThat(got_found_result).isTrue() 708 709 self._stop_advertising(create_response.advertiser_id) 710 711 # Wait for lost event 712 got_lost_result = self._wait_for_scan_result_event(expected_event_name) 713 assertThat(got_lost_result).isTrue() 714 715 # Test over 716 self._stop_scanning(scan_callback) 717 718 719if __name__ == '__main__': 720 test_runner.main() 721