1# /usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import random
19import pprint
20import string
21from queue import Empty
22import queue
23import threading
24import time
25from acts import utils
26
27from subprocess import call
28
29from acts.test_utils.bt.bt_constants import adv_fail
30from acts.test_utils.bt.bt_constants import adv_succ
31from acts.test_utils.bt.bt_constants import advertising_set_started
32from acts.test_utils.bt.bt_constants import advertising_set_stopped
33from acts.test_utils.bt.bt_constants import advertising_set_on_own_address_read
34from acts.test_utils.bt.bt_constants import advertising_set_stopped
35from acts.test_utils.bt.bt_constants import advertising_set_enabled
36from acts.test_utils.bt.bt_constants import advertising_set_data_set
37from acts.test_utils.bt.bt_constants import advertising_set_scan_response_set
38from acts.test_utils.bt.bt_constants import advertising_set_parameters_update
39from acts.test_utils.bt.bt_constants import \
40    advertising_set_periodic_parameters_updated
41from acts.test_utils.bt.bt_constants import advertising_set_periodic_data_set
42from acts.test_utils.bt.bt_constants import advertising_set_periodic_enable
43from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list
44from acts.test_utils.bt.bt_constants import batch_scan_result
45from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
46from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
47from acts.test_utils.bt.bt_constants import bluetooth_off
48from acts.test_utils.bt.bt_constants import bluetooth_on
49from acts.test_utils.bt.bt_constants import \
50    bluetooth_profile_connection_state_changed
51from acts.test_utils.bt.bt_constants import bt_default_timeout
52from acts.test_utils.bt.bt_constants import bt_discovery_timeout
53from acts.test_utils.bt.bt_constants import bt_profile_states
54from acts.test_utils.bt.bt_constants import bt_profile_constants
55from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids
56from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
57from acts.test_utils.bt.bt_constants import bt_scan_mode_types
58from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
59from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device
60from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
61from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
62from acts.test_utils.bt.bt_constants import mtu_changed
63from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
64from acts.test_utils.bt.bt_constants import pan_connect_timeout
65from acts.test_utils.bt.bt_constants import small_timeout
66from acts.test_utils.bt.bt_constants import scan_result
67from acts.test_utils.bt.bt_constants import scan_failed
68from acts.test_utils.bt.bt_constants import hid_id_keyboard
69
70from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
71from acts.test_utils.tel.tel_test_utils import verify_http_connection
72from acts.utils import exe_cmd
73from acts.utils import create_dir
74
# Module-level "logger": this is the stdlib ``logging`` module itself, so
# ``log.info(...)`` and friends go through the root logger.
log = logging

# Cache keyed by device model name holding the maximum number of concurrent
# LE advertisements measured for that model; get_advanced_droid_list() reads
# and fills it so each model is only probed once per process.
advertisements_to_devices = {}
78
79
class BtTestUtilsError(Exception):
    """Raised by the Bluetooth test utilities when an operation fails."""
82
83
def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
    """Verify that input number of advertisements can be found from the scanning
    Android device.

    An LE scan is started on the device and scan results are collected until
    either ``max_advertisements`` distinct addresses have been seen or
    ``bt_default_timeout`` seconds have elapsed. The scan is always stopped
    before this function returns or raises.

    Args:
        scn_ad: The Android device to start LE scanning on.
        max_advertisements: The number of advertisements the scanner expects to
        find.

    Returns:
        True if successful, false if unsuccessful.

    Raises:
        BtTestUtilsError: If no scan result event arrives within
            ``bt_default_timeout`` seconds.
    """
    droid = scn_ad.droid
    filter_list = droid.bleGenFilterList()
    droid.bleBuildScanFilter(filter_list)
    scan_settings = droid.bleBuildScanSetting()
    scan_callback = droid.bleGenScanCallback()
    droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    seen_addresses = set()
    deadline = time.time() + bt_default_timeout
    try:
        while deadline > time.time():
            try:
                event = scn_ad.ed.pop_event(
                    scan_result.format(scan_callback), bt_default_timeout)
            except Empty as error:
                raise BtTestUtilsError(
                    "Failed to find scan event: {}".format(error)) from error
            seen_addresses.add(
                event['data']['Result']['deviceInfo']['address'])
            if len(seen_addresses) == max_advertisements:
                return True
        return False
    finally:
        # Stop the scan on every exit path (success, timeout or error) so a
        # failed lookup does not leave the scanner running on the device.
        droid.bleStopBleScan(scan_callback)
120
121
def setup_n_advertisements(adv_ad, num_advertisements):
    """Start the requested number of LE advertisements on a device.

    All advertisements share one advertise data object and one advertise
    settings object (low latency mode); each gets its own callback.

    Args:
        adv_ad: The Android device to start LE advertisements on.
        num_advertisements: The number of advertisements to start.

    Returns:
        List of the advertisement callback ids, in start order.

    Raises:
        BtTestUtilsError: If an advertisement does not report success within
            bt_default_timeout seconds.
    """
    droid = adv_ad.droid
    droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    data = droid.bleBuildAdvertiseData()
    settings = droid.bleBuildAdvertiseSettings()
    callbacks = []
    for ordinal in range(1, num_advertisements + 1):
        callback = droid.bleGenBleAdvertiseCallback()
        callbacks.append(callback)
        droid.bleStartBleAdvertising(callback, data, settings)
        try:
            adv_ad.ed.pop_event(adv_succ.format(callback), bt_default_timeout)
        except Empty as error:
            adv_ad.log.error(
                "Advertisement {} failed to start.".format(ordinal))
            raise BtTestUtilsError(
                "Test failed with Empty error: {}".format(error))
        adv_ad.log.info("Advertisement {} started.".format(ordinal))
    return callbacks
151
152
def teardown_n_advertisements(adv_ad, num_advertisements,
                              advertise_callback_list):
    """Stop the first ``num_advertisements`` advertisements on a device.

    Args:
        adv_ad: The Android device to stop LE advertisements on.
        num_advertisements: How many entries of the callback list to stop,
            counted from the front.
        advertise_callback_list: The list of advertisement callbacks to stop.

    Returns:
        Always True; failures surface as exceptions from the stop call.
    """
    for position in range(num_advertisements):
        callback = advertise_callback_list[position]
        adv_ad.droid.bleStopBleAdvertising(callback)
    return True
168
169
def generate_ble_scan_objects(droid):
    """Create a default filter list, scan settings and scan callback.

    Args:
        droid: The droid object to generate LE scan objects from.

    Returns:
        A 3-tuple ``(filter_list, scan_settings, scan_callback)`` holding the
        ids returned by the droid, created in that order.
    """
    return (
        droid.bleGenFilterList(),
        droid.bleBuildScanSetting(),
        droid.bleGenScanCallback(),
    )
185
186
def generate_ble_advertise_objects(droid):
    """Create a default advertise callback, advertise data and settings.

    Args:
        droid: The droid object to generate advertise LE objects from.

    Returns:
        A 3-tuple ``(advertise_callback, advertise_data, advertise_settings)``
        holding the ids returned by the droid, created in that order.
    """
    return (
        droid.bleGenBleAdvertiseCallback(),
        droid.bleBuildAdvertiseData(),
        droid.bleBuildAdvertiseSettings(),
    )
202
203
def setup_multiple_devices_for_bt_test(android_devices):
    """A common setup routine for Bluetooth on input Android device list.

    Things this function sets up:
    1. Factory resets Bluetooth (one thread per device, run in parallel).
    2. Set Bluetooth local name to random string of size 4
    3. Disable BLE background scanning.
    4. Removes every existing bond.
    5. Enable Bluetooth snoop logging.

    Args:
        android_devices: Android device list to setup Bluetooth on.

    Returns:
        A truthy value if successful (True when the device list is empty),
        a falsy value if unsuccessful.
    """
    log.info("Setting up Android Devices")
    # TODO: Temp fix for an selinux error.
    for ad in android_devices:
        ad.adb.shell("setenforce 0")
    # Start from success so an empty device list returns True instead of
    # failing on an unbound local at the final return.
    setup_result = True
    reset_threads = []
    try:
        # NOTE: the return values of factory_reset_bluetooth are not
        # collected, so a failed reset does not fail the setup here.
        for ad in android_devices:
            reset_thread = threading.Thread(
                target=factory_reset_bluetooth, args=([ad],))
            reset_threads.append(reset_thread)
            reset_thread.start()
        for reset_thread in reset_threads:
            reset_thread.join()

        for ad in android_devices:
            droid = ad.droid
            # TODO: Create specific RPC command to instantiate
            # BluetoothConnectionFacade. This is just a workaround.
            droid.bluetoothStartConnectionStateChangeMonitor("")
            setup_result = droid.bluetoothSetLocalName(generate_id_by_size(4))
            if not setup_result:
                ad.log.error("Failed to set device name.")
                return setup_result
            droid.bluetoothDisableBLE()
            for bond in droid.bluetoothGetBondedDevices():
                ad.log.info(
                    "Removing bond for device {}".format(bond['address']))
                droid.bluetoothUnbond(bond['address'])
        for ad in android_devices:
            ad.adb.shell("setprop persist.bluetooth.btsnoopenable true")
            # NOTE(review): any non-empty getprop output counts as enabled
            # here, including the text "false" - confirm this is intended.
            getprop_result = bool(
                ad.adb.shell("getprop persist.bluetooth.btsnoopenable"))
            if not getprop_result:
                ad.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
    except Exception as err:
        log.error("Something went wrong in multi device setup: {}".format(err))
        return False
    return setup_result
256
257
def bluetooth_enabled_check(ad):
    """Make sure Bluetooth is on, turning it on if it is currently off.

    If the "Bluetooth on" event is not received after toggling, the actual
    adapter state is polled once more before giving up.

    Args:
        ad: The Android device to enable Bluetooth on.

    Returns:
        True if successful, false if unsuccessful.
    """
    if ad.droid.bluetoothCheckState():
        return True
    ad.droid.bluetoothToggleState(True)
    on_event_name = bluetooth_on
    try:
        ad.ed.pop_event(on_event_name, bt_default_timeout)
    except Empty:
        ad.log.info("Failed to toggle Bluetooth on(no broadcast received).")
        # The broadcast may have been missed; trust the real state instead.
        if not ad.droid.bluetoothCheckState():
            ad.log.error(".. actual state is OFF")
            return False
        ad.log.info(".. actual state is ON")
    return True
284
def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5):
    """Poll the LE state until it settles or an expected state shows up.

    The state is sampled roughly every 0.5 seconds. Once at least
    ``threshold`` samples exist, the most recent ``threshold`` samples are
    inspected: with ``state=None`` they must all be identical, otherwise
    ``state`` must occur among them.

    Args:
        droid: droid device object
        state: expected Bluetooth state, or None to accept any stable state
        timeout: maximum number of seconds to keep polling
        threshold: number of most recent samples that are inspected

    Returns:
        True if successful, false if unsuccessful.
    """
    samples = []
    deadline = time.time() + timeout
    while time.time() < deadline:
        samples.append(droid.bluetoothGetLeState())
        if len(samples) >= threshold:
            window = samples[-threshold:]
            if state is None:
                # Any state is fine as long as it stopped changing.
                if len(set(window)) == 1:
                    log.info("State normalized {}".format(set(window)))
                    return True
            elif state in window:
                return True
        time.sleep(0.5)
    if state is None:
        log.error("Bluetooth state fails to normalize")
    else:
        log.error(
            "Failed to match bluetooth state, current state {} expected state {}".format(
                droid.bluetoothGetLeState(), state))
    return False
315
def factory_reset_bluetooth(android_devices):
    """Clears Bluetooth stack of input Android device list.

    For every device: make sure Bluetooth is on, remove all bonds, issue a
    Bluetooth factory reset, wait for the state to settle and turn
    Bluetooth back on. Processing stops at the first device that fails.

    Args:
        android_devices: The Android device list to reset Bluetooth

    Returns:
        True if successful, false if unsuccessful.
    """
    for device in android_devices:
        droid = device.droid
        dispatcher = device.ed
        device.log.info("Reset state of bluetooth on device.")
        if not bluetooth_enabled_check(device):
            return False
        # TODO: remove device unbond b/79418045
        # Temporary solution to ensure all devices are unbonded
        for bond in droid.bluetoothGetBondedDevices():
            device.log.info(
                "Removing bond for device {}".format(bond['address']))
            droid.bluetoothUnbond(bond['address'])

        droid.bluetoothFactoryReset()
        wait_for_bluetooth_manager_state(droid)
        if not enable_bluetooth(droid, dispatcher):
            return False
    return True
342
def reset_bluetooth(android_devices):
    """Power-cycle Bluetooth on every device of the input list.

    Each device that currently has Bluetooth on is toggled off first; after
    a short pause Bluetooth is (re-)enabled. Processing stops at the first
    device that fails.

    Args:
        android_devices: The Android device list to reset Bluetooth state on.

    Returns:
        True if successful, false if unsuccessful.
    """
    for device in android_devices:
        droid = device.droid
        dispatcher = device.ed
        device.log.info("Reset state of bluetooth on device.")
        if droid.bluetoothCheckState() is True:
            droid.bluetoothToggleState(False)
            off_event_name = bluetooth_off
            try:
                dispatcher.pop_event(off_event_name, bt_default_timeout)
            except Exception:
                device.log.error("Failed to toggle Bluetooth off.")
                return False
        # temp sleep for b/17723234
        time.sleep(3)
        if not bluetooth_enabled_check(device):
            return False
    return True
369
370
def determine_max_advertisements(android_device):
    """Determines programatically how many advertisements the Android device
    supports.

    Advertisements are started one after another until the stack reports
    ADVERTISE_FAILED_TOO_MANY_ADVERTISERS. Every advertisement started here is
    stopped again before the function returns or raises; if stopping fails,
    Bluetooth is reset on the device.

    Args:
        android_device: The Android device to determine max advertisements of.

    Returns:
        The maximum advertisement count, or -1 if Bluetooth could not be
        turned on.

    Raises:
        BtTestUtilsError: If starting an advertisement fails with an error
            other than ADVERTISE_FAILED_TOO_MANY_ADVERTISERS.
    """
    droid, ed = android_device.droid, android_device.ed
    android_device.log.info(
        "Determining number of maximum concurrent advertisements...")
    if not droid.bluetoothCheckState():
        droid.bluetoothToggleState(True)
        # Only wait for the "Bluetooth on" broadcast when a toggle was
        # actually requested; waiting while Bluetooth is already on would
        # normally just burn the full timeout.
        try:
            ed.pop_event(bluetooth_on, bt_default_timeout)
        except Exception:
            android_device.log.info(
                "Failed to toggle Bluetooth on(no broadcast received).")
            # Try one more time to poke at the actual state.
            if droid.bluetoothCheckState() is True:
                android_device.log.info(".. actual state is ON")
            else:
                android_device.log.error(
                    "Failed to turn Bluetooth on. Unable to determine max "
                    "advertisements.")
                return -1
    advertisement_count = 0
    advertise_callback_list = []
    advertise_data = droid.bleBuildAdvertiseData()
    advertise_settings = droid.bleBuildAdvertiseSettings()
    try:
        while True:
            advertise_callback = droid.bleGenBleAdvertiseCallback()
            advertise_callback_list.append(advertise_callback)

            droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                         advertise_settings)

            success_event = adv_succ.format(advertise_callback)
            failure_event = adv_fail.format(advertise_callback)
            regex = "(" + success_event + "|" + failure_event + ")"
            # wait for either success or failure event
            evt = ed.pop_events(regex, bt_default_timeout, small_timeout)
            if evt[0]["name"] == success_event:
                advertisement_count += 1
                android_device.log.info(
                    "Advertisement {} started.".format(advertisement_count))
                continue
            error = evt[0]["data"]["Error"]
            if error != "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
                raise BtTestUtilsError(
                    "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
                    " but received bad error code {}".format(error))
            android_device.log.info(
                "Advertisement failed to start. Reached max " +
                "advertisements at {}".format(advertisement_count))
            break
    finally:
        # Clean up on every exit path so a failed probe does not leave
        # advertisements running on the device.
        try:
            for adv in advertise_callback_list:
                droid.bleStopBleAdvertising(adv)
        except Exception:
            android_device.log.error(
                "Failed to stop advertisingment, resetting Bluetooth.")
            reset_bluetooth([android_device])
    return advertisement_count
442
443
def get_advanced_droid_list(android_devices):
    """Describe each device with its advertising and batch scan capabilities.

    The maximum number of LE advertisements is looked up per build model in
    the module-level cache; for an unknown model it is measured on the
    device (retrying up to three more times while the measurement returns
    -1) and the result is stored in the cache.

    Args:
        android_devices: The Android devices to setup.

    Returns:
        List with one dict per device holding the keys 'droid', 'ed',
        'max_advertisements' and 'batch_scan_supported'.
    """
    roles = []
    for device in android_devices:
        droid, dispatcher = device.droid, device.ed
        model = droid.getBuildModel()
        if model in advertisements_to_devices:
            max_advertisements = advertisements_to_devices[model]
        else:
            max_advertisements = determine_max_advertisements(device)
            tries_left = 3
            # Retry to calculate max advertisements
            while max_advertisements == -1 and tries_left > 0:
                device.log.info(
                    "Attempts left to determine max advertisements: {}".format(
                        tries_left))
                max_advertisements = determine_max_advertisements(device)
                tries_left -= 1
            advertisements_to_devices[model] = max_advertisements
        roles.append({
            'droid': droid,
            'ed': dispatcher,
            'max_advertisements': max_advertisements,
            'batch_scan_supported': model not in batch_scan_not_supported_list,
        })
    return roles
486
487
def generate_id_by_size(size, chars=(string.ascii_letters + string.digits)):
    """Build a random string of the requested length.

    Args:
        size: Number of characters in the result.
        chars: (Optional) Characters to draw from; defaults to the ASCII
            letters (lowercase, then uppercase) followed by the digits.

    Returns:
        String of ``size`` characters, each picked at random from ``chars``.
    """
    picked = [random.choice(chars) for _ in range(size)]
    return ''.join(picked)
502
503
def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
                                     adv_android_device, adv_callback_list):
    """Best-effort stop of every given LE scan and LE advertisement.

    If stopping a scan (or an advertisement) raises, the remaining entries
    of that list are skipped and Bluetooth is reset on the affected device.

    Args:
        scn_android_device: The Android device that is actively scanning.
        scn_callback_list: The scan callback id list that needs to be stopped.
        adv_android_device: The Android device that is actively advertising.
        adv_callback_list: The advertise callback id list that needs to be
            stopped.
    """
    scanner_droid = scn_android_device.droid
    advertiser_droid = adv_android_device.droid

    try:
        for callback in scn_callback_list:
            scanner_droid.bleStopBleScan(callback)
    except Exception as err:
        message = "Failed to stop LE scan... reseting Bluetooth. Error {}"
        scn_android_device.log.debug(message.format(err))
        reset_bluetooth([scn_android_device])

    try:
        for callback in adv_callback_list:
            advertiser_droid.bleStopBleAdvertising(callback)
    except Exception as err:
        message = (
            "Failed to stop LE advertisement... reseting Bluetooth. Error {}")
        adv_android_device.log.debug(message.format(err))
        reset_bluetooth([adv_android_device])
533
534
def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
    """Start a generic advertisement and learn its mac address by scanning.

    The advertiser is configured as connectable, low latency, high tx power
    and includes its device name; the scanner filters on that device name.
    Neither the advertisement nor the scan is stopped here; the returned
    callback ids let the caller do so.

    Args:
        scan_ad: The Android device to use as the scanner.
        adv_ad: The Android device to use as the advertiser.

    Returns:
        mac_address: The mac address of the advertisement.
        advertise_callback: The advertise callback id of the active
            advertisement.
        scan_callback: The scan callback id of the active scan.

    Raises:
        BtTestUtilsError: If the advertisement does not start or no scan
            result arrives within bt_default_timeout seconds.
    """
    advertiser = adv_ad.droid
    scanner = scan_ad.droid

    # Advertiser side.
    advertiser.bleSetAdvertiseDataIncludeDeviceName(True)
    advertiser.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    advertiser.bleSetAdvertiseSettingsIsConnectable(True)
    advertiser.bleSetAdvertiseSettingsTxPowerLevel(
        ble_advertise_settings_tx_powers['high'])
    adv_objects = generate_ble_advertise_objects(advertiser)
    advertise_callback, advertise_data, advertise_settings = adv_objects
    advertiser.bleStartBleAdvertising(advertise_callback, advertise_data,
                                      advertise_settings)
    started_event = adv_succ.format(advertise_callback)
    try:
        adv_ad.ed.pop_event(started_event, bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Advertiser did not start successfully {}".format(err))

    # Scanner side, filtered on the advertiser's local name.
    filter_list = scanner.bleGenFilterList()
    scan_settings = scanner.bleBuildScanSetting()
    scan_callback = scanner.bleGenScanCallback()
    advertiser_name = advertiser.bluetoothGetLocalName()
    scanner.bleSetScanFilterDeviceName(advertiser_name)
    scanner.bleBuildScanFilter(filter_list)
    scanner.bleStartBleScan(filter_list, scan_settings, scan_callback)
    result_event = "BleScan{}onScanResults".format(scan_callback)
    try:
        event = scan_ad.ed.pop_event(result_event, bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Scanner did not find advertisement {}".format(err))
    device_info = event['data']['Result']['deviceInfo']
    return device_info['address'], advertise_callback, scan_callback
578
def enable_bluetooth(droid, ed):
    """Enable Bluetooth on input Droid object.

    If the "Bluetooth on" event is not received after toggling, the actual
    adapter state is checked once more before giving up.

    Args:
        droid: The droid object to enable Bluetooth on.
        ed: The event dispatcher used to wait for the "Bluetooth on" event.

    Returns:
        True if successful, false if unsuccessful.
    """
    if droid.bluetoothCheckState() is not True:
        droid.bluetoothToggleState(True)
        on_event_name = bluetooth_on
        try:
            ed.pop_event(on_event_name, bt_default_timeout)
        except Exception:
            log.info("Failed to toggle Bluetooth on (no broadcast received)")
            if droid.bluetoothCheckState() is not True:
                log.info(".. actual state is OFF")
                return False
            log.info(".. actual state is ON")
    return True
596
def disable_bluetooth(droid):
    """Turn Bluetooth off on the input droid object if it is currently on.

    Args:
        droid: The droid object to disable Bluetooth on.

    Returns:
        True if Bluetooth is off afterwards (or was not on to begin with),
        False if it still reports on right after the toggle.
    """
    if droid.bluetoothCheckState() is not True:
        return True
    droid.bluetoothToggleState(False)
    if droid.bluetoothCheckState() is True:
        log.error("Failed to toggle Bluetooth off.")
        return False
    return True
612
613
def set_bt_scan_mode(ad, scan_mode_value):
    """Set Android device's Bluetooth scan mode and verify it took effect.

    Args:
        ad: The Android device to set the scan mode on.
        scan_mode_value: The value to set the scan mode to; one of the
            values of bt_scan_mode_types.

    Returns:
        True if the device reports the requested scan mode afterwards, False
        if it reports a different one or the requested value is unknown.
    """
    droid = ad.droid
    if scan_mode_value == bt_scan_mode_types['state_off']:
        disable_bluetooth(droid)
        # Sample the mode while Bluetooth is off, then bring it back up.
        observed_mode = droid.bluetoothGetScanMode()
        reset_bluetooth([ad])
    elif scan_mode_value == bt_scan_mode_types['none']:
        droid.bluetoothMakeUndiscoverable()
        observed_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable']:
        droid.bluetoothMakeUndiscoverable()
        droid.bluetoothMakeConnectable()
        observed_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable_discoverable']:
        droid.bluetoothMakeDiscoverable()
        observed_mode = droid.bluetoothGetScanMode()
    else:
        # invalid scan mode
        return False
    return observed_mode == scan_mode_value
651
652
def set_device_name(droid, name, settle_time=2):
    """Set and check Bluetooth local name on input droid object.

    Args:
        droid: Droid object to set local name on.
        name: the Bluetooth local name to set.
        settle_time: (Optional) Seconds to wait between setting the name and
            reading it back. Defaults to 2, the previously hard-coded delay.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid.bluetoothSetLocalName(name)
    # Presumably the stack needs a moment before the new name is visible;
    # the delay is configurable so callers can shorten or extend it.
    time.sleep(settle_time)
    return droid.bluetoothGetLocalName() == name
669
670
def check_device_supported_profiles(droid):
    """Query the readiness of each Bluetooth profile on the device.

    Args:
        droid: The droid object to query.

    Returns:
        A dictionary mapping the profile names 'hid', 'hsp', 'a2dp', 'avrcp',
        'a2dp_sink', 'hfp_client' and 'pbap_client' to the value reported by
        the matching ``bluetooth...IsReady`` call.
    """
    return {
        'hid': droid.bluetoothHidIsReady(),
        'hsp': droid.bluetoothHspIsReady(),
        'a2dp': droid.bluetoothA2dpIsReady(),
        'avrcp': droid.bluetoothAvrcpIsReady(),
        'a2dp_sink': droid.bluetoothA2dpSinkIsReady(),
        'hfp_client': droid.bluetoothHfpClientIsReady(),
        'pbap_client': droid.bluetoothPbapClientIsReady(),
    }
689
690
def log_energy_info(android_devices, state):
    """Build the header line for an energy info report.

    The devices are currently not queried (see b/31966929), so only the
    header is produced.

    Args:
        android_devices: input Android device list to log energy info from.
        state: the input state to log. Usually 'Start' or 'Stop' for logging.

    Returns:
        The logging string ``"<state> Energy info collection:\\n"``.
    """
    # Bug: b/31966929
    return "{} Energy info collection:\n".format(state)
704
705
def set_profile_priority(host_ad, client_ad, profiles, priority):
    """Sets the priority of said profile(s) on host_ad for client_ad.

    Only the a2dp_sink, headset_client and pbap_client profiles are handled;
    any other profile is logged as an error and skipped.

    Args:
        host_ad: Android device on which the priority is set.
        client_ad: Android device whose local address the priority applies to.
        profiles: Iterable of profile constants (bt_profile_constants values).
        priority: Object whose ``value`` attribute is passed as the priority.
    """
    for profile in profiles:
        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
            profile, host_ad.droid.bluetoothGetLocalName(),
            client_ad.droid.bluetoothGetLocalAddress(), priority.value))
        if profile == bt_profile_constants['a2dp_sink']:
            apply_priority = host_ad.droid.bluetoothA2dpSinkSetPriority
        elif profile == bt_profile_constants['headset_client']:
            apply_priority = host_ad.droid.bluetoothHfpClientSetPriority
        elif profile == bt_profile_constants['pbap_client']:
            apply_priority = host_ad.droid.bluetoothPbapClientSetPriority
        else:
            host_ad.log.error(
                "Profile {} not yet supported for priority settings".format(
                    profile))
            continue
        apply_priority(client_ad.droid.bluetoothGetLocalAddress(),
                       priority.value)
725
726
def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
    """Pairs pri droid to secondary droid, retrying on failure.

    After each failed pairing attempt the bonds on both devices are cleared
    (with a 2 second pause before and after) before the next attempt.

    Args:
        pri_ad: Android device initiating connection
        sec_ad: Android device accepting connection
        attempts: Number of attempts to try until failure.
        auto_confirm: Auto confirm passkey match for both devices

    Returns:
        True if pairing succeeded; False if clearing bonds failed or every
        attempt was used up.
    """
    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
        sec_ad.droid.bluetoothGetLocalAddress())
    bond_cleanups = (
        (pri_ad, "Failed to clear bond for primary device at attempt {}"),
        (sec_ad, "Failed to clear bond for secondary device at attempt {}"),
    )
    attempt = 0
    while attempt < attempts:
        if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
            return True
        # Wait 2 seconds before unbound
        time.sleep(2)
        for device, failure_message in bond_cleanups:
            if not clear_bonded_devices(device):
                log.error(failure_message.format(str(attempt)))
                return False
        # Wait 2 seconds after unbound
        time.sleep(2)
        attempt += 1
    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
        str(attempts)))
    return False
762
763
def _wait_for_passkey_match(pri_ad, sec_ad):
    """Wait for the pairing request on both devices and confirm the passkey.

    When both devices report the passkey-confirmation variant, the pins are
    compared and the result is posted to both devices as the user
    confirmation.

    Args:
        pri_ad: Android device initiating the pairing.
        sec_ad: Android device accepting the pairing.

    Returns:
        False if a pairing request does not arrive in time, the pairing
        variants differ, or the pins do not match; True otherwise.
    """
    pri_pairing_req = sec_pairing_req = None
    try:
        pri_pairing_req = pri_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        pri_data = pri_pairing_req["data"]
        pri_variant, pri_pin = pri_data["PairingVariant"], pri_data["Pin"]
        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
            pri_pin, pri_variant))
        sec_pairing_req = sec_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        sec_data = sec_pairing_req["data"]
        sec_variant, sec_pin = sec_data["PairingVariant"], sec_data["Pin"]
        sec_ad.log.info("Secondary device received Pin: {}, Variant: {}"
                        .format(sec_pin, sec_variant))
    except Empty as err:
        log.error("Wait for pin error: {}".format(err))
        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
            pri_pairing_req, sec_pairing_req))
        return False

    if pri_variant != sec_variant:
        log.error("Pairing variant mismatched, abort connection")
        return False
    if pri_variant == pairing_variant_passkey_confirmation:
        pins_match = pri_pin == sec_pin
        if pins_match:
            log.info("Pairing code matched, accepting connection")
        else:
            log.info("Pairing code mismatched, rejecting connection")
        # Both sides get the same answer, accept or reject.
        for device in (pri_ad, sec_ad):
            device.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                                   str(pins_match))
        if not pins_match:
            return False
    return True
804
805
def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
    """Make a single attempt to bond pri_ad to sec_ad.

    Clears pending events on both devices, makes sec_ad discoverable, starts
    the pairing helper on both devices and asks pri_ad to discover and bond
    to sec_ad. The bonded device list of pri_ad is then polled until
    sec_ad's address shows up or bt_default_timeout seconds have elapsed.

    Args:
        pri_ad: Android device initiating the bond.
        sec_ad: Android device accepting the bond.
        auto_confirm: Passed to bluetoothStartPairingHelper on both devices.
            When false, the passkey exchange is checked with
            _wait_for_passkey_match before polling for the bond.

    Returns:
        True if sec_ad's address appears in pri_ad's bonded devices, False
        if the passkey check fails or the polling times out.
    """
    pri_droid = pri_ad.droid
    sec_droid = sec_ad.droid
    pri_ad.ed.clear_all_events()
    sec_ad.ed.clear_all_events()
    # Fetch the target address once; it is used for logging, bonding and
    # the bonded-device check below.
    target_address = sec_droid.bluetoothGetLocalAddress()
    log.info("Bonding device {} to {}".format(
        pri_droid.bluetoothGetLocalAddress(), target_address))
    # Enable discovery on sec_ad so that pri_ad can find it.
    sec_droid.bluetoothMakeDiscoverable(bt_default_timeout)
    log.debug("Starting paring helper on each device")
    pri_droid.bluetoothStartPairingHelper(auto_confirm)
    sec_droid.bluetoothStartPairingHelper(auto_confirm)
    pri_ad.log.info("Primary device starting discovery and executing bond")
    # The return value is not consulted; success is decided by the bonded
    # device poll below.
    pri_droid.bluetoothDiscoverAndBond(target_address)
    if not auto_confirm and not _wait_for_passkey_match(pri_ad, sec_ad):
        return False
    # Loop until we have bonded successfully or timeout.
    end_time = time.time() + bt_default_timeout
    pri_ad.log.info("Verifying devices are bonded")
    while time.time() < end_time:
        bonded_devices = pri_droid.bluetoothGetBondedDevices()
        if any(d['address'] == target_address for d in bonded_devices):
            pri_ad.log.info("Successfully bonded to device")
            return True
        time.sleep(0.1)
    # Timed out trying to bond.
    pri_ad.log.info("Failed to bond devices.")
    return False
841
842
def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
    """Connects pri droid to secondary droid, retrying on failure.

    Sleeps 2 seconds up front and then calls _connect_pri_to_sec until it
    succeeds or the number of attempts is used up.

    Args:
        pri_ad: AndroidDroid initiating connection
        sec_ad: AndroidDroid accepting connection
        profiles_set: Set of profiles to be connected
        attempts: Number of attempts to try until failure.

    Returns:
        True if a connection attempt succeeded, False if all attempts failed.
    """
    # Allows extra time for the SDP records to be updated.
    time.sleep(2)
    curr_attempts = 0
    while curr_attempts < attempts:
        log.info("connect_pri_to_sec curr attempt {} total {}".format(
            curr_attempts, attempts))
        if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
            return True
        curr_attempts += 1
    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
        attempts))
    return False
869
870
def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
    """Connects pri droid to secondary droid.

    The devices must already be bonded. After requesting the connection,
    the profile connection state is polled through the profile APIs for up
    to 10 seconds. Any profile still missing after that is awaited through
    bluetooth_profile_connection_state_changed events on pri_ad.

    Args:
        pri_ad: AndroidDroid initiating connection.
        sec_ad: AndroidDroid accepting connection.
        profiles_set: Set of profiles to be connected.

    Returns:
        True if connection is successful, False if a requested profile is
        not supported, the devices are not bonded, or waiting for a
        connection event fails.
    """
    # Check if we support all profiles.
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_set:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not supported list {}".format(
                profile, supported_profiles))
            return False

    # The secondary address does not change during this call, so query it
    # once instead of issuing one RPC per comparison.
    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()

    # First check that devices are bonded.
    paired = any(paired_device['address'] == sec_addr
                 for paired_device in pri_ad.droid.bluetoothGetBondedDevices())
    if not paired:
        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
        return False

    # Now try to connect them, the following call will try to initiate all
    # connections.
    pri_ad.droid.bluetoothConnectBonded(sec_addr)

    end_time = time.time() + 10
    profile_connected = set()
    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))
    # Profiles whose connection state can be queried directly, in the order
    # they are checked, with the helper that performs the query.
    profile_checks = (
        ('headset_client', is_hfp_client_device_connected),
        ('a2dp', is_a2dp_src_device_connected),
        ('a2dp_sink', is_a2dp_snk_device_connected),
        ('map_mce', is_map_mce_device_connected),
        ('map', is_map_mse_device_connected),
    )
    # First use APIs to check profile connection state
    while (time.time() < end_time
           and not profile_connected.issuperset(profiles_set)):
        for profile_name, is_connected in profile_checks:
            profile = bt_profile_constants[profile_name]
            if (profile not in profile_connected
                    and profile in profiles_set
                    and is_connected(pri_ad, sec_addr)):
                profile_connected.add(profile)
        time.sleep(0.1)
    # If APIs fail, try to find the connection broadcast receiver.
    while not profile_connected.issuperset(profiles_set):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed,
                bt_default_timeout + 10)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception:
            # Report the profiles that are still missing, not the ones that
            # already connected.
            profiles_left = set(profiles_set) - profile_connected
            pri_ad.log.error("Did not get {} profiles left {}".format(
                bluetooth_profile_connection_state_changed, profiles_left))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']

        if state == bt_profile_states['connected'] and \
                device_addr == sec_addr:
            profile_connected.add(profile)
        pri_ad.log.info(
            "Profiles connected until now {}".format(profile_connected))
    # Failure happens inside the while loop. If we came here then we already
    # connected.
    return True
959
960
def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
    """Disconnect primary from secondary on a specific set of profiles.

    After requesting the disconnect, waits for a
    bluetooth_profile_connection_state_changed event reporting the
    disconnected state for sec_ad's address for every requested profile.

    Args:
        pri_ad: Primary android_device initiating disconnection
        sec_ad: Secondary android droid (sl4a interface to keep the
            method signature the same connect_pri_to_sec above)
        profiles_list: List of profiles we want to disconnect from

    Returns:
        True on Success
        False on Failure
    """
    # Sanity check to see if all the profiles in the given set is supported
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_list:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not in supported list {}".format(
                profile, supported_profiles))
            return False

    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
    # Disconnecting on a already disconnected profile is a nop,
    # so not checking for the connection state
    try:
        # Queried once here and reused for every event comparison below.
        # It stays inside the try so a failing query is still reported as a
        # disconnect failure.
        sec_addr = sec_ad.droid.bluetoothGetLocalAddress()
        pri_ad.droid.bluetoothDisconnectConnectedProfile(
            sec_addr, profiles_list)
    except Exception as err:
        pri_ad.log.error(
            "Exception while trying to disconnect profile(s) {}: {}".format(
                profiles_list, err))
        return False

    profile_disconnected = set()
    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))

    while not profile_disconnected.issuperset(profiles_list):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed, bt_default_timeout)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception as e:
            pri_ad.log.error(
                "Did not disconnect from Profiles. Reason {}".format(e))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']

        if state == bt_profile_states['disconnected'] and \
                device_addr == sec_addr:
            profile_disconnected.add(profile)
        pri_ad.log.info(
            "Profiles disconnected so far {}".format(profile_disconnected))

    return True
1017
1018
def take_btsnoop_logs(android_devices, testcase, testname):
    """Pull btsnoop logs from every device in a list of Android devices.

    Args:
        android_devices: Android devices to pull btsnoop logs from.
        testcase: Passed through to take_btsnoop_log for each device.
        testname: Name of the test case that triggered this snoop log.
    """
    for device in android_devices:
        take_btsnoop_log(device, testcase, testname)
1029
1030
def take_btsnoop_log(ad, testcase, testname):
    """Pull the btsnoop_hci logs of a device into its log directory.

    The current log and, if it can be pulled, the previous ("last") log are
    stored under <ad.log_path>/BluetoothSnoopLogs. File names are built from
    the alphanumeric characters of testname, the device model with spaces
    removed and the device serial, joined by commas.

    Args:
        ad: The android_device instance to pull the snoop logs from.
        testcase: Object whose log is used to report that the previous log
            could not be pulled.
        testname: Name of the test case that triggered this snoop log.
    """
    clean_name = "".join(filter(str.isalnum, testname))
    serial = ad.serial
    model = ad.droid.getBuildModel().replace(" ", "")
    out_name = ','.join((clean_name, model, serial))
    snoop_path = ad.log_path + "/BluetoothSnoopLogs"
    utils.create_dir(snoop_path)
    destination = snoop_path + '/' + out_name
    exe_cmd("adb -s " + serial + " pull " + btsnoop_log_path_on_device +
            " " + destination + ".btsnoop_hci.log")
    try:
        exe_cmd("adb -s " + serial + " pull " +
                btsnoop_last_log_path_on_device + " " + destination +
                ".btsnoop_hci.log.last")
    except Exception:
        # Failing to pull the previous log is only reported, not raised.
        testcase.log.info(
            "File does not exist {}".format(btsnoop_last_log_path_on_device))
1062
1063
def kill_bluetooth_process(ad):
    """Kill Bluetooth process on Android device.

    The PIDs are read from the device with a ps/grep/awk pipeline and then
    killed through adb. Nothing is killed when the pipeline prints no PID.

    Args:
        ad: Android device to kill BT process on.
    """
    ad.log.info("Killing Bluetooth process.")
    ps_output = ad.adb.shell(
        "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
    # The pipeline can print zero, one or several PIDs separated by
    # newlines; split() also drops the trailing newline.
    pids = ps_output.split()
    if not pids:
        ad.log.info("No Bluetooth process found to kill.")
        return
    # An argument list (no shell=True) keeps the device output from being
    # interpreted by the host shell.
    call(["adb", "-s", ad.serial, "shell", "kill"] + pids)
1074
1075
def orchestrate_rfcomm_connection(client_ad,
                                  server_ad,
                                  accept_timeout_ms=default_rfcomm_timeout_ms,
                                  uuid=None):
    """Sets up the RFCOMM connection between two Android devices.

    Delegates to orchestrate_bluetooth_socket_connection.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: accept timeout forwarded to the socket
            connection setup.
        uuid: UUID to connect on; bt_rfcomm_uuids['default_uuid'] is used
            when None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    if uuid is None:
        uuid = bt_rfcomm_uuids['default_uuid']
    return orchestrate_bluetooth_socket_connection(
        client_ad, server_ad, accept_timeout_ms, uuid)
1093
1094
def orchestrate_bluetooth_socket_connection(
        client_ad,
        server_ad,
        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
        uuid=None):
    """Sets up the Bluetooth Socket connection between two Android devices.

    Starts the pairing helper on both devices, begins the accept thread on
    the server and the connect thread on the client, then polls the client
    once per second for an active connection for up to bt_default_timeout
    seconds.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: timeout passed to the server's accept thread.
        uuid: UUID to connect on; bluetooth_socket_conn_test_uuid is used
            when None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    socket_uuid = bluetooth_socket_conn_test_uuid if uuid is None else uuid

    server_ad.droid.bluetoothStartPairingHelper()
    client_ad.droid.bluetoothStartPairingHelper()

    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
        socket_uuid, accept_timeout_ms)
    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
        server_ad.droid.bluetoothGetLocalAddress(), socket_uuid)

    end_time = time.time() + bt_default_timeout
    while time.time() < end_time:
        if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
            client_ad.log.info("Bluetooth socket Client Connection Active")
            return True
        time.sleep(1)
    # Reaching this point means no active connection was ever observed,
    # including the case where the polling loop never ran.
    client_ad.log.error("Failed to establish a Bluetooth socket connection")
    return False
1134
1135
def write_read_verify_data(client_ad, server_ad, msg, binary=False):
    """Write a message on the client socket and check the server reads it.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: whether the binary write/read calls are used. For binary
            reads, trailing "\\r" and "\\n" characters are stripped from the
            data before comparing.

    Returns:
        True if the data written matches the data read. False if they
        differ or if writing or reading raised an exception.
    """
    client_ad.log.info("Write message.")
    try:
        client_droid = client_ad.droid
        send = (client_droid.bluetoothSocketConnWriteBinary
                if binary else client_droid.bluetoothSocketConnWrite)
        send(msg)
    except Exception as write_err:
        client_ad.log.error("Failed to write data: {}".format(write_err))
        return False

    server_ad.log.info("Read message.")
    try:
        server_droid = server_ad.droid
        if binary:
            received = server_droid.bluetoothSocketConnReadBinary()
            received = received.rstrip("\r\n")
        else:
            received = server_droid.bluetoothSocketConnRead()
    except Exception as read_err:
        server_ad.log.error("Failed to read data: {}".format(read_err))
        return False

    log.info("Verify message.")
    if msg == received:
        return True
    log.error("Mismatch! Read: {}, Expected: {}".format(received, msg))
    return False
1172
1173
def clear_bonded_devices(ad):
    """Unbond every device currently bonded to the input Android device.

    Stops at the first device that fails to unbond.

    Args:
        ad: the Android device whose bonded devices are removed.
    Returns:
        True if clearing bonded devices was successful, false if unsuccessful.
    """
    for bonded in ad.droid.bluetoothGetBondedDevices():
        address = bonded['address']
        if ad.droid.bluetoothUnbond(address):
            log.info("Successfully unbonded {}".format(address))
            continue
        log.error("Failed to unbond {}".format(address))
        return False
    return True
1190
1191
def verify_server_and_client_connected(client_ad, server_ad, log=True):
    """Check that both server and client report an active socket connection.

    Both devices are always checked, server first, so a failure on one side
    does not hide a failure on the other.

    Args:
        client_ad: the Android device to check number of active connections.
        server_ad: the Android device to check number of active connections.
        log: whether to log an error on a device without active connections.

    Returns:
        True both server and client have at least 1 active connection,
        false if unsuccessful.
    """
    checks = (
        (server_ad, "No socket connections found on server."),
        (client_ad, "No socket connections found on client."),
    )
    all_connected = True
    for device, error_msg in checks:
        if len(device.droid.bluetoothSocketConnActiveConnections()) != 0:
            continue
        if log:
            device.log.error(error_msg)
        all_connected = False
    return all_connected
1216
1217
def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
    """Sets up a PAN connection between two android devices.

    Toggles airplane mode on and off on panu_dut, checks Bluetooth is
    enabled on both devices, turns on Bluetooth tethering on pan_dut, pairs
    the devices, connects them and finally verifies the http connection on
    panu_dut.

    Args:
        pan_dut: the Android device providing tethering services
        panu_dut: the Android device using the internet connection from the
            pan_dut
    Returns:
        True if PAN connection and verification is successful,
        false if unsuccessful.
    """
    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
        panu_dut.log.error("Failed to toggle airplane mode on")
        return False
    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
        # Airplane mode is toggled on panu_dut, so the failure is reported
        # on that device's log.
        panu_dut.log.error("Failed to toggle airplane mode off")
        return False
    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    if not bluetooth_enabled_check(panu_dut):
        return False
    if not bluetooth_enabled_check(pan_dut):
        return False
    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
    if not pair_pri_to_sec(pan_dut, panu_dut):
        return False
    if not pan_dut.droid.bluetoothPanIsTetheringOn():
        pan_dut.log.error("Failed to enable Bluetooth tethering.")
        return False
    # Magic sleep needed to give the stack time in between bonding and
    # connecting the PAN profile.
    time.sleep(pan_connect_timeout)
    panu_dut.droid.bluetoothConnectBonded(
        pan_dut.droid.bluetoothGetLocalAddress())
    if not verify_http_connection(log, panu_dut):
        panu_dut.log.error("Can't verify http connection on PANU device.")
        # Also probe the tethering side to help tell where the connection
        # is broken; the overall result is a failure either way.
        if not verify_http_connection(log, pan_dut):
            pan_dut.log.info(
                "Can't verify http connection on PAN service device")
        return False
    return True
1259
1260
def is_hfp_client_device_connected(ad, addr):
    """Check whether addr is among the device's connected HFP Client devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothHfpClientGetConnectedDevices()
    ad.log.info("Connected HFP Client devices: {}".format(connected))
    connected_addresses = set(entry['address'] for entry in connected)
    return addr in connected_addresses
1275
1276
def is_a2dp_src_device_connected(ad, addr):
    """Check whether addr is among the device's connected A2DP devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothA2dpGetConnectedDevices()
    ad.log.info("Connected A2DP Source devices: {}".format(connected))
    connected_addresses = set(entry['address'] for entry in connected)
    return addr in connected_addresses
1291
1292
def is_a2dp_snk_device_connected(ad, addr):
    """Check whether addr is among the device's connected A2DP Sink devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
    ad.log.info("Connected A2DP Sink devices: {}".format(connected))
    connected_addresses = set(entry['address'] for entry in connected)
    return addr in connected_addresses
1307
1308
def is_map_mce_device_connected(ad, addr):
    """Check whether addr is among the device's connected MAP MCE devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothMapClientGetConnectedDevices()
    ad.log.info("Connected MAP MCE devices: {}".format(connected))
    connected_addresses = set(entry['address'] for entry in connected)
    return addr in connected_addresses
1323
1324
def is_map_mse_device_connected(ad, addr):
    """Check whether addr is among the device's connected MAP MSE devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is in the connected device list, False otherwise.
    """
    connected = ad.droid.bluetoothMapGetConnectedDevices()
    ad.log.info("Connected MAP MSE devices: {}".format(connected))
    connected_addresses = set(entry['address'] for entry in connected)
    return addr in connected_addresses
1339
1340
def hid_keyboard_report(key, modifier="00"):
    """Build the 8-byte HID keyboard report for the given key.

    The report is laid out as: modifier, a zero byte, the key, then five
    zero bytes. The bytes are decoded as UTF-8 and returned as a string.

    Args:
        key: the key we want, as a hex string
        modifier: HID keyboard modifier byte, as a hex string
    Returns:
        The HID report as a string.
    """
    report_fields = [modifier, "00", key] + ["00"] * 5
    return bytes.fromhex(" ".join(report_fields)).decode("utf-8")
1353
1354
def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
    """Send HID reports simulating a keyboard press and release.

    A report for the key is sent from device_ad to the host, followed after
    `interval` seconds by a report for key "00".

    Args:
        host_id: the Bluetooth MAC address or name of the HID host
        device_ad: HID device
        key: the key we want to send
        interval: the interval between key press and key release
    """
    droid = device_ad.droid
    press_report = hid_keyboard_report(key)
    droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, press_report)
    time.sleep(interval)
    release_report = hid_keyboard_report("00")
    droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                       release_report)
1370
1371
def is_a2dp_connected(sink, source):
    """Convenience function to see if the 2 devices are connected on A2dp.

    Looks for the source's local address among the devices the sink reports
    as connected on A2DP Sink. The name of each device is logged until a
    match is found.

    Args:
        sink:       Audio Sink
        source:     Audio Source
    Returns:
        True if Connected
        False if Not connected
    """
    devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices()
    if not devices:
        return False
    # The source address is the same for every comparison, so query it once
    # rather than once per connected device.
    source_address = source.droid.bluetoothGetLocalAddress()
    for device in devices:
        sink.log.info("A2dp Connected device {}".format(device["name"]))
        if device["address"] == source_address:
            return True
    return False
1390