1# Lint as: python3 2"""Utils for blue tooth tests. 3 4Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py 5""" 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import logging as log 12import os 13import random 14import string 15import time 16import wave 17from queue import Empty 18from typing import Optional 19 20from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects 21from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ 22from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes 23from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_tx_powers 24from blueberry.tests.gd_sl4a.lib.bt_constants import bt_default_timeout 25from mobly.controllers.android_device import AndroidDevice 26 27 28class BtTestUtilsError(Exception): 29 pass 30 31 32def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params): 33 """Converts raw pcm data into wave file. 34 35 Args: 36 pcm_file_path: File path of origin pcm file. 37 wave_file_path: File path of converted wave file. 38 audio_params: A dict with audio configuration. 39 """ 40 with open(pcm_file_path, 'rb') as pcm_file: 41 frames = pcm_file.read() 42 write_record_file(wave_file_path, audio_params, frames) 43 44 45def create_vcf_from_vcard(output_path: str, 46 num_of_contacts: int, 47 first_name: Optional[str] = None, 48 last_name: Optional[str] = None, 49 phone_number: Optional[int] = None) -> str: 50 """Creates a vcf file from vCard. 51 52 Args: 53 output_path: Path of the output vcf file. 54 num_of_contacts: Number of contacts to be generated. 55 first_name: First name of the contacts. 56 last_name: Last name of the contacts. 57 phone_number: Phone number of the contacts. 58 59 Returns: 60 vcf_file_path: Path of the output vcf file. E.g. 61 "/<output_path>/contacts_<time>.vcf". 62 """ 63 file_name = f'contacts_{int(time.time())}.vcf' 64 vcf_file_path = os.path.join(output_path, file_name) 65 with open(vcf_file_path, 'w+') as f: 66 for i in range(num_of_contacts): 67 lines = [] 68 if first_name is None: 69 first_name = 'Person' 70 vcard_last_name = last_name 71 if last_name is None: 72 vcard_last_name = i 73 vcard_phone_number = phone_number 74 if phone_number is None: 75 vcard_phone_number = random.randrange(int(10e10)) 76 lines.append('BEGIN:VCARD\n') 77 lines.append('VERSION:2.1\n') 78 lines.append(f'N:{vcard_last_name};{first_name};;;\n') 79 lines.append(f'FN:{first_name} {vcard_last_name}\n') 80 lines.append(f'TEL;CELL:{vcard_phone_number}\n') 81 lines.append(f'EMAIL;PREF:{first_name}{vcard_last_name}@gmail.com\n') 82 lines.append('END:VCARD\n') 83 f.write(''.join(lines)) 84 return vcf_file_path 85 86 87def generate_id_by_size(size, chars=(string.ascii_lowercase + string.ascii_uppercase + string.digits)): 88 """Generate random ascii characters of input size and input char types. 89 90 Args: 91 size: Input size of string. 92 chars: (Optional) Chars to use in generating a random string. 93 94 Returns: 95 String of random input chars at the input size. 96 """ 97 return ''.join(random.choice(chars) for _ in range(size)) 98 99 100def get_duration_seconds(wav_file_path): 101 """Get duration of most recently recorded file. 102 103 Args: 104 wav_file_path: path of the wave file. 105 106 Returns: 107 duration (float): duration of recorded file in seconds. 108 """ 109 f = wave.open(wav_file_path, 'r') 110 frames = f.getnframes() 111 rate = f.getframerate() 112 duration = (frames / float(rate)) 113 f.close() 114 return duration 115 116 117def wait_until(timeout_sec, condition_func, func_args, expected_value, exception=None, interval_sec=0.5): 118 """Waits until a function returns a expected value or timeout is reached. 119 120 Example usage: 121 ``` 122 def is_bluetooth_enabled(device) -> bool: 123 do something and return something... 124 125 # Waits and checks if Bluetooth is turned on. 126 bt_test_utils.wait_until( 127 timeout_sec=10, 128 condition_func=is_bluetooth_enabled, 129 func_args=[dut], 130 expected_value=True, 131 exception=signals.TestFailure('Failed to turn on Bluetooth.'), 132 interval_sec=1) 133 ``` 134 135 Args: 136 timeout_sec: float, max waiting time in seconds. 137 condition_func: function, when the condiction function returns the expected 138 value, the waiting mechanism will be interrupted. 139 func_args: tuple or list, the arguments for the condition function. 140 expected_value: a expected value that the condition function returns. 141 exception: Exception, an exception will be raised when timed out if needed. 142 interval_sec: float, interval time between calls of the condition function 143 in seconds. 144 145 Returns: 146 True if the function returns the expected value else False. 147 """ 148 start_time = time.time() 149 end_time = start_time + timeout_sec 150 while time.time() < end_time: 151 if condition_func(*func_args) == expected_value: 152 return True 153 time.sleep(interval_sec) 154 args_string = ', '.join(list(map(str, func_args))) 155 log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".', timeout_sec, condition_func.__name__, 156 args_string, expected_value) 157 if exception: 158 raise exception 159 return False 160 161 162def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False): 163 """Verify that the client wrote data to the server Android device correctly. 164 165 Args: 166 client_ad: the Android device to perform the write. 167 server_ad: the Android device to read the data written. 168 msg: the message to write. 169 binary: if the msg arg is binary or not. 170 171 Returns: 172 True if the data written matches the data read, false if not. 173 """ 174 client_ad.log.info('Write message %s.', msg) 175 if binary: 176 client_ad.sl4a.bluetoothSocketConnWriteBinary(msg) 177 else: 178 client_ad.sl4a.bluetoothSocketConnWrite(msg) 179 server_ad.log.info('Read message %s.', msg) 180 if binary: 181 read_msg = server_ad.sl4a.bluetoothSocketConnReadBinary().rstrip('\r\n') 182 else: 183 read_msg = server_ad.sl4a.bluetoothSocketConnRead() 184 log.info('Verify message.') 185 if msg != read_msg: 186 log.error('Mismatch! Read: %s, Expected: %s', read_msg, msg) 187 return False 188 log.info('Matched! Read: %s, Expected: %s', read_msg, msg) 189 return True 190 191 192def write_record_file(file_name, audio_params, frames): 193 """Writes the recorded audio into the file. 194 195 Args: 196 file_name: The file name for writing the recorded audio. 197 audio_params: A dict with audio configuration. 198 frames: Recorded audio frames. 199 """ 200 log.debug('writing frame to %s', file_name) 201 wf = wave.open(file_name, 'wb') 202 wf.setnchannels(audio_params['channel']) 203 wf.setsampwidth(audio_params.get('sample_width', 1)) 204 wf.setframerate(audio_params['sample_rate']) 205 wf.writeframes(frames) 206 wf.close() 207 208 209def get_mac_address_of_generic_advertisement(scan_device, adv_device, adv_addr_type=None): 210 """Start generic advertisement and get it's mac address by LE scanning. 211 212 Args: 213 scan_ad: The Android device to use as the scanner. 214 adv_device: The Android device to use as the advertiser. 215 adv_addr_type: The address type for the advertiser (refer to AdvertiseSettings.java) 216 217 Returns: 218 mac_address: The mac address of the advertisement. 219 advertise_callback: The advertise callback id of the active 220 advertisement. 221 """ 222 adv_device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) 223 adv_device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency']) 224 adv_device.sl4a.bleSetAdvertiseSettingsIsConnectable(True) 225 adv_device.sl4a.bleSetAdvertiseSettingsTxPowerLevel(ble_advertise_settings_tx_powers['high']) 226 227 if adv_addr_type is not None: 228 adv_device.sl4a.bleSetAdvertiseSettingsOwnAddressType(adv_addr_type) 229 230 advertise_callback, advertise_data, advertise_settings = (generate_ble_advertise_objects(adv_device.sl4a)) 231 adv_device.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 232 try: 233 adv_device.ed.pop_event(adv_succ.format(advertise_callback), bt_default_timeout) 234 except Empty as err: 235 raise BtTestUtilsError("Advertiser did not start successfully {}".format(err)) 236 filter_list = scan_device.sl4a.bleGenFilterList() 237 scan_settings = scan_device.sl4a.bleBuildScanSetting() 238 scan_callback = scan_device.sl4a.bleGenScanCallback() 239 scan_device.sl4a.bleSetScanFilterDeviceName(adv_device.sl4a.bluetoothGetLocalName()) 240 scan_device.sl4a.bleBuildScanFilter(filter_list) 241 scan_device.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 242 try: 243 event = scan_device.sl4a.ed.pop_event("BleScan{}onScanResults".format(scan_callback), bt_default_timeout) 244 except Empty as err: 245 raise BtTestUtilsError("Scanner did not find advertisement {}".format(err)) 246 mac_address = event['data']['Result']['deviceInfo']['address'] 247 return mac_address, advertise_callback, scan_callback 248 249 250def clear_bonded_devices(ad: AndroidDevice): 251 """Clear bonded devices from the input Android device. 252 253 Args: 254 ad: the Android device performing the connection. 255 Returns: 256 True if clearing bonded devices was successful, false if unsuccessful. 257 """ 258 bonded_device_list = ad.sl4a.bluetoothGetBondedDevices() 259 while bonded_device_list: 260 device_address = bonded_device_list[0]['address'] 261 if not ad.sl4a.bluetoothUnbond(device_address): 262 ad.log.error("Failed to unbond {} from {}".format(device_address, ad.serial)) 263 return False 264 ad.log.info("Successfully unbonded {} from {}".format(device_address, ad.serial)) 265 #TODO: wait for BOND_STATE_CHANGED intent instead of waiting 266 time.sleep(1) 267 268 # If device was first connected using LE transport, after bonding it is 269 # accessible through it's LE address, and through it classic address. 270 # Unbonding it will unbond two devices representing different 271 # "addresses". Attempt to unbond such already unbonded devices will 272 # result in bluetoothUnbond returning false. 273 bonded_device_list = ad.sl4a.bluetoothGetBondedDevices() 274 return True 275