1#!/usr/bin/env python3 2# 3# Copyright 2022 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import queue 19 20from blueberry.facade import common_pb2 as common 21from blueberry.tests.gd.cert.closable import Closable 22from blueberry.tests.gd.cert.closable import safeClose 23from blueberry.tests.gd.cert.truth import assertThat 24from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects 25from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes 26from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result 27from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test 28 29 30class LeScanner(Closable): 31 32 is_scanning = False 33 device = None 34 filter_list = None 35 scan_settings = None 36 scan_callback = None 37 38 def __init__(self, device): 39 self.device = device 40 41 def __wait_for_scan_result_event(self, expected_event_name, timeout=60): 42 try: 43 event_info = self.device.ed.pop_event(expected_event_name, timeout) 44 except queue.Empty as error: 45 logging.error("Could not find scan result event: %s", expected_event_name) 46 return None 47 return event_info['data']['Result']['deviceInfo']['address'] 48 49 def scan_for_address_expect_none(self, address, addr_type): 50 if self.is_scanning: 51 print("Already scanning!") 52 return None 53 self.is_scanning = True 54 logging.info("Start scanning for identity address {} or type {}".format(address, addr_type)) 55 self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 56 self.device.sl4a.bleSetScanSettingsLegacy(False) 57 self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) 58 expected_event_name = scan_result.format(self.scan_callback) 59 60 # Start scanning on SL4A DUT 61 self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type) 62 self.device.sl4a.bleBuildScanFilter(self.filter_list) 63 self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) 64 65 # Verify that scan result is received on SL4A DUT 66 advertising_address = self.__wait_for_scan_result_event(expected_event_name, 1) 67 assertThat(advertising_address).isNone() 68 logging.info("Filter advertisement with address {}".format(advertising_address)) 69 return advertising_address 70 71 def scan_for_address(self, address, addr_type): 72 if self.is_scanning: 73 print("Already scanning!") 74 return None 75 self.is_scanning = True 76 logging.info("Start scanning for identity address {} or type {}".format(address, addr_type)) 77 self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 78 self.device.sl4a.bleSetScanSettingsLegacy(False) 79 self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) 80 expected_event_name = scan_result.format(self.scan_callback) 81 82 # Start scanning on SL4A DUT 83 self.device.sl4a.bleSetScanFilterDeviceAddressAndType(address, addr_type) 84 self.device.sl4a.bleBuildScanFilter(self.filter_list) 85 self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) 86 87 # Verify that scan result is received on SL4A DUT 88 advertising_address = self.__wait_for_scan_result_event(expected_event_name) 89 assertThat(advertising_address).isNotNone() 90 logging.info("Filter advertisement with address {}".format(advertising_address)) 91 return advertising_address 92 93 def scan_for_address_with_irk(self, address, addr_type, irk): 94 if self.is_scanning: 95 print("Already scanning!") 96 return None 97 self.is_scanning = True 98 logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk)) 99 self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 100 self.device.sl4a.bleSetScanSettingsLegacy(False) 101 self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) 102 expected_event_name = scan_result.format(self.scan_callback) 103 104 # Start scanning on SL4A DUT 105 self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk) 106 self.device.sl4a.bleBuildScanFilter(self.filter_list) 107 self.device.sl4a.bleStartBleScan(self.filter_list, self.scan_settings, self.scan_callback) 108 109 # Verify that scan result is received on SL4A DUT 110 advertising_address = self.__wait_for_scan_result_event(expected_event_name) 111 assertThat(advertising_address).isNotNone() 112 logging.info("Filter advertisement with address {}".format(advertising_address)) 113 return advertising_address 114 115 def scan_for_address_with_irk_pending_intent(self, address, addr_type, irk): 116 if self.is_scanning: 117 print("Already scanning!") 118 return None 119 self.is_scanning = True 120 logging.info("Start scanning for identity address {} or type {} using irk {}".format(address, addr_type, irk)) 121 self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 122 self.device.sl4a.bleSetScanSettingsLegacy(False) 123 self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) 124 # Hard code here since callback index iterates and will cause this to fail if ran 125 # Second as the impl in SL4A sends this since it's a single callback for broadcast. 126 expected_event_name = "BleScan1onScanResults" 127 128 # Start scanning on SL4A DUT 129 self.device.sl4a.bleSetScanFilterDeviceAddressTypeAndIrkHexString(address, addr_type, irk) 130 self.device.sl4a.bleBuildScanFilter(self.filter_list) 131 self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings) 132 133 # Verify that scan result is received on SL4A DUT 134 advertising_address = self.__wait_for_scan_result_event(expected_event_name) 135 assertThat(advertising_address).isNotNone() 136 logging.info("Filter advertisement with address {}".format(advertising_address)) 137 return advertising_address 138 139 def scan_for_name(self, name): 140 if self.is_scanning: 141 print("Already scanning!") 142 return 143 self.is_scanning = True 144 logging.info("Start scanning for name {}".format(name)) 145 self.device.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency']) 146 self.device.sl4a.bleSetScanSettingsLegacy(False) 147 self.filter_list, self.scan_settings, self.scan_callback = generate_ble_scan_objects(self.device.sl4a) 148 expected_event_name = scan_result.format(1) 149 self.device.ed.clear_events(expected_event_name) 150 151 # Start scanning on SL4A DUT 152 self.device.sl4a.bleSetScanFilterDeviceName(name) 153 self.device.sl4a.bleBuildScanFilter(self.filter_list) 154 self.device.sl4a.bleStartBleScanPendingIntent(self.filter_list, self.scan_settings) 155 156 # Verify that scan result is received on SL4A DUT 157 advertising_address = self.__wait_for_scan_result_event(expected_event_name) 158 assertThat(advertising_address).isNotNone() 159 logging.info("Filter advertisement with address {}".format(advertising_address)) 160 return advertising_address 161 162 def stop_scanning(self): 163 """ 164 Warning: no java callback registered for this 165 """ 166 if self.is_scanning: 167 logging.info("Stopping scan") 168 self.device.sl4a.bleStopBleScan(self.scan_callback) 169 self.is_scanning = False 170 171 def close(self): 172 self.stop_scanning() 173 self.device = None 174