1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Ble libraries 18""" 19 20import time 21import queue 22import logging 23 24from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes 25from blueberry.tests.gd_sl4a.lib.bt_constants import small_timeout 26from blueberry.tests.gd_sl4a.lib.bt_constants import adv_fail 27from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ 28from blueberry.tests.gd_sl4a.lib.bt_constants import advertising_set_on_own_address_read 29from blueberry.tests.gd_sl4a.lib.bt_constants import advertising_set_started 30from blueberry.tests.gd_sl4a.lib.bt_constants import bluetooth_on 31from blueberry.tests.gd_sl4a.lib.bt_constants import bluetooth_off 32from blueberry.tests.gd_sl4a.lib.bt_constants import bt_default_timeout 33 34 35def enable_bluetooth(sl4a, ed): 36 if sl4a.bluetoothCheckState(): 37 return True 38 39 sl4a.bluetoothToggleState(True) 40 expected_bluetooth_on_event_name = bluetooth_on 41 try: 42 ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout) 43 except Exception: 44 logging.info("Failed to toggle Bluetooth on (no broadcast received)") 45 if sl4a.bluetoothCheckState(): 46 logging.info(".. actual state is ON") 47 return True 48 logging.info(".. actual state is OFF") 49 return False 50 51 return True 52 53 54def disable_bluetooth(sl4a, ed): 55 if not sl4a.bluetoothCheckState(): 56 return True 57 sl4a.bluetoothToggleState(False) 58 expected_bluetooth_off_event_name = bluetooth_off 59 try: 60 ed.pop_event(expected_bluetooth_off_event_name, bt_default_timeout) 61 except Exception: 62 logging.info("Failed to toggle Bluetooth off (no broadcast received)") 63 if sl4a.bluetoothCheckState(): 64 logging.info(".. actual state is ON") 65 return False 66 logging.info(".. actual state is OFF") 67 return True 68 return True 69 70 71def generate_ble_scan_objects(sl4a): 72 """Generate generic LE scan objects. 73 74 Args: 75 sl4a: The SL4A object to generate LE scan objects from. 76 77 Returns: 78 filter_list: The generated scan filter list id. 79 scan_settings: The generated scan settings id. 80 scan_callback: The generated scan callback id. 81 """ 82 filter_list = sl4a.bleGenFilterList() 83 scan_settings = sl4a.bleBuildScanSetting() 84 scan_callback = sl4a.bleGenScanCallback() 85 return filter_list, scan_settings, scan_callback 86 87 88def generate_ble_advertise_objects(sl4a): 89 """Generate generic LE advertise objects. 90 91 Args: 92 sl4a: The SL4A object to generate advertise LE objects from. 93 94 Returns: 95 advertise_callback: The generated advertise callback id. 96 advertise_data: The generated advertise data id. 97 advertise_settings: The generated advertise settings id. 98 """ 99 advertise_callback = sl4a.bleGenBleAdvertiseCallback() 100 advertise_data = sl4a.bleBuildAdvertiseData() 101 advertise_settings = sl4a.bleBuildAdvertiseSettings() 102 return advertise_callback, advertise_data, advertise_settings 103 104 105class BleLib(): 106 107 def __init__(self, dut): 108 self.advertisement_list = [] 109 self.dut = dut 110 self.default_timeout = 5 111 self.set_advertisement_list = [] 112 self.generic_uuid = "0000{}-0000-1000-8000-00805f9b34fb" 113 114 def _verify_ble_adv_started(self, advertise_callback): 115 """Helper for verifying if an advertisment started or not""" 116 regex = "({}|{})".format(adv_succ.format(advertise_callback), adv_fail.format(advertise_callback)) 117 try: 118 event = self.dut.ed.pop_events(regex, 5, small_timeout) 119 except queue.Empty: 120 logging.error("Failed to get success or failed event.") 121 return 122 if event[0]["name"] == adv_succ.format(advertise_callback): 123 logging.info("Advertisement started successfully.") 124 return True 125 else: 126 logging.info("Advertisement failed to start.") 127 return False 128 129 def start_generic_connectable_advertisement(self, line): 130 """Start a connectable LE advertisement""" 131 scan_response = None 132 if line: 133 scan_response = bool(line) 134 self.dut.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) 135 self.dut.sl4a.bleSetAdvertiseSettingsIsConnectable(True) 136 advertise_callback, advertise_data, advertise_settings = (generate_ble_advertise_objects(self.dut.sl4a)) 137 if scan_response: 138 self.dut.sl4a.bleStartBleAdvertisingWithScanResponse(advertise_callback, advertise_data, advertise_settings, 139 advertise_data) 140 else: 141 self.dut.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 142 if self._verify_ble_adv_started(advertise_callback): 143 logging.info("Tracking Callback ID: {}".format(advertise_callback)) 144 self.advertisement_list.append(advertise_callback) 145 logging.info(self.advertisement_list) 146 147 def start_connectable_advertisement_set(self, line): 148 """Start Connectable Advertisement Set""" 149 adv_callback = self.dut.sl4a.bleAdvSetGenCallback() 150 adv_data = { 151 "includeDeviceName": True, 152 } 153 self.dut.sl4a.bleAdvSetStartAdvertisingSet({ 154 "connectable": True, 155 "legacyMode": False, 156 "primaryPhy": "PHY_LE_1M", 157 "secondaryPhy": "PHY_LE_1M", 158 "interval": 320 159 }, adv_data, None, None, None, 0, 0, adv_callback) 160 evt = self.dut.ed.pop_event(advertising_set_started.format(adv_callback), self.default_timeout) 161 set_id = evt['data']['setId'] 162 logging.error("did not receive the set started event!") 163 evt = self.dut.ed.pop_event(advertising_set_on_own_address_read.format(set_id), self.default_timeout) 164 address = evt['data']['address'] 165 logging.info("Advertiser address is: {}".format(str(address))) 166 self.set_advertisement_list.append(adv_callback) 167 168 def stop_all_advertisement_set(self, line): 169 """Stop all Advertisement Sets""" 170 for adv in self.set_advertisement_list: 171 try: 172 self.dut.sl4a.bleAdvSetStopAdvertisingSet(adv) 173 except Exception as err: 174 logging.error("Failed to stop advertisement: {}".format(err)) 175 176 def adv_add_service_uuid_list(self, line): 177 """Add service UUID to the LE advertisement inputs: 178 [uuid1 uuid2 ... uuidN]""" 179 uuids = line.split() 180 uuid_list = [] 181 for uuid in uuids: 182 if len(uuid) == 4: 183 uuid = self.generic_uuid.format(line) 184 uuid_list.append(uuid) 185 self.dut.sl4a.bleSetAdvertiseDataSetServiceUuids(uuid_list) 186 187 def adv_data_include_local_name(self, is_included): 188 """Include local name in the advertisement. inputs: [true|false]""" 189 self.dut.sl4a.bleSetAdvertiseDataIncludeDeviceName(bool(is_included)) 190 191 def adv_data_include_tx_power_level(self, is_included): 192 """Include tx power level in the advertisement. inputs: [true|false]""" 193 self.dut.sl4a.bleSetAdvertiseDataIncludeTxPowerLevel(bool(is_included)) 194 195 def adv_data_add_manufacturer_data(self, line): 196 """Include manufacturer id and data to the advertisment: 197 [id data1 data2 ... dataN]""" 198 info = line.split() 199 manu_id = int(info[0]) 200 manu_data = [] 201 for data in info[1:]: 202 manu_data.append(int(data)) 203 self.dut.sl4a.bleAddAdvertiseDataManufacturerId(manu_id, manu_data) 204 205 def start_generic_nonconnectable_advertisement(self, line): 206 """Start a nonconnectable LE advertisement""" 207 self.dut.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) 208 self.dut.sl4a.bleSetAdvertiseSettingsIsConnectable(False) 209 advertise_callback, advertise_data, advertise_settings = (generate_ble_advertise_objects(self.dut.sl4a)) 210 self.dut.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 211 if self._verify_ble_adv_started(advertise_callback): 212 logging.info("Tracking Callback ID: {}".format(advertise_callback)) 213 self.advertisement_list.append(advertise_callback) 214 logging.info(self.advertisement_list) 215 216 def stop_all_advertisements(self, line): 217 """Stop all LE advertisements""" 218 for callback_id in self.advertisement_list: 219 logging.info("Stopping Advertisement {}".format(callback_id)) 220 self.dut.sl4a.bleStopBleAdvertising(callback_id) 221 time.sleep(1) 222 self.advertisement_list = [] 223 224 def ble_stop_advertisement(self, callback_id): 225 """Stop an LE advertisement""" 226 if not callback_id: 227 logging.info("Need a callback ID") 228 return 229 callback_id = int(callback_id) 230 if callback_id not in self.advertisement_list: 231 logging.info("Callback not in list of advertisements.") 232 return 233 self.dut.sl4a.bleStopBleAdvertising(callback_id) 234 self.advertisement_list.remove(callback_id) 235 236 def start_max_advertisements(self, line): 237 scan_response = None 238 if line: 239 scan_response = bool(line) 240 while (True): 241 try: 242 self.dut.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) 243 self.dut.sl4a.bleSetAdvertiseSettingsIsConnectable(True) 244 advertise_callback, advertise_data, advertise_settings = (generate_ble_advertise_objects(self.dut.sl4a)) 245 if scan_response: 246 self.dut.sl4a.bleStartBleAdvertisingWithScanResponse(advertise_callback, advertise_data, 247 advertise_settings, advertise_data) 248 else: 249 self.dut.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 250 if self._verify_ble_adv_started(advertise_callback): 251 logging.info("Tracking Callback ID: {}".format(advertise_callback)) 252 self.advertisement_list.append(advertise_callback) 253 logging.info(self.advertisement_list) 254 else: 255 logging.info("Advertisements active: {}".format(len(self.advertisement_list))) 256 return False 257 except Exception as err: 258 logging.info("Advertisements active: {}".format(len(self.advertisement_list))) 259 return True 260