1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import logging
19
20from google.protobuf import empty_pb2 as empty_proto
21
22import hci_packets as hci
23from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
24from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
25from blueberry.facade import common_pb2 as common
26from blueberry.tests.gd.cert.truth import assertThat
27from blueberry.tests.gd_sl4a.lib import gd_sl4a_base_test
28from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys, ble_scan_settings_callback_types
29from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects
30
31from mobly import test_runner
32
33
34class LeAdvancedScanningTest(gd_sl4a_base_test.GdSl4aBaseTestClass):
35
    def setup_class(self):
        """Run the base class setup, requesting the 'HCI_INTERFACES' cert module."""
        super().setup_class(cert_module='HCI_INTERFACES')
        # Timeout handed to the DUT event dispatcher when waiting for a scan result event.
        self.default_timeout = 60  # seconds
39
    def setup_test(self):
        """Run the base class per-test setup; this class adds no steps of its own."""
        super().setup_test()
42
    def teardown_test(self):
        """Run the base class per-test teardown; this class adds no steps of its own."""
        super().teardown_test()
45
46    def __get_test_irk(self):
47        return bytes(
48            bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]))
49
50    def _set_cert_privacy_policy_with_random_address(self, random_address):
51        private_policy = le_initiator_address_facade.PrivacyPolicy(
52            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
53            address_with_type=common.BluetoothAddressWithType(
54                address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')),
55                type=common.RANDOM_DEVICE_ADDRESS))
56        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
57
58    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
59        random_address_bytes = "DD:34:02:05:5C:EE".encode()
60        private_policy = le_initiator_address_facade.PrivacyPolicy(
61            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
62            address_with_type=common.BluetoothAddressWithType(
63                address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS),
64            rotation_irk=irk)
65        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
66        # Bluetooth MAC address must be upper case
67        return random_address_bytes.decode('utf-8').upper()
68
69    def __advertise_rpa_random_policy(self, legacy_pdus, irk):
70        DEVICE_NAME = 'Im_The_CERT!'
71        logging.info("Getting public address")
72        ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
73        logging.info("Done %s" % ADDRESS)
74
75        # Setup cert side to advertise
76        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
77                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
78        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
79        config = le_advertising_facade.AdvertisingConfig(
80            advertisement=[gap_data],
81            interval_min=512,
82            interval_max=768,
83            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
84            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
85            channel_map=7,
86            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
87        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
88            include_tx_power=True,
89            connectable=True,
90            legacy_pdus=legacy_pdus,
91            advertising_config=config,
92            secondary_advertising_phy=ble_scan_settings_phys["1m"])
93        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
94        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
95        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
96        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
97        return (ADDRESS, create_response)
98
99    def _advertise_rpa_random_legacy_pdu(self, irk):
100        return self.__advertise_rpa_random_policy(True, irk)
101
102    def _advertise_rpa_random_extended_pdu(self, irk):
103        return self.__advertise_rpa_random_policy(False, irk)
104
105    def _set_cert_privacy_policy_with_public_address(self):
106        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
107        private_policy = le_initiator_address_facade.PrivacyPolicy(
108            address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS,
109            address_with_type=common.BluetoothAddressWithType(
110                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS))
111        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
112        # Bluetooth MAC address must be upper case
113        return public_address_bytes.decode('utf-8').upper()
114
115    def _set_cert_privacy_policy_with_public_address_but_advertise_resolvable(self, irk):
116        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
117        private_policy = le_initiator_address_facade.PrivacyPolicy(
118            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
119            address_with_type=common.BluetoothAddressWithType(
120                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS),
121            rotation_irk=irk)
122        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
123        # Bluetooth MAC address must be upper case
124        return public_address_bytes.decode('utf-8').upper()
125
126    def __advertise_rpa_public_policy(self, legacy_pdus, irk):
127        DEVICE_NAME = 'Im_The_CERT!'
128        logging.info("Getting public address")
129        ADDRESS = self._set_cert_privacy_policy_with_public_address_but_advertise_resolvable(irk)
130        logging.info("Done %s" % ADDRESS)
131
132        # Setup cert side to advertise
133        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
134                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
135        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
136        config = le_advertising_facade.AdvertisingConfig(
137            advertisement=[gap_data],
138            interval_min=512,
139            interval_max=768,
140            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
141            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
142            channel_map=7,
143            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
144        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
145            include_tx_power=True,
146            connectable=True,
147            legacy_pdus=legacy_pdus,
148            advertising_config=config,
149            secondary_advertising_phy=ble_scan_settings_phys["1m"])
150        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
151        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
152        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
153        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
154        return (ADDRESS, create_response)
155
156    def _advertise_rpa_public_legacy_pdu(self, irk):
157        return self.__advertise_rpa_public_policy(True, irk)
158
159    def _advertise_rpa_public_extended_pdu(self, irk):
160        return self.__advertise_rpa_public_policy(False, irk)
161
162    def _wait_for_scan_result_event(self, expected_event_name):
163        try:
164            # Verify if there is scan result
165            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
166            # Print out scan result
167            mac_address = event_info['data']['Result']['deviceInfo']['address']
168            logging.info("Filter advertisement with address {}".format(mac_address))
169            return True
170        except queue.Empty as error:
171            logging.error("Could not find initial advertisement.")
172            return False
173
174    def _stop_advertising(self, advertiser_id):
175        logging.info("Stop advertising")
176        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
177        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
178        logging.info("Stopped advertising")
179
    def _stop_scanning(self, scan_callback):
        """Stop the DUT's BLE scan for scan_callback, logging before and after.

        Args:
            scan_callback: scan callback handle passed to SL4A's bleStopBleScan.
        """
        logging.info("Stop scanning")
        self.dut.sl4a.bleStopBleScan(scan_callback)
        logging.info("Stopped scanning")
184
185    def test_scan_filter_device_public_address_with_irk_legacy_pdu(self):
186        """
187        The cert side will advertise a RRPA generated from the test IRK using Legacy PDU
188
189        DUT will scan for the device using the Identity Address + Address Type + IRK
190
191        Results received via ScanCallback
192        """
193        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk())
194        addr_type = ble_address_types["public"]
195        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
196                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
197
198        # Setup SL4A DUT side to scan
199        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
200        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
201        expected_event_name = scan_result.format(scan_callback)
202
203        # Start scanning on SL4A DUT side
204        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
205                                                              self.__get_test_irk().decode("utf-8"))
206        self.dut.sl4a.bleBuildScanFilter(filter_list)
207        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
208        logging.info("Started scanning")
209
210        # Wait for results
211        got_result = self._wait_for_scan_result_event(expected_event_name)
212
213        # Test over
214        self._stop_scanning(scan_callback)
215        self._stop_advertising(create_response.advertiser_id)
216
217        assertThat(got_result).isTrue()
218
219    def test_scan_filter_device_public_address_with_irk_legacy_pdu_pending_intent(self):
220        """
221        The cert side will advertise a RRPA generated from the test IRK using Legacy PDU
222
223        DUT will scan for the device using the Identity Address + Address Type + IRK
224
225        Results received via PendingIntent
226        """
227        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk())
228        addr_type = ble_address_types["public"]
229        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
230                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
231
232        # Setup SL4A DUT side to scan
233        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
234        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
235        # The event name needs to be set to this otherwise the index iterates from the scancallbacks
236        # being run consecutively.  This is a PendingIntent callback but it hooks into the
237        # ScanCallback and uses just the 1 for the index.
238        expected_event_name = "BleScan1onScanResults"
239
240        # Start scanning on SL4A DUT side
241        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
242                                                              self.__get_test_irk().decode("utf-8"))
243        self.dut.sl4a.bleBuildScanFilter(filter_list)
244        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
245        logging.info("Started scanning")
246
247        # Wait for results
248        got_result = self._wait_for_scan_result_event(expected_event_name)
249
250        # Test over
251        self._stop_scanning(scan_callback)
252        self._stop_advertising(create_response.advertiser_id)
253
254        assertThat(got_result).isTrue()
255
256    def test_scan_filter_device_public_address_with_irk_extended_pdu(self):
257        """
258        The cert side will advertise a RRPA generated from the test IRK using Extended PDU
259
260        DUT will scan for the device using the Identity Address + Address Type + IRK
261
262        Results received via PendingIntent
263        """
264        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk())
265        addr_type = ble_address_types["public"]
266        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
267                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
268
269        # Setup SL4A DUT side to scan
270        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
271        self.dut.sl4a.bleSetScanSettingsLegacy(False)
272        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
273        expected_event_name = scan_result.format(scan_callback)
274
275        # Start scanning on SL4A DUT side
276        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
277                                                              self.__get_test_irk().decode("utf-8"))
278        self.dut.sl4a.bleBuildScanFilter(filter_list)
279        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
280        logging.info("Started scanning")
281
282        # Wait for results
283        got_result = self._wait_for_scan_result_event(expected_event_name)
284
285        # Test over
286        self._stop_scanning(scan_callback)
287        self._stop_advertising(create_response.advertiser_id)
288
289        assertThat(got_result).isTrue()
290
291    def test_scan_filter_device_name_legacy_pdu(self):
292        """
293        The cert side will advertise using PUBLIC address and device name data on legacy PDU
294
295        DUT will scan for the device using the Device Name
296
297        Results received via ScanCallback
298        """
299        # Use public address on cert side
300        logging.info("Setting public address")
301        DEVICE_NAME = 'Im_The_CERT!'
302        public_address = self._set_cert_privacy_policy_with_public_address()
303        logging.info("Set public address")
304
305        # Setup cert side to advertise
306        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
307                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
308        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
309        config = le_advertising_facade.AdvertisingConfig(
310            advertisement=[gap_data],
311            interval_min=512,
312            interval_max=768,
313            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
314            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
315            channel_map=7,
316            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES,
317            tx_power=20)
318        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
319        logging.info("Creating advertiser")
320        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
321        logging.info("Created advertiser")
322
323        # Setup SL4A DUT side to scan
324        logging.info("Start scanning with public address %s" % public_address)
325        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
326        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
327        expected_event_name = scan_result.format(scan_callback)
328
329        # Start scanning on SL4A DUT side
330        self.dut.sl4a.bleSetScanFilterDeviceName(DEVICE_NAME)
331        self.dut.sl4a.bleBuildScanFilter(filter_list)
332        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
333        logging.info("Started scanning")
334
335        # Wait for results
336        got_result = self._wait_for_scan_result_event(expected_event_name)
337
338        # Test over
339        self._stop_scanning(scan_callback)
340        self._stop_advertising(create_response.advertiser_id)
341
342        assertThat(got_result).isTrue()
343
344    def test_scan_filter_device_random_address_legacy_pdu(self):
345        """
346        The cert side will advertise using RANDOM STATIC address with legacy PDU
347
348        DUT will scan for the device using the RANDOM STATIC Address of the advertising device
349
350        Results received via ScanCallback
351        """
352        # Use random address on cert side
353        logging.info("Setting random address")
354        RANDOM_ADDRESS = 'D0:05:04:03:02:01'
355        DEVICE_NAME = 'Im_The_CERT!'
356        self._set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS)
357        logging.info("Set random address")
358
359        # Setup cert side to advertise
360        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
361                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
362        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
363        config = le_advertising_facade.AdvertisingConfig(
364            advertisement=[gap_data],
365            interval_min=512,
366            interval_max=768,
367            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
368            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
369            channel_map=7,
370            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
371        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
372        logging.info("Creating advertiser")
373        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
374        logging.info("Created advertiser")
375
376        # Setup SL4A DUT side to scan
377        addr_type = ble_address_types["random"]
378        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type))
379        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
380        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
381        expected_event_name = scan_result.format(scan_callback)
382
383        # Start scanning on SL4A DUT side
384        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type))
385        self.dut.sl4a.bleBuildScanFilter(filter_list)
386        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
387        logging.info("Started scanning")
388
389        # Wait for results
390        got_result = self._wait_for_scan_result_event(expected_event_name)
391
392        # Test over
393        self._stop_scanning(scan_callback)
394        self._stop_advertising(create_response.advertiser_id)
395
396        assertThat(got_result).isTrue()
397
398    def test_scan_filter_device_public_address_extended_pdu(self):
399        """
400        The cert side will advertise using PUBLIC address with Extended PDU
401
402        DUT will scan for the device using the PUBLIC Address of the advertising device
403
404        Results received via ScanCallback
405        """
406        # Use public address on cert side
407        logging.info("Setting public address")
408        DEVICE_NAME = 'Im_The_CERT!'
409        public_address = self._set_cert_privacy_policy_with_public_address()
410        logging.info("Set public address")
411
412        # Setup cert side to advertise
413        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
414                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
415        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
416        config = le_advertising_facade.AdvertisingConfig(
417            advertisement=[gap_data],
418            interval_min=512,
419            interval_max=768,
420            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
421            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
422            channel_map=7,
423            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
424        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
425            advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"])
426        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
427        logging.info("Creating advertiser")
428        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
429        logging.info("Created advertiser")
430
431        # Setup SL4A DUT side to scan
432        addr_type = ble_address_types["public"]
433        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type))
434        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
435        self.dut.sl4a.bleSetScanSettingsLegacy(False)
436        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
437        expected_event_name = scan_result.format(scan_callback)
438
439        # Start scanning on SL4A DUT side
440        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type))
441        self.dut.sl4a.bleBuildScanFilter(filter_list)
442        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
443        logging.info("Started scanning")
444
445        # Wait for results
446        got_result = self._wait_for_scan_result_event(expected_event_name)
447
448        # Test over
449        self._stop_scanning(scan_callback)
450        self._stop_advertising(create_response.advertiser_id)
451
452        assertThat(got_result).isTrue()
453
454    def test_scan_filter_device_public_address_with_irk_extended_pdu_pending_intent(self):
455        """
456        The cert side will advertise using RRPA with Extended PDU
457
458        DUT will scan for the device using the pre-shared PUBLIC ADDRESS of the advertising
459        device + IRK
460
461        Results received via PendingIntent
462        """
463        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk())
464
465        # Setup SL4A DUT side to scan
466        addr_type = ble_address_types["public"]
467        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
468                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
469        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
470        self.dut.sl4a.bleSetScanSettingsLegacy(False)
471        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
472        # Hard code here since callback index iterates and will cause this to fail if ran
473        # Second as the impl in SL4A sends this since its a single callback for broadcast.
474        expected_event_name = "BleScan1onScanResults"
475
476        # Start scanning on SL4A DUT side
477        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
478                                                              self.__get_test_irk().decode("utf-8"))
479        self.dut.sl4a.bleBuildScanFilter(filter_list)
480        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
481        logging.info("Started scanning")
482
483        # Wait for results
484        got_result = self._wait_for_scan_result_event(expected_event_name)
485
486        # Test over
487        self._stop_scanning(scan_callback)
488        self._stop_advertising(create_response.advertiser_id)
489
490        assertThat(got_result).isTrue()
491
492    def test_scan_filter_device_random_address_with_irk_extended_pdu(self):
493        """
494        The cert side will advertise using RRPA with Extended PDU
495
496        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
497        device + IRK
498
499        Results received via ScanCallback
500        """
501        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
502
503        # Setup SL4A DUT side to scan
504        addr_type = ble_address_types["random"]
505        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
506                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
507        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
508        self.dut.sl4a.bleSetScanSettingsLegacy(False)
509        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
510        expected_event_name = scan_result.format(scan_callback)
511
512        # Setup SL4A DUT filter
513        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
514                                                              self.__get_test_irk().decode("utf-8"))
515        self.dut.sl4a.bleBuildScanFilter(filter_list)
516
517        # Start scanning on SL4A DUT side
518        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
519        logging.info("Started scanning")
520
521        # Wait for results
522        got_result = self._wait_for_scan_result_event(expected_event_name)
523
524        # Test over
525        self._stop_scanning(scan_callback)
526        self._stop_advertising(create_response.advertiser_id)
527
528        assertThat(got_result).isTrue()
529
530    def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent(self):
531        """
532        The cert side will advertise using RRPA with Extended PDU
533
534        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
535        device + IRK
536
537        Results received via PendingIntent
538        """
539        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
540
541        # Setup SL4A DUT side to scan
542        addr_type = ble_address_types["random"]
543        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
544                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
545        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
546        self.dut.sl4a.bleSetScanSettingsLegacy(False)
547        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
548        # Hard code here since callback index iterates and will cause this to fail if ran
549        # Second as the impl in SL4A sends this since its a single callback for broadcast.
550        expected_event_name = "BleScan1onScanResults"
551
552        # Setup SL4A DUT filter
553        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
554                                                              self.__get_test_irk().decode("utf-8"))
555        self.dut.sl4a.bleBuildScanFilter(filter_list)
556
557        # Start scanning on SL4A DUT side
558        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
559        logging.info("Started scanning")
560
561        # Wait for results
562        got_result = self._wait_for_scan_result_event(expected_event_name)
563
564        # Test over
565        self._stop_scanning(scan_callback)
566        self._stop_advertising(create_response.advertiser_id)
567
568        assertThat(got_result).isTrue()
569
570    def test_scan_filter_device_random_address_with_irk_extended_pdu_scan_twice(self):
571        """
572        The cert side will advertise using RRPA with Extended PDU
573
574        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
575        device + IRK
576
577        DUT will stop scanning, then start scanning again for results
578
579        Results received via ScanCallback
580        """
581        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
582
583        # Setup SL4A DUT side to scan
584        addr_type = ble_address_types["random"]
585        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
586                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
587        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
588        self.dut.sl4a.bleSetScanSettingsLegacy(False)
589        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
590        expected_event_name = scan_result.format(scan_callback)
591
592        # Start scanning on SL4A DUT side
593        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
594                                                              self.__get_test_irk().decode("utf-8"))
595        self.dut.sl4a.bleBuildScanFilter(filter_list)
596        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
597        logging.info("Started scanning")
598
599        # Wait for results
600        got_result = self._wait_for_scan_result_event(expected_event_name)
601
602        # Test over
603        self._stop_scanning(scan_callback)
604
605        # Start scanning on SL4A DUT side
606        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
607        logging.info("Started scanning...again")
608
609        # Wait for results
610        got_result = self._wait_for_scan_result_event(expected_event_name)
611
612        # Test over
613        self._stop_scanning(scan_callback)
614        self._stop_advertising(create_response.advertiser_id)
615
616        assertThat(got_result).isTrue()
617
618    def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent_128_640(self):
619        """
620        The CERT side will advertise an RPA derived from the IRK.
621
622        The DUT (SL4A) side will scan for a RPA with matching IRK.
623
624        Adjust the scan intervals to Digital Carkey specific timings.
625        """
626        DEVICE_NAME = 'Im_The_CERT!'
627        logging.info("Getting public address")
628        RANDOM_ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(
629            self.__get_test_irk())
630        logging.info("Done %s" % RANDOM_ADDRESS)
631
632        legacy_pdus = False
633
634        # Setup cert side to advertise
635        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
636                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
637        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
638        config = le_advertising_facade.AdvertisingConfig(
639            advertisement=[gap_data],
640            interval_min=128,
641            interval_max=640,
642            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
643            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
644            channel_map=7,
645            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
646        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
647            include_tx_power=True,
648            connectable=True,
649            legacy_pdus=legacy_pdus,
650            advertising_config=config,
651            secondary_advertising_phy=ble_scan_settings_phys["1m"])
652        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
653        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
654        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
655        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
656
657        # Setup SL4A DUT side to scan
658        addr_type = ble_address_types["random"]
659        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
660                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
661        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['ambient_discovery'])
662        self.dut.sl4a.bleSetScanSettingsLegacy(False)
663        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
664        # Hard code here since callback index iterates and will cause this to fail if ran
665        # Second as the impl in SL4A sends this since its a single callback for broadcast.
666        expected_event_name = "BleScan1onScanResults"
667
668        # Start scanning on SL4A DUT side
669        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
670                                                              self.__get_test_irk().decode("utf-8"))
671        self.dut.sl4a.bleBuildScanFilter(filter_list)
672        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
673        logging.info("Started scanning")
674
675        # Wait for results
676        got_result = self._wait_for_scan_result_event(expected_event_name)
677
678        # Test over
679        self._stop_scanning(scan_callback)
680        self._stop_advertising(create_response.advertiser_id)
681
682        assertThat(got_result).isTrue()
683
684    def test_scan_filter_lost_random_address_with_irk(self):
685        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
686
687        # Setup SL4A DUT side to scan
688        addr_type = ble_address_types["random"]
689        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
690                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
691        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
692        self.dut.sl4a.bleSetScanSettingsLegacy(False)
693        self.dut.sl4a.bleSetScanSettingsCallbackType(ble_scan_settings_callback_types['found_and_lost'])
694        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
695        expected_event_name = scan_result.format(scan_callback)
696
697        # Start scanning on SL4A DUT side
698        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
699                                                              self.__get_test_irk().decode("utf-8"))
700        self.dut.sl4a.bleBuildScanFilter(filter_list)
701        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
702        logging.info("Started scanning")
703
704        # Wait for found event to ensure scanning is started before stopping the advertiser
705        # to trigger lost event, else the lost event might not be caught by the test
706        got_found_result = self._wait_for_scan_result_event(expected_event_name)
707        assertThat(got_found_result).isTrue()
708
709        self._stop_advertising(create_response.advertiser_id)
710
711        # Wait for lost event
712        got_lost_result = self._wait_for_scan_result_event(expected_event_name)
713        assertThat(got_lost_result).isTrue()
714
715        # Test over
716        self._stop_scanning(scan_callback)
717
718
# When this file is executed as a script, hand control to the Mobly test runner.
if __name__ == '__main__':
    test_runner.main()
721