1# Lint as: python3
2"""Utils for blue tooth tests.
3
4Partly ported from acts/framework/acts/test_utils/bt/bt_test_utils.py
5"""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import logging as log
12import os
13import random
14import string
15import time
16import wave
17from queue import Empty
18from typing import Optional
19
20from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_advertise_objects
21from blueberry.tests.gd_sl4a.lib.bt_constants import adv_succ
22from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_modes
23from blueberry.tests.gd_sl4a.lib.bt_constants import ble_advertise_settings_tx_powers
24from blueberry.tests.gd_sl4a.lib.bt_constants import bt_default_timeout
25from mobly.controllers.android_device import AndroidDevice
26
27
28class BtTestUtilsError(Exception):
29    pass
30
31
32def convert_pcm_to_wav(pcm_file_path, wave_file_path, audio_params):
33    """Converts raw pcm data into wave file.
34
35    Args:
36        pcm_file_path: File path of origin pcm file.
37        wave_file_path: File path of converted wave file.
38        audio_params: A dict with audio configuration.
39    """
40    with open(pcm_file_path, 'rb') as pcm_file:
41        frames = pcm_file.read()
42    write_record_file(wave_file_path, audio_params, frames)
43
44
45def create_vcf_from_vcard(output_path: str,
46                          num_of_contacts: int,
47                          first_name: Optional[str] = None,
48                          last_name: Optional[str] = None,
49                          phone_number: Optional[int] = None) -> str:
50    """Creates a vcf file from vCard.
51
52    Args:
53        output_path: Path of the output vcf file.
54        num_of_contacts: Number of contacts to be generated.
55        first_name: First name of the contacts.
56        last_name: Last name of the contacts.
57        phone_number: Phone number of the contacts.
58
59    Returns:
60        vcf_file_path: Path of the output vcf file. E.g.
61            "/<output_path>/contacts_<time>.vcf".
62    """
63    file_name = f'contacts_{int(time.time())}.vcf'
64    vcf_file_path = os.path.join(output_path, file_name)
65    with open(vcf_file_path, 'w+') as f:
66        for i in range(num_of_contacts):
67            lines = []
68            if first_name is None:
69                first_name = 'Person'
70            vcard_last_name = last_name
71            if last_name is None:
72                vcard_last_name = i
73            vcard_phone_number = phone_number
74            if phone_number is None:
75                vcard_phone_number = random.randrange(int(10e10))
76            lines.append('BEGIN:VCARD\n')
77            lines.append('VERSION:2.1\n')
78            lines.append(f'N:{vcard_last_name};{first_name};;;\n')
79            lines.append(f'FN:{first_name} {vcard_last_name}\n')
80            lines.append(f'TEL;CELL:{vcard_phone_number}\n')
81            lines.append(f'EMAIL;PREF:{first_name}{vcard_last_name}@gmail.com\n')
82            lines.append('END:VCARD\n')
83            f.write(''.join(lines))
84    return vcf_file_path
85
86
87def generate_id_by_size(size, chars=(string.ascii_lowercase + string.ascii_uppercase + string.digits)):
88    """Generate random ascii characters of input size and input char types.
89
90    Args:
91        size: Input size of string.
92        chars: (Optional) Chars to use in generating a random string.
93
94    Returns:
95        String of random input chars at the input size.
96    """
97    return ''.join(random.choice(chars) for _ in range(size))
98
99
100def get_duration_seconds(wav_file_path):
101    """Get duration of most recently recorded file.
102
103    Args:
104        wav_file_path: path of the wave file.
105
106    Returns:
107        duration (float): duration of recorded file in seconds.
108    """
109    f = wave.open(wav_file_path, 'r')
110    frames = f.getnframes()
111    rate = f.getframerate()
112    duration = (frames / float(rate))
113    f.close()
114    return duration
115
116
117def wait_until(timeout_sec, condition_func, func_args, expected_value, exception=None, interval_sec=0.5):
118    """Waits until a function returns a expected value or timeout is reached.
119
120    Example usage:
121        ```
122        def is_bluetooth_enabled(device) -> bool:
123          do something and return something...
124
125        # Waits and checks if Bluetooth is turned on.
126        bt_test_utils.wait_until(
127            timeout_sec=10,
128            condition_func=is_bluetooth_enabled,
129            func_args=[dut],
130            expected_value=True,
131            exception=signals.TestFailure('Failed to turn on Bluetooth.'),
132            interval_sec=1)
133        ```
134
135    Args:
136        timeout_sec: float, max waiting time in seconds.
137        condition_func: function, when the condiction function returns the expected
138            value, the waiting mechanism will be interrupted.
139        func_args: tuple or list, the arguments for the condition function.
140        expected_value: a expected value that the condition function returns.
141        exception: Exception, an exception will be raised when timed out if needed.
142        interval_sec: float, interval time between calls of the condition function
143            in seconds.
144
145    Returns:
146        True if the function returns the expected value else False.
147    """
148    start_time = time.time()
149    end_time = start_time + timeout_sec
150    while time.time() < end_time:
151        if condition_func(*func_args) == expected_value:
152            return True
153        time.sleep(interval_sec)
154    args_string = ', '.join(list(map(str, func_args)))
155    log.warning('Timed out after %.1fs waiting for "%s(%s)" to be "%s".', timeout_sec, condition_func.__name__,
156                args_string, expected_value)
157    if exception:
158        raise exception
159    return False
160
161
162def write_read_verify_data_sl4a(client_ad, server_ad, msg, binary=False):
163    """Verify that the client wrote data to the server Android device correctly.
164
165    Args:
166        client_ad: the Android device to perform the write.
167        server_ad: the Android device to read the data written.
168        msg: the message to write.
169        binary: if the msg arg is binary or not.
170
171    Returns:
172        True if the data written matches the data read, false if not.
173    """
174    client_ad.log.info('Write message %s.', msg)
175    if binary:
176        client_ad.sl4a.bluetoothSocketConnWriteBinary(msg)
177    else:
178        client_ad.sl4a.bluetoothSocketConnWrite(msg)
179    server_ad.log.info('Read message %s.', msg)
180    if binary:
181        read_msg = server_ad.sl4a.bluetoothSocketConnReadBinary().rstrip('\r\n')
182    else:
183        read_msg = server_ad.sl4a.bluetoothSocketConnRead()
184    log.info('Verify message.')
185    if msg != read_msg:
186        log.error('Mismatch! Read: %s, Expected: %s', read_msg, msg)
187        return False
188    log.info('Matched! Read: %s, Expected: %s', read_msg, msg)
189    return True
190
191
192def write_record_file(file_name, audio_params, frames):
193    """Writes the recorded audio into the file.
194
195    Args:
196        file_name: The file name for writing the recorded audio.
197        audio_params: A dict with audio configuration.
198        frames: Recorded audio frames.
199    """
200    log.debug('writing frame to %s', file_name)
201    wf = wave.open(file_name, 'wb')
202    wf.setnchannels(audio_params['channel'])
203    wf.setsampwidth(audio_params.get('sample_width', 1))
204    wf.setframerate(audio_params['sample_rate'])
205    wf.writeframes(frames)
206    wf.close()
207
208
209def get_mac_address_of_generic_advertisement(scan_device, adv_device, adv_addr_type=None):
210    """Start generic advertisement and get it's mac address by LE scanning.
211
212    Args:
213        scan_ad: The Android device to use as the scanner.
214        adv_device: The Android device to use as the advertiser.
215        adv_addr_type: The address type for the advertiser (refer to AdvertiseSettings.java)
216
217    Returns:
218        mac_address: The mac address of the advertisement.
219        advertise_callback: The advertise callback id of the active
220            advertisement.
221    """
222    adv_device.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
223    adv_device.sl4a.bleSetAdvertiseSettingsAdvertiseMode(ble_advertise_settings_modes['low_latency'])
224    adv_device.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
225    adv_device.sl4a.bleSetAdvertiseSettingsTxPowerLevel(ble_advertise_settings_tx_powers['high'])
226
227    if adv_addr_type is not None:
228        adv_device.sl4a.bleSetAdvertiseSettingsOwnAddressType(adv_addr_type)
229
230    advertise_callback, advertise_data, advertise_settings = (generate_ble_advertise_objects(adv_device.sl4a))
231    adv_device.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)
232    try:
233        adv_device.ed.pop_event(adv_succ.format(advertise_callback), bt_default_timeout)
234    except Empty as err:
235        raise BtTestUtilsError("Advertiser did not start successfully {}".format(err))
236    filter_list = scan_device.sl4a.bleGenFilterList()
237    scan_settings = scan_device.sl4a.bleBuildScanSetting()
238    scan_callback = scan_device.sl4a.bleGenScanCallback()
239    scan_device.sl4a.bleSetScanFilterDeviceName(adv_device.sl4a.bluetoothGetLocalName())
240    scan_device.sl4a.bleBuildScanFilter(filter_list)
241    scan_device.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
242    try:
243        event = scan_device.sl4a.ed.pop_event("BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
244    except Empty as err:
245        raise BtTestUtilsError("Scanner did not find advertisement {}".format(err))
246    mac_address = event['data']['Result']['deviceInfo']['address']
247    return mac_address, advertise_callback, scan_callback
248
249
250def clear_bonded_devices(ad: AndroidDevice):
251    """Clear bonded devices from the input Android device.
252
253    Args:
254        ad: the Android device performing the connection.
255    Returns:
256        True if clearing bonded devices was successful, false if unsuccessful.
257    """
258    bonded_device_list = ad.sl4a.bluetoothGetBondedDevices()
259    while bonded_device_list:
260        device_address = bonded_device_list[0]['address']
261        if not ad.sl4a.bluetoothUnbond(device_address):
262            ad.log.error("Failed to unbond {} from {}".format(device_address, ad.serial))
263            return False
264        ad.log.info("Successfully unbonded {} from {}".format(device_address, ad.serial))
265        #TODO: wait for BOND_STATE_CHANGED intent instead of waiting
266        time.sleep(1)
267
268        # If device was first connected using LE transport, after bonding it is
269        # accessible through it's LE address, and through it classic address.
270        # Unbonding it will unbond two devices representing different
271        # "addresses". Attempt to unbond such already unbonded devices will
272        # result in bluetoothUnbond returning false.
273        bonded_device_list = ad.sl4a.bluetoothGetBondedDevices()
274    return True
275