1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import os
19import random
20import re
21import string
22import threading
23import time
24from queue import Empty
25from subprocess import call
26
27from acts import asserts
28from acts.test_utils.bt.bt_constants import adv_fail
29from acts.test_utils.bt.bt_constants import adv_succ
30from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list
31from acts.test_utils.bt.bt_constants import bits_per_samples
32from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
33from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
34from acts.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed
35from acts.test_utils.bt.bt_constants import bluetooth_off
36from acts.test_utils.bt.bt_constants import bluetooth_on
37from acts.test_utils.bt.bt_constants import \
38    bluetooth_profile_connection_state_changed
39from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
40from acts.test_utils.bt.bt_constants import bt_default_timeout
41from acts.test_utils.bt.bt_constants import bt_profile_constants
42from acts.test_utils.bt.bt_constants import bt_profile_states
43from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids
44from acts.test_utils.bt.bt_constants import bt_scan_mode_types
45from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
46from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device
47from acts.test_utils.bt.bt_constants import channel_modes
48from acts.test_utils.bt.bt_constants import codec_types
49from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
50from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
51from acts.test_utils.bt.bt_constants import hid_id_keyboard
52from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
53from acts.test_utils.bt.bt_constants import pan_connect_timeout
54from acts.test_utils.bt.bt_constants import sample_rates
55from acts.test_utils.bt.bt_constants import scan_result
56from acts.test_utils.bt.bt_constants import sig_uuid_constants
57from acts.test_utils.bt.bt_constants import small_timeout
58from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
59from acts.test_utils.tel.tel_test_utils import verify_http_connection
60from acts.utils import exe_cmd
61
62from acts import utils
63
64log = logging
65
66advertisements_to_devices = {}
67
68
69class BtTestUtilsError(Exception):
70    pass
71
72
73def _add_android_device_to_dictionary(android_device, profile_list,
74                                      selector_dict):
75    """Adds the AndroidDevice and supported features to the selector dictionary
76
77    Args:
78        android_device: The Android device.
79        profile_list: The list of profiles the Android device supports.
80    """
81    for profile in profile_list:
82        if profile in selector_dict and android_device not in selector_dict[
83                profile]:
84            selector_dict[profile].append(android_device)
85        else:
86            selector_dict[profile] = [android_device]
87
88
89def bluetooth_enabled_check(ad, timeout_sec=5):
90    """Checks if the Bluetooth state is enabled, if not it will attempt to
91    enable it.
92
93    Args:
94        ad: The Android device list to enable Bluetooth on.
95        timeout_sec: number of seconds to wait for toggle to take effect.
96
97    Returns:
98        True if successful, false if unsuccessful.
99    """
100    if not ad.droid.bluetoothCheckState():
101        ad.droid.bluetoothToggleState(True)
102        expected_bluetooth_on_event_name = bluetooth_on
103        try:
104            ad.ed.pop_event(expected_bluetooth_on_event_name,
105                            bt_default_timeout)
106        except Empty:
107            ad.log.info(
108                "Failed to toggle Bluetooth on(no broadcast received).")
109            # Try one more time to poke at the actual state.
110            if ad.droid.bluetoothCheckState():
111                ad.log.info(".. actual state is ON")
112                return True
113            ad.log.error(".. actual state is OFF")
114            return False
115    end_time = time.time() + timeout_sec
116    while not ad.droid.bluetoothCheckState() and time.time() < end_time:
117        time.sleep(1)
118    return ad.droid.bluetoothCheckState()
119
120
121def check_device_supported_profiles(droid):
122    """Checks for Android device supported profiles.
123
124    Args:
125        droid: The droid object to query.
126
127    Returns:
128        A dictionary of supported profiles.
129    """
130    profile_dict = {}
131    profile_dict['hid'] = droid.bluetoothHidIsReady()
132    profile_dict['hsp'] = droid.bluetoothHspIsReady()
133    profile_dict['a2dp'] = droid.bluetoothA2dpIsReady()
134    profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady()
135    profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady()
136    profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady()
137    profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady()
138    return profile_dict
139
140
141def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
142                                     adv_android_device, adv_callback_list):
143    """Try to gracefully stop all scanning and advertising instances.
144
145    Args:
146        scn_android_device: The Android device that is actively scanning.
147        scn_callback_list: The scan callback id list that needs to be stopped.
148        adv_android_device: The Android device that is actively advertising.
149        adv_callback_list: The advertise callback id list that needs to be
150            stopped.
151    """
152    scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed
153    adv_droid = adv_android_device.droid
154    try:
155        for scan_callback in scn_callback_list:
156            scan_droid.bleStopBleScan(scan_callback)
157    except Exception as err:
158        scn_android_device.log.debug(
159            "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
160                err))
161        reset_bluetooth([scn_android_device])
162    try:
163        for adv_callback in adv_callback_list:
164            adv_droid.bleStopBleAdvertising(adv_callback)
165    except Exception as err:
166        adv_android_device.log.debug(
167            "Failed to stop LE advertisement... reseting Bluetooth. Error {}".
168            format(err))
169        reset_bluetooth([adv_android_device])
170
171
172def clear_bonded_devices(ad):
173    """Clear bonded devices from the input Android device.
174
175    Args:
176        ad: the Android device performing the connection.
177    Returns:
178        True if clearing bonded devices was successful, false if unsuccessful.
179    """
180    bonded_device_list = ad.droid.bluetoothGetBondedDevices()
181    while bonded_device_list:
182        device_address = bonded_device_list[0]['address']
183        if not ad.droid.bluetoothUnbond(device_address):
184            log.error("Failed to unbond {} from {}".format(
185                device_address, ad.serial))
186            return False
187        log.info("Successfully unbonded {} from {}".format(
188            device_address, ad.serial))
189        #TODO: wait for BOND_STATE_CHANGED intent instead of waiting
190        time.sleep(1)
191
192        # If device was first connected using LE transport, after bonding it is
193        # accessible through it's LE address, and through it classic address.
194        # Unbonding it will unbond two devices representing different
195        # "addresses". Attempt to unbond such already unbonded devices will
196        # result in bluetoothUnbond returning false.
197        bonded_device_list = ad.droid.bluetoothGetBondedDevices()
198    return True
199
200
201def connect_phone_to_headset(android,
202                             headset,
203                             timeout=bt_default_timeout,
204                             connection_check_period=10):
205    """Connects android phone to bluetooth headset.
206    Headset object must have methods power_on and enter_pairing_mode,
207    and attribute mac_address.
208
209    Args:
210        android: AndroidDevice object with SL4A installed.
211        headset: Object with attribute mac_address and methods power_on and
212            enter_pairing_mode.
213        timeout: Seconds to wait for devices to connect.
214        connection_check_period: how often to check for connection once the
215            SL4A connect RPC has been sent.
216    Returns:
217        connected (bool): True if devices are paired and connected by end of
218        method. False otherwise.
219    """
220    headset_mac_address = headset.mac_address
221    connected = is_a2dp_src_device_connected(android, headset_mac_address)
222    log.info('Devices connected before pair attempt: %s' % connected)
223    if not connected:
224        # Turn on headset and initiate pairing mode.
225        headset.enter_pairing_mode()
226        android.droid.bluetoothStartPairingHelper()
227    start_time = time.time()
228    # If already connected, skip pair and connect attempt.
229    while not connected and (time.time() - start_time < timeout):
230        bonded_info = android.droid.bluetoothGetBondedDevices()
231        if headset.mac_address not in [
232                info["address"] for info in bonded_info
233        ]:
234            # Use SL4A to pair and connect with headset.
235            headset.enter_pairing_mode()
236            android.droid.bluetoothDiscoverAndBond(headset_mac_address)
237        else:  # Device is bonded but not connected
238            android.droid.bluetoothConnectBonded(headset_mac_address)
239        log.info('Waiting for connection...')
240        time.sleep(connection_check_period)
241        # Check for connection.
242        connected = is_a2dp_src_device_connected(android, headset_mac_address)
243    log.info('Devices connected after pair attempt: %s' % connected)
244    return connected
245
246
247def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
248    """Connects pri droid to secondary droid.
249
250    Args:
251        pri_ad: AndroidDroid initiating connection
252        sec_ad: AndroidDroid accepting connection
253        profiles_set: Set of profiles to be connected
254        attempts: Number of attempts to try until failure.
255
256    Returns:
257        Pass if True
258        Fail if False
259    """
260    device_addr = sec_ad.droid.bluetoothGetLocalAddress()
261    # Allows extra time for the SDP records to be updated.
262    time.sleep(2)
263    curr_attempts = 0
264    while curr_attempts < attempts:
265        log.info("connect_pri_to_sec curr attempt {} total {}".format(
266            curr_attempts, attempts))
267        if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
268            return True
269        curr_attempts += 1
270    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
271        attempts))
272    return False
273
274
275def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
276    """Connects pri droid to secondary droid.
277
278    Args:
279        pri_ad: AndroidDroid initiating connection.
280        sec_ad: AndroidDroid accepting connection.
281        profiles_set: Set of profiles to be connected.
282
283    Returns:
284        True of connection is successful, false if unsuccessful.
285    """
286    # Check if we support all profiles.
287    supported_profiles = bt_profile_constants.values()
288    for profile in profiles_set:
289        if profile not in supported_profiles:
290            pri_ad.log.info("Profile {} is not supported list {}".format(
291                profile, supported_profiles))
292            return False
293
294    # First check that devices are bonded.
295    paired = False
296    for paired_device in pri_ad.droid.bluetoothGetBondedDevices():
297        if paired_device['address'] == \
298                sec_ad.droid.bluetoothGetLocalAddress():
299            paired = True
300            break
301
302    if not paired:
303        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
304        return False
305
306    # Now try to connect them, the following call will try to initiate all
307    # connections.
308    pri_ad.droid.bluetoothConnectBonded(
309        sec_ad.droid.bluetoothGetLocalAddress())
310
311    end_time = time.time() + 10
312    profile_connected = set()
313    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()
314    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))
315    # First use APIs to check profile connection state
316    while (time.time() < end_time
317           and not profile_connected.issuperset(profiles_set)):
318        if (bt_profile_constants['headset_client'] not in profile_connected
319                and bt_profile_constants['headset_client'] in profiles_set):
320            if is_hfp_client_device_connected(pri_ad, sec_addr):
321                profile_connected.add(bt_profile_constants['headset_client'])
322        if (bt_profile_constants['a2dp'] not in profile_connected
323                and bt_profile_constants['a2dp'] in profiles_set):
324            if is_a2dp_src_device_connected(pri_ad, sec_addr):
325                profile_connected.add(bt_profile_constants['a2dp'])
326        if (bt_profile_constants['a2dp_sink'] not in profile_connected
327                and bt_profile_constants['a2dp_sink'] in profiles_set):
328            if is_a2dp_snk_device_connected(pri_ad, sec_addr):
329                profile_connected.add(bt_profile_constants['a2dp_sink'])
330        if (bt_profile_constants['map_mce'] not in profile_connected
331                and bt_profile_constants['map_mce'] in profiles_set):
332            if is_map_mce_device_connected(pri_ad, sec_addr):
333                profile_connected.add(bt_profile_constants['map_mce'])
334        if (bt_profile_constants['map'] not in profile_connected
335                and bt_profile_constants['map'] in profiles_set):
336            if is_map_mse_device_connected(pri_ad, sec_addr):
337                profile_connected.add(bt_profile_constants['map'])
338        time.sleep(0.1)
339    # If APIs fail, try to find the connection broadcast receiver.
340    while not profile_connected.issuperset(profiles_set):
341        try:
342            profile_event = pri_ad.ed.pop_event(
343                bluetooth_profile_connection_state_changed,
344                bt_default_timeout + 10)
345            pri_ad.log.info("Got event {}".format(profile_event))
346        except Exception:
347            pri_ad.log.error("Did not get {} profiles left {}".format(
348                bluetooth_profile_connection_state_changed, profile_connected))
349            return False
350
351        profile = profile_event['data']['profile']
352        state = profile_event['data']['state']
353        device_addr = profile_event['data']['addr']
354        if state == bt_profile_states['connected'] and \
355                device_addr == sec_ad.droid.bluetoothGetLocalAddress():
356            profile_connected.add(profile)
357        pri_ad.log.info(
358            "Profiles connected until now {}".format(profile_connected))
359    # Failure happens inside the while loop. If we came here then we already
360    # connected.
361    return True
362
363
364def determine_max_advertisements(android_device):
365    """Determines programatically how many advertisements the Android device
366    supports.
367
368    Args:
369        android_device: The Android device to determine max advertisements of.
370
371    Returns:
372        The maximum advertisement count.
373    """
374    android_device.log.info(
375        "Determining number of maximum concurrent advertisements...")
376    advertisement_count = 0
377    bt_enabled = False
378    expected_bluetooth_on_event_name = bluetooth_on
379    if not android_device.droid.bluetoothCheckState():
380        android_device.droid.bluetoothToggleState(True)
381    try:
382        android_device.ed.pop_event(expected_bluetooth_on_event_name,
383                                    bt_default_timeout)
384    except Exception:
385        android_device.log.info(
386            "Failed to toggle Bluetooth on(no broadcast received).")
387        # Try one more time to poke at the actual state.
388        if android_device.droid.bluetoothCheckState() is True:
389            android_device.log.info(".. actual state is ON")
390        else:
391            android_device.log.error(
392                "Failed to turn Bluetooth on. Setting default advertisements to 1"
393            )
394            advertisement_count = -1
395            return advertisement_count
396    advertise_callback_list = []
397    advertise_data = android_device.droid.bleBuildAdvertiseData()
398    advertise_settings = android_device.droid.bleBuildAdvertiseSettings()
399    while (True):
400        advertise_callback = android_device.droid.bleGenBleAdvertiseCallback()
401        advertise_callback_list.append(advertise_callback)
402
403        android_device.droid.bleStartBleAdvertising(advertise_callback,
404                                                    advertise_data,
405                                                    advertise_settings)
406
407        regex = "(" + adv_succ.format(
408            advertise_callback) + "|" + adv_fail.format(
409                advertise_callback) + ")"
410        # wait for either success or failure event
411        evt = android_device.ed.pop_events(regex, bt_default_timeout,
412                                           small_timeout)
413        if evt[0]["name"] == adv_succ.format(advertise_callback):
414            advertisement_count += 1
415            android_device.log.info(
416                "Advertisement {} started.".format(advertisement_count))
417        else:
418            error = evt[0]["data"]["Error"]
419            if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
420                android_device.log.info(
421                    "Advertisement failed to start. Reached max " +
422                    "advertisements at {}".format(advertisement_count))
423                break
424            else:
425                raise BtTestUtilsError(
426                    "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
427                    " but received bad error code {}".format(error))
428    try:
429        for adv in advertise_callback_list:
430            android_device.droid.bleStopBleAdvertising(adv)
431    except Exception:
432        android_device.log.error(
433            "Failed to stop advertisingment, resetting Bluetooth.")
434        reset_bluetooth([android_device])
435    return advertisement_count
436
437
438def disable_bluetooth(droid):
439    """Disable Bluetooth on input Droid object.
440
441    Args:
442        droid: The droid object to disable Bluetooth on.
443
444    Returns:
445        True if successful, false if unsuccessful.
446    """
447    if droid.bluetoothCheckState() is True:
448        droid.bluetoothToggleState(False)
449        if droid.bluetoothCheckState() is True:
450            log.error("Failed to toggle Bluetooth off.")
451            return False
452    return True
453
454
455def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
456    """
457    Disconnect primary from secondary on a specific set of profiles
458    Args:
459        pri_ad - Primary android_device initiating disconnection
460        sec_ad - Secondary android droid (sl4a interface to keep the
461          method signature the same connect_pri_to_sec above)
462        profiles_list - List of profiles we want to disconnect from
463
464    Returns:
465        True on Success
466        False on Failure
467    """
468    # Sanity check to see if all the profiles in the given set is supported
469    supported_profiles = bt_profile_constants.values()
470    for profile in profiles_list:
471        if profile not in supported_profiles:
472            pri_ad.log.info("Profile {} is not in supported list {}".format(
473                profile, supported_profiles))
474            return False
475
476    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
477    # Disconnecting on a already disconnected profile is a nop,
478    # so not checking for the connection state
479    try:
480        pri_ad.droid.bluetoothDisconnectConnectedProfile(
481            sec_ad.droid.bluetoothGetLocalAddress(), profiles_list)
482    except Exception as err:
483        pri_ad.log.error(
484            "Exception while trying to disconnect profile(s) {}: {}".format(
485                profiles_list, err))
486        return False
487
488    profile_disconnected = set()
489    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))
490
491    while not profile_disconnected.issuperset(profiles_list):
492        try:
493            profile_event = pri_ad.ed.pop_event(
494                bluetooth_profile_connection_state_changed, bt_default_timeout)
495            pri_ad.log.info("Got event {}".format(profile_event))
496        except Exception as e:
497            pri_ad.log.error(
498                "Did not disconnect from Profiles. Reason {}".format(e))
499            return False
500
501        profile = profile_event['data']['profile']
502        state = profile_event['data']['state']
503        device_addr = profile_event['data']['addr']
504
505        if state == bt_profile_states['disconnected'] and \
506                device_addr == sec_ad.droid.bluetoothGetLocalAddress():
507            profile_disconnected.add(profile)
508        pri_ad.log.info(
509            "Profiles disconnected so far {}".format(profile_disconnected))
510
511    return True
512
513
514def enable_bluetooth(droid, ed):
515    if droid.bluetoothCheckState() is True:
516        return True
517
518    droid.bluetoothToggleState(True)
519    expected_bluetooth_on_event_name = bluetooth_on
520    try:
521        ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout)
522    except Exception:
523        log.info("Failed to toggle Bluetooth on (no broadcast received)")
524        if droid.bluetoothCheckState() is True:
525            log.info(".. actual state is ON")
526            return True
527        log.info(".. actual state is OFF")
528        return False
529
530    return True
531
532
533def factory_reset_bluetooth(android_devices):
534    """Clears Bluetooth stack of input Android device list.
535
536        Args:
537            android_devices: The Android device list to reset Bluetooth
538
539        Returns:
540            True if successful, false if unsuccessful.
541        """
542    for a in android_devices:
543        droid, ed = a.droid, a.ed
544        a.log.info("Reset state of bluetooth on device.")
545        if not bluetooth_enabled_check(a):
546            return False
547        # TODO: remove device unbond b/79418045
548        # Temporary solution to ensure all devices are unbonded
549        bonded_devices = droid.bluetoothGetBondedDevices()
550        for b in bonded_devices:
551            a.log.info("Removing bond for device {}".format(b['address']))
552            droid.bluetoothUnbond(b['address'])
553
554        droid.bluetoothFactoryReset()
555        wait_for_bluetooth_manager_state(droid)
556        if not enable_bluetooth(droid, ed):
557            return False
558    return True
559
560
561def generate_ble_advertise_objects(droid):
562    """Generate generic LE advertise objects.
563
564    Args:
565        droid: The droid object to generate advertise LE objects from.
566
567    Returns:
568        advertise_callback: The generated advertise callback id.
569        advertise_data: The generated advertise data id.
570        advertise_settings: The generated advertise settings id.
571    """
572    advertise_callback = droid.bleGenBleAdvertiseCallback()
573    advertise_data = droid.bleBuildAdvertiseData()
574    advertise_settings = droid.bleBuildAdvertiseSettings()
575    return advertise_callback, advertise_data, advertise_settings
576
577
578def generate_ble_scan_objects(droid):
579    """Generate generic LE scan objects.
580
581    Args:
582        droid: The droid object to generate LE scan objects from.
583
584    Returns:
585        filter_list: The generated scan filter list id.
586        scan_settings: The generated scan settings id.
587        scan_callback: The generated scan callback id.
588    """
589    filter_list = droid.bleGenFilterList()
590    scan_settings = droid.bleBuildScanSetting()
591    scan_callback = droid.bleGenScanCallback()
592    return filter_list, scan_settings, scan_callback
593
594
595def generate_id_by_size(size,
596                        chars=(string.ascii_lowercase +
597                               string.ascii_uppercase + string.digits)):
598    """Generate random ascii characters of input size and input char types
599
600    Args:
601        size: Input size of string.
602        chars: (Optional) Chars to use in generating a random string.
603
604    Returns:
605        String of random input chars at the input size.
606    """
607    return ''.join(random.choice(chars) for _ in range(size))
608
609
610def get_advanced_droid_list(android_devices):
611    """Add max_advertisement and batch_scan_supported attributes to input
612    Android devices
613
614    This will programatically determine maximum LE advertisements of each
615    input Android device.
616
617    Args:
618        android_devices: The Android devices to setup.
619
620    Returns:
621        List of Android devices with new attribtues.
622    """
623    droid_list = []
624    for a in android_devices:
625        d, e = a.droid, a.ed
626        model = d.getBuildModel()
627        max_advertisements = 1
628        batch_scan_supported = True
629        if model in advertisements_to_devices.keys():
630            max_advertisements = advertisements_to_devices[model]
631        else:
632            max_advertisements = determine_max_advertisements(a)
633            max_tries = 3
634            # Retry to calculate max advertisements
635            while max_advertisements == -1 and max_tries > 0:
636                a.log.info(
637                    "Attempts left to determine max advertisements: {}".format(
638                        max_tries))
639                max_advertisements = determine_max_advertisements(a)
640                max_tries -= 1
641            advertisements_to_devices[model] = max_advertisements
642        if model in batch_scan_not_supported_list:
643            batch_scan_supported = False
644        role = {
645            'droid': d,
646            'ed': e,
647            'max_advertisements': max_advertisements,
648            'batch_scan_supported': batch_scan_supported
649        }
650        droid_list.append(role)
651    return droid_list
652
653
654def get_bluetooth_crash_count(android_device):
655    out = android_device.adb.shell("dumpsys bluetooth_manager")
656    return int(re.search("crashed(.*\d)", out).group(1))
657
658
659def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True):
660    """ Function to get the bt metric from logcat.
661
662    Captures logcat for the specified duration and returns the bqr results.
663    Takes list of android objects as input. If a single android object is given,
664    converts it into a list.
665
666    Args:
667        ad_list: list of android_device objects
668        duration: time duration (seconds) for which the logcat is parsed.
669        tag: tag to be appended to the logcat dump.
670        processed: flag to process bqr output.
671
672    Returns:
673        metrics_dict: dict of metrics for each android device.
674    """
675
676    # Defining bqr quantitites and their regex to extract
677    regex_dict = {"pwlv": "PwLv:\s(\S+)", "rssi": "RSSI:\s[-](\d+)"}
678    metrics_dict = {"rssi": {}, "pwlv": {}}
679
680    # Converting a single android device object to list
681    if not isinstance(ad_list, list):
682        ad_list = [ad_list]
683
684    #Time sync with the test machine
685    for ad in ad_list:
686        ad.droid.setTime(int(round(time.time() * 1000)))
687        time.sleep(0.5)
688
689    begin_time = utils.get_current_epoch_time()
690    time.sleep(duration)
691    end_time = utils.get_current_epoch_time()
692
693    for ad in ad_list:
694        bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time)
695        bqr_tag = "Monitoring , Handle:"
696
697        # Extracting supporting bqr quantities
698        for metric, regex in regex_dict.items():
699            bqr_metric = []
700            file_bt_log = open(bt_rssi_log, "r")
701            for line in file_bt_log:
702                if bqr_tag in line:
703                    if re.findall(regex, line):
704                        m = re.findall(regex, line)[0].strip(",")
705                        bqr_metric.append(m)
706            metrics_dict[metric][ad.serial] = bqr_metric
707
708        # Formatting the raw data
709        metrics_dict["rssi"][ad.serial] = [
710            (-1) * int(x) for x in metrics_dict["rssi"][ad.serial]
711        ]
712        metrics_dict["pwlv"][ad.serial] = [
713            int(x, 16) for x in metrics_dict["pwlv"][ad.serial]
714        ]
715
716        # Processing formatted data if processing is required
717        if processed:
718            # Computes the average RSSI
719            metrics_dict["rssi"][ad.serial] = round(
720                sum(metrics_dict["rssi"][ad.serial]) /
721                len(metrics_dict["rssi"][ad.serial]), 2)
722            # Returns last noted value for power level
723            metrics_dict["pwlv"][ad.serial] = metrics_dict["pwlv"][
724                ad.serial][-1]
725
726    return metrics_dict
727
728
729def get_bt_rssi(ad, duration=1, processed=True):
730    """Function to get average bt rssi from logcat.
731
732    This function returns the average RSSI for the given duration. RSSI values are
733    extracted from BQR.
734
735    Args:
736        ad: (list of) android_device object.
737        duration: time duration(seconds) for which logcat is parsed.
738
739    Returns:
740        avg_rssi: average RSSI on each android device for the given duration.
741    """
742    function_tag = "get_bt_rssi"
743    bqr_results = get_bt_metric(ad,
744                                duration,
745                                tag=function_tag,
746                                processed=processed)
747    return bqr_results["rssi"]
748
749
750def enable_bqr(
751        ad_list,
752        bqr_interval=10,
753        bqr_event_mask=15,
754):
755    """Sets up BQR reporting.
756
757       Sets up BQR to report BT metrics at the requested frequency and toggles
758       airplane mode for the bqr settings to take effect.
759
760    Args:
761        ad_list: an android_device or list of android devices.
762    """
763    # Converting a single android device object to list
764    if not isinstance(ad_list, list):
765        ad_list = [ad_list]
766
767    for ad in ad_list:
768        #Setting BQR parameters
769        ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
770            bqr_event_mask))
771        ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format(
772            bqr_interval))
773
774        ## Toggle airplane mode
775        ad.droid.connectivityToggleAirplaneMode(True)
776        ad.droid.connectivityToggleAirplaneMode(False)
777
778
779def disable_bqr(ad_list):
780    """Disables BQR reporting.
781
782    Args:
783        ad_list: an android_device or list of android devices.
784    """
785    # Converting a single android device object to list
786    if not isinstance(ad_list, list):
787        ad_list = [ad_list]
788
789    DISABLE_BQR_MASK = 0
790
791    for ad in ad_list:
792        #Disabling BQR
793        ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
794            DISABLE_BQR_MASK))
795
796        ## Toggle airplane mode
797        ad.droid.connectivityToggleAirplaneMode(True)
798        ad.droid.connectivityToggleAirplaneMode(False)
799
800
801def get_device_selector_dictionary(android_device_list):
802    """Create a dictionary of Bluetooth features vs Android devices.
803
804    Args:
805        android_device_list: The list of Android devices.
806    Returns:
807        A dictionary of profiles/features to Android devices.
808    """
809    selector_dict = {}
810    for ad in android_device_list:
811        uuids = ad.droid.bluetoothGetLocalUuids()
812
813        for profile, uuid_const in sig_uuid_constants.items():
814            uuid_check = sig_uuid_constants['BASE_UUID'].format(
815                uuid_const).lower()
816            if uuids and uuid_check in uuids:
817                if profile in selector_dict:
818                    selector_dict[profile].append(ad)
819                else:
820                    selector_dict[profile] = [ad]
821
822        # Various services may not be active during BT startup.
823        # If the device can be identified through adb shell pm list features
824        # then try to add them to the appropriate profiles / features.
825
826        # Android TV.
827        if "feature:com.google.android.tv.installed" in ad.features:
828            ad.log.info("Android TV device found.")
829            supported_profiles = ['AudioSink']
830            _add_android_device_to_dictionary(ad, supported_profiles,
831                                              selector_dict)
832
833        # Android Auto
834        elif "feature:android.hardware.type.automotive" in ad.features:
835            ad.log.info("Android Auto device found.")
836            # Add: AudioSink , A/V_RemoteControl,
837            supported_profiles = [
838                'AudioSink', 'A/V_RemoteControl', 'Message Notification Server'
839            ]
840            _add_android_device_to_dictionary(ad, supported_profiles,
841                                              selector_dict)
842        # Android Wear
843        elif "feature:android.hardware.type.watch" in ad.features:
844            ad.log.info("Android Wear device found.")
845            supported_profiles = []
846            _add_android_device_to_dictionary(ad, supported_profiles,
847                                              selector_dict)
848        # Android Phone
849        elif "feature:android.hardware.telephony" in ad.features:
850            ad.log.info("Android Phone device found.")
851            # Add: AudioSink
852            supported_profiles = [
853                'AudioSource', 'A/V_RemoteControlTarget',
854                'Message Access Server'
855            ]
856            _add_android_device_to_dictionary(ad, supported_profiles,
857                                              selector_dict)
858    return selector_dict
859
860
861def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
862    """Start generic advertisement and get it's mac address by LE scanning.
863
864    Args:
865        scan_ad: The Android device to use as the scanner.
866        adv_ad: The Android device to use as the advertiser.
867
868    Returns:
869        mac_address: The mac address of the advertisement.
870        advertise_callback: The advertise callback id of the active
871            advertisement.
872    """
873    adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
874    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
875        ble_advertise_settings_modes['low_latency'])
876    adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True)
877    adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel(
878        ble_advertise_settings_tx_powers['high'])
879    advertise_callback, advertise_data, advertise_settings = (
880        generate_ble_advertise_objects(adv_ad.droid))
881    adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
882                                        advertise_settings)
883    try:
884        adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
885                            bt_default_timeout)
886    except Empty as err:
887        raise BtTestUtilsError(
888            "Advertiser did not start successfully {}".format(err))
889    filter_list = scan_ad.droid.bleGenFilterList()
890    scan_settings = scan_ad.droid.bleBuildScanSetting()
891    scan_callback = scan_ad.droid.bleGenScanCallback()
892    scan_ad.droid.bleSetScanFilterDeviceName(
893        adv_ad.droid.bluetoothGetLocalName())
894    scan_ad.droid.bleBuildScanFilter(filter_list)
895    scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
896    try:
897        event = scan_ad.ed.pop_event(
898            "BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
899    except Empty as err:
900        raise BtTestUtilsError(
901            "Scanner did not find advertisement {}".format(err))
902    mac_address = event['data']['Result']['deviceInfo']['address']
903    return mac_address, advertise_callback, scan_callback
904
905
906def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
907    """Send a HID report simulating a 1-second keyboard press from host_ad to
908    device_ad
909
910    Args:
911        host_id: the Bluetooth MAC address or name of the HID host
912        device_ad: HID device
913        key: the key we want to send
914        interval: the interval between key press and key release
915    """
916    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
917                                                 hid_keyboard_report(key))
918    time.sleep(interval)
919    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
920                                                 hid_keyboard_report("00"))
921
922
923def hid_keyboard_report(key, modifier="00"):
924    """Get the HID keyboard report for the given key
925
926    Args:
927        key: the key we want
928        modifier: HID keyboard modifier bytes
929    Returns:
930        The byte array for the HID report.
931    """
932    return str(
933        bytearray.fromhex(" ".join(
934            [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8")
935
936
937def is_a2dp_connected(sink, source):
938    """
939    Convenience Function to see if the 2 devices are connected on
940    A2dp.
941    Args:
942        sink:       Audio Sink
943        source:     Audio Source
944    Returns:
945        True if Connected
946        False if Not connected
947    """
948
949    devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices()
950    for device in devices:
951        sink.log.info("A2dp Connected device {}".format(device["name"]))
952        if (device["address"] == source.droid.bluetoothGetLocalAddress()):
953            return True
954    return False
955
956
957def is_a2dp_snk_device_connected(ad, addr):
958    """Determines if an AndroidDevice has A2DP snk connectivity to input address
959
960    Args:
961        ad: the Android device
962        addr: the address that's expected
963    Returns:
964        True if connection was successful, false if unsuccessful.
965    """
966    devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
967    ad.log.info("Connected A2DP Sink devices: {}".format(devices))
968    if addr in {d['address'] for d in devices}:
969        return True
970    return False
971
972
973def is_a2dp_src_device_connected(ad, addr):
974    """Determines if an AndroidDevice has A2DP connectivity to input address
975
976    Args:
977        ad: the Android device
978        addr: the address that's expected
979    Returns:
980        True if connection was successful, false if unsuccessful.
981    """
982    devices = ad.droid.bluetoothA2dpGetConnectedDevices()
983    ad.log.info("Connected A2DP Source devices: {}".format(devices))
984    if addr in {d['address'] for d in devices}:
985        return True
986    return False
987
988
989def is_hfp_client_device_connected(ad, addr):
990    """Determines if an AndroidDevice has HFP connectivity to input address
991
992    Args:
993        ad: the Android device
994        addr: the address that's expected
995    Returns:
996        True if connection was successful, false if unsuccessful.
997    """
998    devices = ad.droid.bluetoothHfpClientGetConnectedDevices()
999    ad.log.info("Connected HFP Client devices: {}".format(devices))
1000    if addr in {d['address'] for d in devices}:
1001        return True
1002    return False
1003
1004
1005def is_map_mce_device_connected(ad, addr):
1006    """Determines if an AndroidDevice has MAP MCE connectivity to input address
1007
1008    Args:
1009        ad: the Android device
1010        addr: the address that's expected
1011    Returns:
1012        True if connection was successful, false if unsuccessful.
1013    """
1014    devices = ad.droid.bluetoothMapClientGetConnectedDevices()
1015    ad.log.info("Connected MAP MCE devices: {}".format(devices))
1016    if addr in {d['address'] for d in devices}:
1017        return True
1018    return False
1019
1020
1021def is_map_mse_device_connected(ad, addr):
1022    """Determines if an AndroidDevice has MAP MSE connectivity to input address
1023
1024    Args:
1025        ad: the Android device
1026        addr: the address that's expected
1027    Returns:
1028        True if connection was successful, false if unsuccessful.
1029    """
1030    devices = ad.droid.bluetoothMapGetConnectedDevices()
1031    ad.log.info("Connected MAP MSE devices: {}".format(devices))
1032    if addr in {d['address'] for d in devices}:
1033        return True
1034    return False
1035
1036
1037def kill_bluetooth_process(ad):
1038    """Kill Bluetooth process on Android device.
1039
1040    Args:
1041        ad: Android device to kill BT process on.
1042    """
1043    ad.log.info("Killing Bluetooth process.")
1044    pid = ad.adb.shell(
1045        "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
1046    call(["adb -s " + ad.serial + " shell kill " + pid], shell=True)
1047
1048
1049def log_energy_info(android_devices, state):
1050    """Logs energy info of input Android devices.
1051
1052    Args:
1053        android_devices: input Android device list to log energy info from.
1054        state: the input state to log. Usually 'Start' or 'Stop' for logging.
1055
1056    Returns:
1057        A logging string of the Bluetooth energy info reported.
1058    """
1059    return_string = "{} Energy info collection:\n".format(state)
1060    # Bug: b/31966929
1061    return return_string
1062
1063
1064def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
1065    """Setups up a PAN conenction between two android devices.
1066
1067    Args:
1068        pan_dut: the Android device providing tethering services
1069        panu_dut: the Android device using the internet connection from the
1070            pan_dut
1071    Returns:
1072        True if PAN connection and verification is successful,
1073        false if unsuccessful.
1074    """
1075    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
1076        panu_dut.log.error("Failed to toggle airplane mode on")
1077        return False
1078    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
1079        pan_dut.log.error("Failed to toggle airplane mode off")
1080        return False
1081    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
1082    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
1083    if not bluetooth_enabled_check(panu_dut):
1084        return False
1085    if not bluetooth_enabled_check(pan_dut):
1086        return False
1087    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
1088    if not (pair_pri_to_sec(pan_dut, panu_dut)):
1089        return False
1090    if not pan_dut.droid.bluetoothPanIsTetheringOn():
1091        pan_dut.log.error("Failed to enable Bluetooth tethering.")
1092        return False
1093    # Magic sleep needed to give the stack time in between bonding and
1094    # connecting the PAN profile.
1095    time.sleep(pan_connect_timeout)
1096    panu_dut.droid.bluetoothConnectBonded(
1097        pan_dut.droid.bluetoothGetLocalAddress())
1098    if not verify_http_connection(log, panu_dut):
1099        panu_dut.log.error("Can't verify http connection on PANU device.")
1100        if not verify_http_connection(log, pan_dut):
1101            pan_dut.log.info(
1102                "Can't verify http connection on PAN service device")
1103        return False
1104    return True
1105
1106
1107def orchestrate_bluetooth_socket_connection(
1108        client_ad,
1109        server_ad,
1110        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
1111        uuid=None):
1112    """Sets up the Bluetooth Socket connection between two Android devices.
1113
1114    Args:
1115        client_ad: the Android device performing the connection.
1116        server_ad: the Android device accepting the connection.
1117    Returns:
1118        True if connection was successful, false if unsuccessful.
1119    """
1120    server_ad.droid.bluetoothStartPairingHelper()
1121    client_ad.droid.bluetoothStartPairingHelper()
1122
1123    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
1124        (bluetooth_socket_conn_test_uuid if uuid is None else uuid),
1125        accept_timeout_ms)
1126    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
1127        server_ad.droid.bluetoothGetLocalAddress(),
1128        (bluetooth_socket_conn_test_uuid if uuid is None else uuid))
1129
1130    end_time = time.time() + bt_default_timeout
1131    result = False
1132    test_result = True
1133    while time.time() < end_time:
1134        if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
1135            test_result = True
1136            client_ad.log.info("Bluetooth socket Client Connection Active")
1137            break
1138        else:
1139            test_result = False
1140        time.sleep(1)
1141    if not test_result:
1142        client_ad.log.error(
1143            "Failed to establish a Bluetooth socket connection")
1144        return False
1145    return True
1146
1147
1148def orchestrate_rfcomm_connection(client_ad,
1149                                  server_ad,
1150                                  accept_timeout_ms=default_rfcomm_timeout_ms,
1151                                  uuid=None):
1152    """Sets up the RFCOMM connection between two Android devices.
1153
1154    Args:
1155        client_ad: the Android device performing the connection.
1156        server_ad: the Android device accepting the connection.
1157    Returns:
1158        True if connection was successful, false if unsuccessful.
1159    """
1160    result = orchestrate_bluetooth_socket_connection(
1161        client_ad, server_ad, accept_timeout_ms,
1162        (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid))
1163
1164    return result
1165
1166
1167def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
1168    """Pairs pri droid to secondary droid.
1169
1170    Args:
1171        pri_ad: Android device initiating connection
1172        sec_ad: Android device accepting connection
1173        attempts: Number of attempts to try until failure.
1174        auto_confirm: Auto confirm passkey match for both devices
1175
1176    Returns:
1177        Pass if True
1178        Fail if False
1179    """
1180    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
1181        sec_ad.droid.bluetoothGetLocalAddress())
1182    curr_attempts = 0
1183    while curr_attempts < attempts:
1184        if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
1185            return True
1186        # Wait 2 seconds before unbound
1187        time.sleep(2)
1188        if not clear_bonded_devices(pri_ad):
1189            log.error(
1190                "Failed to clear bond for primary device at attempt {}".format(
1191                    str(curr_attempts)))
1192            return False
1193        if not clear_bonded_devices(sec_ad):
1194            log.error(
1195                "Failed to clear bond for secondary device at attempt {}".
1196                format(str(curr_attempts)))
1197            return False
1198        # Wait 2 seconds after unbound
1199        time.sleep(2)
1200        curr_attempts += 1
1201    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
1202        str(attempts)))
1203    return False
1204
1205
1206def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
1207    # Enable discovery on sec_ad so that pri_ad can find it.
1208    # The timeout here is based on how much time it would take for two devices
1209    # to pair with each other once pri_ad starts seeing devices.
1210    pri_droid = pri_ad.droid
1211    sec_droid = sec_ad.droid
1212    pri_ad.ed.clear_all_events()
1213    sec_ad.ed.clear_all_events()
1214    log.info("Bonding device {} to {}".format(
1215        pri_droid.bluetoothGetLocalAddress(),
1216        sec_droid.bluetoothGetLocalAddress()))
1217    sec_droid.bluetoothMakeDiscoverable(bt_default_timeout)
1218    target_address = sec_droid.bluetoothGetLocalAddress()
1219    log.debug("Starting paring helper on each device")
1220    pri_droid.bluetoothStartPairingHelper(auto_confirm)
1221    sec_droid.bluetoothStartPairingHelper(auto_confirm)
1222    pri_ad.log.info("Primary device starting discovery and executing bond")
1223    result = pri_droid.bluetoothDiscoverAndBond(target_address)
1224    if not auto_confirm:
1225        if not _wait_for_passkey_match(pri_ad, sec_ad):
1226            return False
1227    # Loop until we have bonded successfully or timeout.
1228    end_time = time.time() + bt_default_timeout
1229    pri_ad.log.info("Verifying devices are bonded")
1230    while time.time() < end_time:
1231        bonded_devices = pri_droid.bluetoothGetBondedDevices()
1232        bonded = False
1233        for d in bonded_devices:
1234            if d['address'] == target_address:
1235                pri_ad.log.info("Successfully bonded to device")
1236                return True
1237        time.sleep(0.1)
1238    # Timed out trying to bond.
1239    pri_ad.log.info("Failed to bond devices.")
1240    return False
1241
1242
1243def reset_bluetooth(android_devices):
1244    """Resets Bluetooth state of input Android device list.
1245
1246    Args:
1247        android_devices: The Android device list to reset Bluetooth state on.
1248
1249    Returns:
1250        True if successful, false if unsuccessful.
1251    """
1252    for a in android_devices:
1253        droid, ed = a.droid, a.ed
1254        a.log.info("Reset state of bluetooth on device.")
1255        if droid.bluetoothCheckState() is True:
1256            droid.bluetoothToggleState(False)
1257            expected_bluetooth_off_event_name = bluetooth_off
1258            try:
1259                ed.pop_event(expected_bluetooth_off_event_name,
1260                             bt_default_timeout)
1261            except Exception:
1262                a.log.error("Failed to toggle Bluetooth off.")
1263                return False
1264        # temp sleep for b/17723234
1265        time.sleep(3)
1266        if not bluetooth_enabled_check(a):
1267            return False
1268    return True
1269
1270
1271def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
1272    """Verify that input number of advertisements can be found from the scanning
1273    Android device.
1274
1275    Args:
1276        scn_ad: The Android device to start LE scanning on.
1277        max_advertisements: The number of advertisements the scanner expects to
1278        find.
1279
1280    Returns:
1281        True if successful, false if unsuccessful.
1282    """
1283    test_result = False
1284    address_list = []
1285    filter_list = scn_ad.droid.bleGenFilterList()
1286    scn_ad.droid.bleBuildScanFilter(filter_list)
1287    scan_settings = scn_ad.droid.bleBuildScanSetting()
1288    scan_callback = scn_ad.droid.bleGenScanCallback()
1289    scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
1290    start_time = time.time()
1291    while (start_time + bt_default_timeout) > time.time():
1292        event = None
1293        try:
1294            event = scn_ad.ed.pop_event(scan_result.format(scan_callback),
1295                                        bt_default_timeout)
1296        except Empty as error:
1297            raise BtTestUtilsError(
1298                "Failed to find scan event: {}".format(error))
1299        address = event['data']['Result']['deviceInfo']['address']
1300        if address not in address_list:
1301            address_list.append(address)
1302        if len(address_list) == max_advertisements:
1303            test_result = True
1304            break
1305    scn_ad.droid.bleStopBleScan(scan_callback)
1306    return test_result
1307
1308
1309def set_bluetooth_codec(android_device,
1310                        codec_type,
1311                        sample_rate,
1312                        bits_per_sample,
1313                        channel_mode,
1314                        codec_specific_1=0):
1315    """Sets the A2DP codec configuration on the AndroidDevice.
1316
1317    Args:
1318        android_device (acts.controllers.android_device.AndroidDevice): the
1319            android device for which to switch the codec.
1320        codec_type (str): the desired codec type. Must be a key in
1321            bt_constants.codec_types.
1322        sample_rate (str): the desired sample rate. Must be a key in
1323            bt_constants.sample_rates.
1324        bits_per_sample (str): the desired bits per sample. Must be a key in
1325            bt_constants.bits_per_samples.
1326        channel_mode (str): the desired channel mode. Must be a key in
1327            bt_constants.channel_modes.
1328        codec_specific_1 (int): the desired bit rate (quality) for LDAC codec.
1329    Returns:
1330        bool: True if the codec config was successfully changed to the desired
1331            values. Else False.
1332    """
1333    message = ("Set Android Device A2DP Bluetooth codec configuration:\n"
1334               "\tCodec: {codec_type}\n"
1335               "\tSample Rate: {sample_rate}\n"
1336               "\tBits per Sample: {bits_per_sample}\n"
1337               "\tChannel Mode: {channel_mode}".format(
1338                   codec_type=codec_type,
1339                   sample_rate=sample_rate,
1340                   bits_per_sample=bits_per_sample,
1341                   channel_mode=channel_mode))
1342    android_device.log.info(message)
1343
1344    # Send SL4A command
1345    droid, ed = android_device.droid, android_device.ed
1346    if not droid.bluetoothA2dpSetCodecConfigPreference(
1347            codec_types[codec_type], sample_rates[str(sample_rate)],
1348            bits_per_samples[str(bits_per_sample)],
1349            channel_modes[channel_mode], codec_specific_1):
1350        android_device.log.warning(
1351            "SL4A command returned False. Codec was not "
1352            "changed.")
1353    else:
1354        try:
1355            ed.pop_event(bluetooth_a2dp_codec_config_changed,
1356                         bt_default_timeout)
1357        except Exception:
1358            android_device.log.warning("SL4A event not registered. Codec "
1359                                       "may not have been changed.")
1360
1361    # Validate codec value through ADB
1362    # TODO (aidanhb): validate codec more robustly using SL4A
1363    command = "dumpsys bluetooth_manager | grep -i 'current codec'"
1364    out = android_device.adb.shell(command)
1365    split_out = out.split(": ")
1366    if len(split_out) != 2:
1367        android_device.log.warning("Could not verify codec config change "
1368                                   "through ADB.")
1369    elif split_out[1].strip().upper() != codec_type:
1370        android_device.log.error("Codec config was not changed.\n"
1371                                 "\tExpected codec: {exp}\n"
1372                                 "\tActual codec: {act}".format(
1373                                     exp=codec_type, act=split_out[1].strip()))
1374        return False
1375    android_device.log.info("Bluetooth codec successfully changed.")
1376    return True
1377
1378
1379def set_bt_scan_mode(ad, scan_mode_value):
1380    """Set Android device's Bluetooth scan mode.
1381
1382    Args:
1383        ad: The Android device to set the scan mode on.
1384        scan_mode_value: The value to set the scan mode to.
1385
1386    Returns:
1387        True if successful, false if unsuccessful.
1388    """
1389    droid, ed = ad.droid, ad.ed
1390    if scan_mode_value == bt_scan_mode_types['state_off']:
1391        disable_bluetooth(droid)
1392        scan_mode = droid.bluetoothGetScanMode()
1393        reset_bluetooth([ad])
1394        if scan_mode != scan_mode_value:
1395            return False
1396    elif scan_mode_value == bt_scan_mode_types['none']:
1397        droid.bluetoothMakeUndiscoverable()
1398        scan_mode = droid.bluetoothGetScanMode()
1399        if scan_mode != scan_mode_value:
1400            return False
1401    elif scan_mode_value == bt_scan_mode_types['connectable']:
1402        droid.bluetoothMakeUndiscoverable()
1403        droid.bluetoothMakeConnectable()
1404        scan_mode = droid.bluetoothGetScanMode()
1405        if scan_mode != scan_mode_value:
1406            return False
1407    elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']):
1408        droid.bluetoothMakeDiscoverable()
1409        scan_mode = droid.bluetoothGetScanMode()
1410        if scan_mode != scan_mode_value:
1411            return False
1412    else:
1413        # invalid scan mode
1414        return False
1415    return True
1416
1417
1418def set_device_name(droid, name):
1419    """Set and check Bluetooth local name on input droid object.
1420
1421    Args:
1422        droid: Droid object to set local name on.
1423        name: the Bluetooth local name to set.
1424
1425    Returns:
1426        True if successful, false if unsuccessful.
1427    """
1428    droid.bluetoothSetLocalName(name)
1429    time.sleep(2)
1430    droid_name = droid.bluetoothGetLocalName()
1431    if droid_name != name:
1432        return False
1433    return True
1434
1435
1436def set_profile_priority(host_ad, client_ad, profiles, priority):
1437    """Sets the priority of said profile(s) on host_ad for client_ad"""
1438    for profile in profiles:
1439        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
1440            profile, host_ad.droid.bluetoothGetLocalName(),
1441            client_ad.droid.bluetoothGetLocalAddress(), priority.value))
1442        if bt_profile_constants['a2dp_sink'] == profile:
1443            host_ad.droid.bluetoothA2dpSinkSetPriority(
1444                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
1445        elif bt_profile_constants['headset_client'] == profile:
1446            host_ad.droid.bluetoothHfpClientSetPriority(
1447                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
1448        elif bt_profile_constants['pbap_client'] == profile:
1449            host_ad.droid.bluetoothPbapClientSetPriority(
1450                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
1451        else:
1452            host_ad.log.error(
1453                "Profile {} not yet supported for priority settings".format(
1454                    profile))
1455
1456
1457def setup_multiple_devices_for_bt_test(android_devices):
1458    """A common setup routine for Bluetooth on input Android device list.
1459
1460    Things this function sets up:
1461    1. Resets Bluetooth
1462    2. Set Bluetooth local name to random string of size 4
1463    3. Disable BLE background scanning.
1464    4. Enable Bluetooth snoop logging.
1465
1466    Args:
1467        android_devices: Android device list to setup Bluetooth on.
1468
1469    Returns:
1470        True if successful, false if unsuccessful.
1471    """
1472    log.info("Setting up Android Devices")
1473    # TODO: Temp fix for an selinux error.
1474    for ad in android_devices:
1475        ad.adb.shell("setenforce 0")
1476    threads = []
1477    try:
1478        for a in android_devices:
1479            thread = threading.Thread(target=factory_reset_bluetooth,
1480                                      args=([[a]]))
1481            threads.append(thread)
1482            thread.start()
1483        for t in threads:
1484            t.join()
1485
1486        for a in android_devices:
1487            d = a.droid
1488            # TODO: Create specific RPC command to instantiate
1489            # BluetoothConnectionFacade. This is just a workaround.
1490            d.bluetoothStartConnectionStateChangeMonitor("")
1491            setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
1492            if not setup_result:
1493                a.log.error("Failed to set device name.")
1494                return setup_result
1495            d.bluetoothDisableBLE()
1496            utils.set_location_service(a, True)
1497            bonded_devices = d.bluetoothGetBondedDevices()
1498            for b in bonded_devices:
1499                a.log.info("Removing bond for device {}".format(b['address']))
1500                d.bluetoothUnbond(b['address'])
1501        for a in android_devices:
1502            a.adb.shell("setprop persist.bluetooth.btsnooplogmode full")
1503            getprop_result = a.adb.shell(
1504                "getprop persist.bluetooth.btsnooplogmode") == "full"
1505            if not getprop_result:
1506                a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
1507    except Exception as err:
1508        log.error("Something went wrong in multi device setup: {}".format(err))
1509        return False
1510    return setup_result
1511
1512
1513def setup_n_advertisements(adv_ad, num_advertisements):
1514    """Setup input number of advertisements on input Android device.
1515
1516    Args:
1517        adv_ad: The Android device to start LE advertisements on.
1518        num_advertisements: The number of advertisements to start.
1519
1520    Returns:
1521        advertise_callback_list: List of advertisement callback ids.
1522    """
1523    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
1524        ble_advertise_settings_modes['low_latency'])
1525    advertise_data = adv_ad.droid.bleBuildAdvertiseData()
1526    advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings()
1527    advertise_callback_list = []
1528    for i in range(num_advertisements):
1529        advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback()
1530        advertise_callback_list.append(advertise_callback)
1531        adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
1532                                            advertise_settings)
1533        try:
1534            adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
1535                                bt_default_timeout)
1536            adv_ad.log.info("Advertisement {} started.".format(i + 1))
1537        except Empty as error:
1538            adv_ad.log.error("Advertisement {} failed to start.".format(i + 1))
1539            raise BtTestUtilsError(
1540                "Test failed with Empty error: {}".format(error))
1541    return advertise_callback_list
1542
1543
1544def take_btsnoop_log(ad, testcase, testname):
1545    """Grabs the btsnoop_hci log on a device and stores it in the log directory
1546    of the test class.
1547
1548    If you want grab the btsnoop_hci log, call this function with android_device
1549    objects in on_fail. Bug report takes a relative long time to take, so use
1550    this cautiously.
1551
1552    Args:
1553        ad: The android_device instance to take bugreport on.
1554        testcase: Name of the test calss that triggered this snoop log.
1555        testname: Name of the test case that triggered this bug report.
1556    """
1557    testname = "".join(x for x in testname if x.isalnum())
1558    serial = ad.serial
1559    device_model = ad.droid.getBuildModel()
1560    device_model = device_model.replace(" ", "")
1561    out_name = ','.join((testname, device_model, serial))
1562    snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs')
1563    os.makedirs(snoop_path, exist_ok=True)
1564    cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device,
1565                   " ", snoop_path + '/' + out_name, ".btsnoop_hci.log"))
1566    exe_cmd(cmd)
1567    try:
1568        cmd = ''.join(
1569            ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ",
1570             snoop_path + '/' + out_name, ".btsnoop_hci.log.last"))
1571        exe_cmd(cmd)
1572    except Exception as err:
1573        testcase.log.info(
1574            "File does not exist {}".format(btsnoop_last_log_path_on_device))
1575
1576
1577def take_btsnoop_logs(android_devices, testcase, testname):
1578    """Pull btsnoop logs from an input list of android devices.
1579
1580    Args:
1581        android_devices: the list of Android devices to pull btsnoop logs from.
1582        testcase: Name of the test calss that triggered this snoop log.
1583        testname: Name of the test case that triggered this bug report.
1584    """
1585    for a in android_devices:
1586        take_btsnoop_log(a, testcase, testname)
1587
1588
1589def teardown_n_advertisements(adv_ad, num_advertisements,
1590                              advertise_callback_list):
1591    """Stop input number of advertisements on input Android device.
1592
1593    Args:
1594        adv_ad: The Android device to stop LE advertisements on.
1595        num_advertisements: The number of advertisements to stop.
1596        advertise_callback_list: The list of advertisement callbacks to stop.
1597
1598    Returns:
1599        True if successful, false if unsuccessful.
1600    """
1601    for n in range(num_advertisements):
1602        adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n])
1603    return True
1604
1605
1606def verify_server_and_client_connected(client_ad, server_ad, log=True):
1607    """Verify that input server and client Android devices are connected.
1608
1609    This code is under the assumption that there will only be
1610    a single connection.
1611
1612    Args:
1613        client_ad: the Android device to check number of active connections.
1614        server_ad: the Android device to check number of active connections.
1615
1616    Returns:
1617        True both server and client have at least 1 active connection,
1618        false if unsuccessful.
1619    """
1620    test_result = True
1621    if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
1622        if log:
1623            server_ad.log.error("No socket connections found on server.")
1624        test_result = False
1625    if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
1626        if log:
1627            client_ad.log.error("No socket connections found on client.")
1628        test_result = False
1629    return test_result
1630
1631
1632def wait_for_bluetooth_manager_state(droid,
1633                                     state=None,
1634                                     timeout=10,
1635                                     threshold=5):
1636    """ Waits for BlueTooth normalized state or normalized explicit state
1637    args:
1638        droid: droid device object
1639        state: expected BlueTooth state
1640        timeout: max timeout threshold
1641        threshold: list len of bt state
1642    Returns:
1643        True if successful, false if unsuccessful.
1644    """
1645    all_states = []
1646    get_state = lambda: droid.bluetoothGetLeState()
1647    start_time = time.time()
1648    while time.time() < start_time + timeout:
1649        all_states.append(get_state())
1650        if len(all_states) >= threshold:
1651            # for any normalized state
1652            if state is None:
1653                if len(set(all_states[-threshold:])) == 1:
1654                    log.info("State normalized {}".format(
1655                        set(all_states[-threshold:])))
1656                    return True
1657            else:
1658                # explicit check against normalized state
1659                if set([state]).issubset(all_states[-threshold:]):
1660                    return True
1661        time.sleep(0.5)
1662    log.error(
1663        "Bluetooth state fails to normalize" if state is None else
1664        "Failed to match bluetooth state, current state {} expected state {}".
1665        format(get_state(), state))
1666    return False
1667
1668
1669def _wait_for_passkey_match(pri_ad, sec_ad):
1670    pri_pin, sec_pin = -1, 1
1671    pri_variant, sec_variant = -1, 1
1672    pri_pairing_req, sec_pairing_req = None, None
1673    try:
1674        pri_pairing_req = pri_ad.ed.pop_event(
1675            event_name="BluetoothActionPairingRequest",
1676            timeout=bt_default_timeout)
1677        pri_variant = pri_pairing_req["data"]["PairingVariant"]
1678        pri_pin = pri_pairing_req["data"]["Pin"]
1679        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
1680            pri_pin, pri_variant))
1681        sec_pairing_req = sec_ad.ed.pop_event(
1682            event_name="BluetoothActionPairingRequest",
1683            timeout=bt_default_timeout)
1684        sec_variant = sec_pairing_req["data"]["PairingVariant"]
1685        sec_pin = sec_pairing_req["data"]["Pin"]
1686        sec_ad.log.info(
1687            "Secondary device received Pin: {}, Variant: {}".format(
1688                sec_pin, sec_variant))
1689    except Empty as err:
1690        log.error("Wait for pin error: {}".format(err))
1691        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
1692            pri_pairing_req, sec_pairing_req))
1693        return False
1694    if pri_variant == sec_variant == pairing_variant_passkey_confirmation:
1695        confirmation = pri_pin == sec_pin
1696        if confirmation:
1697            log.info("Pairing code matched, accepting connection")
1698        else:
1699            log.info("Pairing code mismatched, rejecting connection")
1700        pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
1701                               str(confirmation))
1702        sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
1703                               str(confirmation))
1704        if not confirmation:
1705            return False
1706    elif pri_variant != sec_variant:
1707        log.error("Pairing variant mismatched, abort connection")
1708        return False
1709    return True
1710
1711
1712def write_read_verify_data(client_ad, server_ad, msg, binary=False):
1713    """Verify that the client wrote data to the server Android device correctly.
1714
1715    Args:
1716        client_ad: the Android device to perform the write.
1717        server_ad: the Android device to read the data written.
1718        msg: the message to write.
1719        binary: if the msg arg is binary or not.
1720
1721    Returns:
1722        True if the data written matches the data read, false if not.
1723    """
1724    client_ad.log.info("Write message.")
1725    try:
1726        if binary:
1727            client_ad.droid.bluetoothSocketConnWriteBinary(msg)
1728        else:
1729            client_ad.droid.bluetoothSocketConnWrite(msg)
1730    except Exception as err:
1731        client_ad.log.error("Failed to write data: {}".format(err))
1732        return False
1733    server_ad.log.info("Read message.")
1734    try:
1735        if binary:
1736            read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip(
1737                "\r\n")
1738        else:
1739            read_msg = server_ad.droid.bluetoothSocketConnRead()
1740    except Exception as err:
1741        server_ad.log.error("Failed to read data: {}".format(err))
1742        return False
1743    log.info("Verify message.")
1744    if msg != read_msg:
1745        log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
1746        return False
1747    return True
1748
1749
1750class MediaControlOverSl4a(object):
1751    """Media control using sl4a facade for general purpose.
1752
1753    """
1754    def __init__(self, android_device, music_file):
1755        """Initialize the media_control class.
1756
1757        Args:
1758            android_dut: android_device object
1759            music_file: location of the music file
1760        """
1761        self.android_device = android_device
1762        self.music_file = music_file
1763
1764    def play(self):
1765        """Play media.
1766
1767        """
1768        self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file,
1769                                                'default', True)
1770        playing = self.android_device.droid.mediaIsPlaying()
1771        asserts.assert_true(playing,
1772                            'Failed to play music %s' % self.music_file)
1773
1774    def pause(self):
1775        """Pause media.
1776
1777        """
1778        self.android_device.droid.mediaPlayPause('default')
1779        paused = not self.android_device.droid.mediaIsPlaying()
1780        asserts.assert_true(paused,
1781                            'Failed to pause music %s' % self.music_file)
1782
1783    def resume(self):
1784        """Resume media.
1785
1786        """
1787        self.android_device.droid.mediaPlayStart('default')
1788        playing = self.android_device.droid.mediaIsPlaying()
1789        asserts.assert_true(playing,
1790                            'Failed to play music %s' % self.music_file)
1791
1792    def stop(self):
1793        """Stop media.
1794
1795        """
1796        self.android_device.droid.mediaPlayStop('default')
1797        stopped = not self.android_device.droid.mediaIsPlaying()
1798        asserts.assert_true(stopped,
1799                            'Failed to stop music %s' % self.music_file)
1800