1#!/usr/bin/env python3 2# Copyright 2019 - The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import asyncio 17import logging 18 19from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice 20from blueberry.tests.gd.cert.gd_device import MOBLY_CONTROLLER_CONFIG_NAME 21from blueberry.tests.gd.cert.os_utils import get_gd_root 22from blueberry.tests.topshim.lib.async_closable import AsyncClosable 23from blueberry.tests.topshim.lib.async_closable import asyncSafeClose 24from blueberry.tests.topshim.lib.oob_data import OobData 25from blueberry.tests.topshim.lib.gatt_client import GattClient 26 27 28def create(configs): 29 return get_instances_with_configs(configs) 30 31 32def destroy(devices): 33 pass 34 35 36def replace_vars_for_topshim(string, config): 37 serial_number = config.get("serial_number") 38 if serial_number is None: 39 serial_number = "" 40 rootcanal_port = config.get("rootcanal_port") 41 if rootcanal_port is None: 42 rootcanal_port = "" 43 if serial_number == "DUT" or serial_number == "CERT": 44 raise Exception("Did you forget to configure the serial number?") 45 # We run bt_topshim_facade instead of bluetooth_stack_with_facade 46 return string.replace("$GD_ROOT", get_gd_root()) \ 47 .replace("bluetooth_stack_with_facade", "bt_topshim_facade") \ 48 .replace("$(grpc_port)", config.get("grpc_port")) \ 49 .replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \ 50 .replace("$(rootcanal_port)", rootcanal_port) \ 51 .replace("$(signal_port)", config.get("signal_port")) \ 52 .replace("$(serial_number)", serial_number) 53 54 55def get_instances_with_configs(configs): 56 logging.info(configs) 57 devices = [] 58 for config in configs: 59 resolved_cmd = [] 60 for arg in config["cmd"]: 61 logging.debug(arg) 62 resolved_cmd.append(replace_vars_for_topshim(arg, config)) 63 verbose_mode = bool(config.get('verbose_mode', False)) 64 device = GdHostOnlyDevice(config["grpc_port"], "-1", config["signal_port"], resolved_cmd, config["label"], 65 MOBLY_CONTROLLER_CONFIG_NAME, config["name"], verbose_mode) 66 device.setup() 67 devices.append(device) 68 return devices 69 70 71TRANSPORT_CLASSIC = 1 72TRANSPORT_LE = 2 73 74 75class TopshimDevice(AsyncClosable): 76 __adapter = None 77 __gatt = None 78 __security = None 79 __hfp = None 80 __hf_client = None 81 82 async def __le_rand_wrapper(self, async_fn): 83 result = await async_fn 84 await self.__adapter.le_rand() 85 le_rand_future = await self.__adapter._listen_for_event(facade_pb2.EventType.LE_RAND) 86 return result 87 88 def __post(self, async_fn): 89 return asyncio.get_event_loop().run_until_complete(async_fn) 90 91 def __init__(self, adapter, gatt, security, hfp, hf_client): 92 self.__adapter = adapter 93 self.__gatt = gatt 94 self.__security = security 95 self.__hfp = hfp 96 self.__hf_client = hf_client 97 98 async def close(self): 99 """ 100 Implement abstract method to close out any streams or jobs. 101 """ 102 await asyncSafeClose(self.__adapter) 103 await asyncSafeClose(self.__gatt) 104 await asyncSafeClose(self.__security) 105 await asyncSafeClose(self.__hfp) 106 await asyncSafeClose(self.__hf_client) 107 108 def enable_inquiry_scan(self): 109 f = self.__post(self.__adapter.enable_inquiry_scan()) 110 return self.__post(self.__discovery_mode_waiter(f)) 111 112 def enable_page_scan(self): 113 f = self.__post(self.__adapter.enable_page_scan()) 114 return self.__post(self.__discovery_mode_waiter(f)) 115 116 def disable_page_scan(self): 117 f = self.__post(self.__adapter.disable_page_scan()) 118 return self.__post(self.__discovery_mode_waiter(f)) 119 120 async def __discovery_mode_waiter(self, f): 121 params = await f 122 status, discovery_mode = params["status"].data[0], params["AdapterScanMode"].data[0] 123 return (status, discovery_mode) 124 125 def start_advertising(self): 126 """ 127 Starts BLE Advertiser for the stack. 128 Assumes stack defaults. Which in our case would be RRPA 129 """ 130 self.__post(self.__gatt.advertising_enable()) 131 132 def stop_advertising(self): 133 """ 134 Stop BLE Advertiser. 135 """ 136 self.__post(self.__gatt.advertising_disable()) 137 138 def start_scanning(self): 139 pass 140 141 def stop_scanning(self): 142 pass 143 144 def clear_event_mask(self): 145 self.__post(self.__adapter.clear_event_mask()) 146 147 def clear_event_filter(self): 148 self.__post(self.__adapter.clear_event_filter()) 149 150 def clear_filter_accept_list(self): 151 self.__post(self.__adapter.clear_filter_accept_list()) 152 153 def disconnect_all_acls(self): 154 self.__post(self.__adapter.disconnect_all_acls()) 155 156 def allow_wake_by_hid(self): 157 self.__post(self.__adapter.allow_wake_by_hid()) 158 159 def set_default_event_mask_except(self, mask, le_mask): 160 self.__post(self.__adapter.set_default_event_mask_except(mask, le_mask)) 161 162 def set_event_filter_inquiry_result_all_devices(self): 163 self.__post(self.__adapter.set_event_filter_inquiry_result_all_devices()) 164 165 def set_event_filter_connection_setup_all_devices(self): 166 self.__post(self.__adapter.set_event_filter_connection_setup_all_devices()) 167 168 def le_rand(self): 169 self.__post(self.__adapter.le_rand()) 170 171 def create_bond(self, address, transport=1): 172 """ 173 Create a bonding entry for a given address with a particular transport type. 174 """ 175 f = self.__post(self.__security.create_bond(address, transport)) 176 return self.__post(self.__bond_change_waiter(f)) 177 178 def remove_bonded_device(self, address): 179 """ 180 Removes a bonding entry for a given address. 181 """ 182 self.__post(self.__security.remove_bond(address)) 183 184 async def __bond_change_waiter(self, f): 185 params = await f 186 state, address = params["bond_state"].data[0], params["address"].data[0] 187 return (state, address) 188 189 def generate_local_oob_data(self, transport=TRANSPORT_LE): 190 """ 191 Generate local Out of Band data 192 193 @param transport TRANSPORT_CLASSIC or TRANSPORT_LE 194 195 @return a future to await on fo the data 196 """ 197 f = self.__post(self.__security.generate_local_oob_data(transport)) 198 199 async def waiter(f): 200 params = await f 201 return OobData(params["is_valid"].data[0], params["transport"].data[0], params["address"].data[0], 202 params["confirmation"].data[0], params["randomizer"].data[0]) 203 204 return self.__post(waiter(f)) 205 206 def set_local_io_caps(self, io_capability=0): 207 f = self.__post(self.__adapter.set_local_io_caps(io_capability)) 208 209 async def waiter(f): 210 params = await f 211 status, io_caps = params["status"].data[0], params["LocalIoCaps"].data[0] 212 return (status, io_caps) 213 214 return self.__post(waiter(f)) 215 216 def toggle_discovery(self, is_start): 217 f = self.__post(self.__adapter.toggle_discovery(is_start)) 218 219 async def waiter(f): 220 params = await f 221 return params["discovery_state"].data[0] 222 223 return self.__post(waiter(f)) 224 225 def find_device(self): 226 """ 227 Attempts to find discoverable devices when discovery is toggled on. 228 229 @return a list of properties of found device. 230 """ 231 f = self.__post(self.__adapter.find_device()) 232 233 async def waiter(f): 234 try: 235 params = await f 236 return params["BdAddr"].data[0] 237 except: 238 # The future `f` has a timeout after 2s post which it is cancelled. 239 print("No device was found. Timed out.") 240 return None 241 242 return self.__post(waiter(f)) 243 244 def start_slc(self, address): 245 f = self.__post(self.__hfp.start_slc(address)) 246 return self.__post(self.__hfp_connection_state_waiter(f)) 247 248 def stop_slc(self, address): 249 f = self.__post(self.__hfp.stop_slc(address)) 250 return self.__post(self.__hfp_connection_state_waiter(f)) 251 252 def wait_for_hfp_connection_state_change(self): 253 f = self.__post(self.__hfp.wait_for_hfp_connection_state_change()) 254 return self.__post(self.__hfp_connection_state_waiter(f)) 255 256 async def __hfp_connection_state_waiter(self, f): 257 data = await f 258 data_list = data.split(", ") 259 state, address = data_list[0].strip(), data_list[1].strip() 260 return (state, address) 261