# /usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

import logging
import random
import pprint
import string
from queue import Empty
import queue
import threading
import time
from acts import utils

from subprocess import call

from acts.test_utils.bt.bt_constants import adv_fail
from acts.test_utils.bt.bt_constants import adv_succ
from acts.test_utils.bt.bt_constants import advertising_set_started
from acts.test_utils.bt.bt_constants import advertising_set_stopped
from acts.test_utils.bt.bt_constants import advertising_set_on_own_address_read
from acts.test_utils.bt.bt_constants import advertising_set_stopped
from acts.test_utils.bt.bt_constants import advertising_set_enabled
from acts.test_utils.bt.bt_constants import advertising_set_data_set
from acts.test_utils.bt.bt_constants import advertising_set_scan_response_set
from acts.test_utils.bt.bt_constants import advertising_set_parameters_update
from acts.test_utils.bt.bt_constants import \
    advertising_set_periodic_parameters_updated
from acts.test_utils.bt.bt_constants import advertising_set_periodic_data_set
from acts.test_utils.bt.bt_constants import advertising_set_periodic_enable
from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list
from acts.test_utils.bt.bt_constants import batch_scan_result
from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
from acts.test_utils.bt.bt_constants import bluetooth_off
from acts.test_utils.bt.bt_constants import bluetooth_on
from acts.test_utils.bt.bt_constants import \
    bluetooth_profile_connection_state_changed
from acts.test_utils.bt.bt_constants import bt_default_timeout
from acts.test_utils.bt.bt_constants import bt_discovery_timeout
from acts.test_utils.bt.bt_constants import bt_profile_states
from acts.test_utils.bt.bt_constants import bt_profile_constants
from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids
from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
from acts.test_utils.bt.bt_constants import bt_scan_mode_types
from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device
from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
from acts.test_utils.bt.bt_constants import mtu_changed
from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
from acts.test_utils.bt.bt_constants import pan_connect_timeout
from acts.test_utils.bt.bt_constants import small_timeout
from acts.test_utils.bt.bt_constants import scan_result
from acts.test_utils.bt.bt_constants import scan_failed
from acts.test_utils.bt.bt_constants import hid_id_keyboard

from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
from acts.test_utils.tel.tel_test_utils import verify_http_connection
from acts.utils import exe_cmd
from acts.utils import create_dir

# Module-level logger alias used by helpers that have no device logger.
log = logging

# Cache of device model -> maximum concurrent LE advertisements.
advertisements_to_devices = {}


class BtTestUtilsError(Exception):
    """Raised when a Bluetooth test utility cannot complete its operation."""
    pass


def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
    """Verify that input number of advertisements can be found from the scanning
    Android device.

    An LE scan is started and scan results are collected until
    max_advertisements distinct addresses have been seen or
    bt_default_timeout seconds have elapsed.

    Args:
        scn_ad: The Android device to start LE scanning on.
        max_advertisements: The number of advertisements the scanner expects to
        find.

    Returns:
        True if successful, false if unsuccessful.

    Raises:
        BtTestUtilsError: if a single wait for a scan result event times out.
    """
    test_result = False
    address_list = []
    filter_list = scn_ad.droid.bleGenFilterList()
    scn_ad.droid.bleBuildScanFilter(filter_list)
    scan_settings = scn_ad.droid.bleBuildScanSetting()
    scan_callback = scn_ad.droid.bleGenScanCallback()
    scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    start_time = time.time()
    try:
        while (start_time + bt_default_timeout) > time.time():
            try:
                event = scn_ad.ed.pop_event(
                    scan_result.format(scan_callback), bt_default_timeout)
            except Empty as error:
                raise BtTestUtilsError(
                    "Failed to find scan event: {}".format(error))
            address = event['data']['Result']['deviceInfo']['address']
            if address not in address_list:
                address_list.append(address)
            if len(address_list) == max_advertisements:
                test_result = True
                break
    finally:
        # Stop the scan on every exit path, including the error raised above,
        # so a failed verification does not leave a scan running.
        scn_ad.droid.bleStopBleScan(scan_callback)
    return test_result
def setup_n_advertisements(adv_ad, num_advertisements):
    """Setup input number of advertisements on input Android device.

    All advertisements share one low-latency settings object and one data
    object. Each one gets its own callback id and is confirmed by waiting for
    its success event before the next one is started.

    Args:
        adv_ad: The Android device to start LE advertisements on.
        num_advertisements: The number of advertisements to start.

    Returns:
        advertise_callback_list: List of advertisement callback ids.

    Raises:
        BtTestUtilsError: if an advertisement's success event does not arrive
            within bt_default_timeout.
    """
    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    advertise_data = adv_ad.droid.bleBuildAdvertiseData()
    advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings()
    advertise_callback_list = []
    for i in range(num_advertisements):
        advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback()
        advertise_callback_list.append(advertise_callback)
        adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                            advertise_settings)
        try:
            adv_ad.ed.pop_event(
                adv_succ.format(advertise_callback), bt_default_timeout)
            adv_ad.log.info("Advertisement {} started.".format(i + 1))
        except Empty as error:
            adv_ad.log.error("Advertisement {} failed to start.".format(i + 1))
            # Chain the timeout so the original traceback stays visible.
            raise BtTestUtilsError(
                "Test failed with Empty error: {}".format(error)) from error
    return advertise_callback_list
def teardown_n_advertisements(adv_ad, num_advertisements,
                              advertise_callback_list):
    """Stop input number of advertisements on input Android device.

    Args:
        adv_ad: The Android device to stop LE advertisements on.
        num_advertisements: The number of advertisements to stop.
        advertise_callback_list: The list of advertisement callbacks to stop.
            Only the first num_advertisements entries are stopped; a list
            shorter than num_advertisements is stopped in full.

    Returns:
        True if successful, false if unsuccessful.
    """
    for advertise_callback in advertise_callback_list[:num_advertisements]:
        adv_ad.droid.bleStopBleAdvertising(advertise_callback)
    return True


def generate_ble_scan_objects(droid):
    """Generate generic LE scan objects.

    Args:
        droid: The droid object to generate LE scan objects from.

    Returns:
        filter_list: The generated scan filter list id.
        scan_settings: The generated scan settings id.
        scan_callback: The generated scan callback id.
    """
    filter_list = droid.bleGenFilterList()
    scan_settings = droid.bleBuildScanSetting()
    scan_callback = droid.bleGenScanCallback()
    return filter_list, scan_settings, scan_callback
def generate_ble_advertise_objects(droid):
    """Generate generic LE advertise objects.

    Args:
        droid: The droid object to generate advertise LE objects from.

    Returns:
        advertise_callback: The generated advertise callback id.
        advertise_data: The generated advertise data id.
        advertise_settings: The generated advertise settings id.
    """
    advertise_callback = droid.bleGenBleAdvertiseCallback()
    advertise_data = droid.bleBuildAdvertiseData()
    advertise_settings = droid.bleBuildAdvertiseSettings()
    return advertise_callback, advertise_data, advertise_settings


def setup_multiple_devices_for_bt_test(android_devices):
    """A common setup routine for Bluetooth on input Android device list.

    Things this function sets up:
    1. Resets Bluetooth
    2. Set Bluetooth local name to random string of size 4
    3. Disable BLE background scanning.
    4. Enable Bluetooth snoop logging.

    Args:
        android_devices: Android device list to setup Bluetooth on.

    Returns:
        True if successful, false if unsuccessful. An empty device list is
        treated as a successful (no-op) setup.
    """
    log.info("Setting up Android Devices")
    # TODO: Temp fix for an selinux error.
    for ad in android_devices:
        ad.adb.shell("setenforce 0")
    # Bound before the try block so the final return is valid even when
    # android_devices is empty.
    setup_result = True
    threads = []
    try:
        # Factory-reset every device in parallel; each thread gets a
        # single-element device list. Thread return values are not collected.
        for a in android_devices:
            thread = threading.Thread(
                target=factory_reset_bluetooth, args=([a], ))
            threads.append(thread)
            thread.start()
        for t in threads:
            t.join()

        for a in android_devices:
            d = a.droid
            # TODO: Create specific RPC command to instantiate
            # BluetoothConnectionFacade. This is just a workaround.
            d.bluetoothStartConnectionStateChangeMonitor("")
            setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
            if not setup_result:
                a.log.error("Failed to set device name.")
                return setup_result
            d.bluetoothDisableBLE()
            bonded_devices = d.bluetoothGetBondedDevices()
            for b in bonded_devices:
                a.log.info("Removing bond for device {}".format(b['address']))
                d.bluetoothUnbond(b['address'])
        for a in android_devices:
            a.adb.shell("setprop persist.bluetooth.btsnoopenable true")
            snoop_value = a.adb.shell(
                "getprop persist.bluetooth.btsnoopenable")
            if isinstance(snoop_value, bytes):
                snoop_value = snoop_value.decode()
            # Compare the property text itself: any non-empty output,
            # including "false", would be truthy under bool().
            if str(snoop_value).strip().lower() != "true":
                a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
    except Exception as err:
        log.error("Something went wrong in multi device setup: {}".format(err))
        return False
    return setup_result
def bluetooth_enabled_check(ad):
    """Checks if the Bluetooth state is enabled, if not it will attempt to
    enable it.

    Args:
        ad: The Android device to enable Bluetooth on.

    Returns:
        True if successful, false if unsuccessful.
    """
    if ad.droid.bluetoothCheckState():
        return True
    ad.droid.bluetoothToggleState(True)
    expected_bluetooth_on_event_name = bluetooth_on
    try:
        ad.ed.pop_event(expected_bluetooth_on_event_name,
                        bt_default_timeout)
    except Empty:
        ad.log.info(
            "Failed to toggle Bluetooth on(no broadcast received).")
        # Try one more time to poke at the actual state.
        if ad.droid.bluetoothCheckState():
            ad.log.info(".. actual state is ON")
            return True
        ad.log.error(".. actual state is OFF")
        return False
    return True
def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5):
    """Waits for the Bluetooth LE state to settle or to reach an explicit state.

    The LE state is sampled every 0.5 seconds. Once at least `threshold`
    samples exist, the most recent `threshold` samples are inspected:
      * state is None: succeed when all of those samples are identical.
      * state given: succeed when `state` occurs at least once among them.

    Args:
        droid: droid device object
        state: expected Bluetooth state, or None to wait for any stable state
        timeout: maximum number of seconds to keep sampling
        threshold: number of most recent samples that are inspected

    Returns:
        True if successful, false if unsuccessful.
    """
    all_states = []
    start_time = time.time()
    while time.time() < start_time + timeout:
        all_states.append(droid.bluetoothGetLeState())
        if len(all_states) >= threshold:
            recent_states = all_states[-threshold:]
            if state is None:
                # for any normalized state
                if len(set(recent_states)) == 1:
                    log.info("State normalized {}".format(set(recent_states)))
                    return True
            elif state in recent_states:
                # explicit check against the recent samples
                return True
        time.sleep(0.5)
    log.error(
        "Bluetooth state fails to normalize" if state is None else
        "Failed to match bluetooth state, current state {} expected state {}".format(
            droid.bluetoothGetLeState(), state))
    return False


def factory_reset_bluetooth(android_devices):
    """Clears Bluetooth stack of input Android device list.

    For each device: make sure Bluetooth is on, remove every bond, issue a
    factory reset, wait for the LE state to settle, then re-enable Bluetooth.

    Args:
        android_devices: The Android device list to reset Bluetooth

    Returns:
        True if successful, false if unsuccessful.
    """
    for a in android_devices:
        droid, ed = a.droid, a.ed
        a.log.info("Reset state of bluetooth on device.")
        if not bluetooth_enabled_check(a):
            return False
        # TODO: remove device unbond b/79418045
        # Temporary solution to ensure all devices are unbonded
        bonded_devices = droid.bluetoothGetBondedDevices()
        for b in bonded_devices:
            a.log.info("Removing bond for device {}".format(b['address']))
            droid.bluetoothUnbond(b['address'])

        droid.bluetoothFactoryReset()
        # The result is intentionally not checked: enable_bluetooth below is
        # the step that decides success.
        wait_for_bluetooth_manager_state(droid)
        if not enable_bluetooth(droid, ed):
            return False
    return True
def reset_bluetooth(android_devices):
    """Resets Bluetooth state of input Android device list.

    Each device is toggled off (if currently on), given a short pause, and
    then turned back on.

    Args:
        android_devices: The Android device list to reset Bluetooth state on.

    Returns:
        True if successful, false if unsuccessful.
    """
    for a in android_devices:
        droid, ed = a.droid, a.ed
        a.log.info("Reset state of bluetooth on device.")
        if droid.bluetoothCheckState() is True:
            droid.bluetoothToggleState(False)
            expected_bluetooth_off_event_name = bluetooth_off
            try:
                ed.pop_event(expected_bluetooth_off_event_name,
                             bt_default_timeout)
            except Exception:
                a.log.error("Failed to toggle Bluetooth off.")
                return False
        # temp sleep for b/17723234
        time.sleep(3)
        if not bluetooth_enabled_check(a):
            return False
    return True


def determine_max_advertisements(android_device):
    """Determines programatically how many advertisements the Android device
    supports.

    Advertisements are started one after another until the stack reports
    ADVERTISE_FAILED_TOO_MANY_ADVERTISERS. Every advertisement that was
    requested is stopped again before returning or raising.

    Args:
        android_device: The Android device to determine max advertisements of.

    Returns:
        The maximum advertisement count, or -1 if Bluetooth could not be
        turned on.

    Raises:
        BtTestUtilsError: if advertising fails with any other error code.
    """
    android_device.log.info(
        "Determining number of maximum concurrent advertisements...")
    advertisement_count = 0
    expected_bluetooth_on_event_name = bluetooth_on
    if not android_device.droid.bluetoothCheckState():
        android_device.droid.bluetoothToggleState(True)
        try:
            android_device.ed.pop_event(expected_bluetooth_on_event_name,
                                        bt_default_timeout)
        except Exception:
            android_device.log.info(
                "Failed to toggle Bluetooth on(no broadcast received).")
            # Try one more time to poke at the actual state.
            if android_device.droid.bluetoothCheckState() is True:
                android_device.log.info(".. actual state is ON")
            else:
                android_device.log.error(
                    "Failed to turn Bluetooth on. Returning -1 advertisements."
                )
                return -1
    advertise_callback_list = []
    advertise_data = android_device.droid.bleBuildAdvertiseData()
    advertise_settings = android_device.droid.bleBuildAdvertiseSettings()
    try:
        while True:
            advertise_callback = (
                android_device.droid.bleGenBleAdvertiseCallback())
            advertise_callback_list.append(advertise_callback)

            android_device.droid.bleStartBleAdvertising(
                advertise_callback, advertise_data, advertise_settings)

            success_event = adv_succ.format(advertise_callback)
            regex = "(" + success_event + "|" + adv_fail.format(
                advertise_callback) + ")"
            # wait for either success or failure event
            evt = android_device.ed.pop_events(regex, bt_default_timeout,
                                               small_timeout)
            if evt[0]["name"] == success_event:
                advertisement_count += 1
                android_device.log.info(
                    "Advertisement {} started.".format(advertisement_count))
                continue
            error = evt[0]["data"]["Error"]
            if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
                android_device.log.info(
                    "Advertisement failed to start. Reached max " +
                    "advertisements at {}".format(advertisement_count))
                break
            raise BtTestUtilsError(
                "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
                " but received bad error code {}".format(error))
    finally:
        # Clean up on every exit path so an unexpected error does not leave
        # the device advertising.
        try:
            for adv in advertise_callback_list:
                android_device.droid.bleStopBleAdvertising(adv)
        except Exception:
            android_device.log.error(
                "Failed to stop advertisingment, resetting Bluetooth.")
            reset_bluetooth([android_device])
    return advertisement_count
def get_advanced_droid_list(android_devices):
    """Add max_advertisement and batch_scan_supported attributes to input
    Android devices

    This will programatically determine maximum LE advertisements of each
    input Android device. Results are cached per device model in
    advertisements_to_devices; a failed determination (-1) is not cached so
    that a later call can try again.

    Args:
        android_devices: The Android devices to setup.

    Returns:
        List of dictionaries, one per device, with the keys 'droid', 'ed',
        'max_advertisements' and 'batch_scan_supported'.
    """
    droid_list = []
    for a in android_devices:
        d, e = a.droid, a.ed
        model = d.getBuildModel()
        if model in advertisements_to_devices:
            max_advertisements = advertisements_to_devices[model]
        else:
            max_advertisements = determine_max_advertisements(a)
            max_tries = 3
            # Retry to calculate max advertisements
            while max_advertisements == -1 and max_tries > 0:
                a.log.info(
                    "Attempts left to determine max advertisements: {}".format(
                        max_tries))
                max_advertisements = determine_max_advertisements(a)
                max_tries -= 1
            if max_advertisements != -1:
                advertisements_to_devices[model] = max_advertisements
        batch_scan_supported = model not in batch_scan_not_supported_list
        role = {
            'droid': d,
            'ed': e,
            'max_advertisements': max_advertisements,
            'batch_scan_supported': batch_scan_supported
        }
        droid_list.append(role)
    return droid_list


def generate_id_by_size(
        size,
        chars=(
            string.ascii_lowercase + string.ascii_uppercase + string.digits)):
    """Generate random ascii characters of input size and input char types

    Args:
        size: Input size of string.
        chars: (Optional) Chars to use in generating a random string.

    Returns:
        String of random input chars at the input size.
    """
    return ''.join(random.choice(chars) for _ in range(size))


def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
                                     adv_android_device, adv_callback_list):
    """Try to gracefully stop all scanning and advertising instances.

    If stopping fails on either device, Bluetooth is reset on that device
    instead.

    Args:
        scn_android_device: The Android device that is actively scanning.
        scn_callback_list: The scan callback id list that needs to be stopped.
        adv_android_device: The Android device that is actively advertising.
        adv_callback_list: The advertise callback id list that needs to be
            stopped.
    """
    scan_droid = scn_android_device.droid
    adv_droid = adv_android_device.droid
    try:
        for scan_callback in scn_callback_list:
            scan_droid.bleStopBleScan(scan_callback)
    except Exception as err:
        scn_android_device.log.debug(
            "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
                err))
        reset_bluetooth([scn_android_device])
    try:
        for adv_callback in adv_callback_list:
            adv_droid.bleStopBleAdvertising(adv_callback)
    except Exception as err:
        adv_android_device.log.debug(
            "Failed to stop LE advertisement... reseting Bluetooth. Error {}".
            format(err))
        reset_bluetooth([adv_android_device])


def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
    """Start generic advertisement and get it's mac address by LE scanning.

    The advertisement includes the device name; the scanner filters on the
    advertiser's local name. Both the advertisement and the scan are left
    running, so the caller is responsible for stopping them with the returned
    callback ids.

    Args:
        scan_ad: The Android device to use as the scanner.
        adv_ad: The Android device to use as the advertiser.

    Returns:
        mac_address: The mac address of the advertisement.
        advertise_callback: The advertise callback id of the active
            advertisement.
        scan_callback: The scan callback id of the active scan.

    Raises:
        BtTestUtilsError: if the advertisement does not start or the scanner
            does not find it within bt_default_timeout.
    """
    adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True)
    adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel(
        ble_advertise_settings_tx_powers['high'])
    advertise_callback, advertise_data, advertise_settings = (
        generate_ble_advertise_objects(adv_ad.droid))
    adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                        advertise_settings)
    try:
        adv_ad.ed.pop_event(
            adv_succ.format(advertise_callback), bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Advertiser did not start successfully {}".format(err))
    filter_list = scan_ad.droid.bleGenFilterList()
    scan_settings = scan_ad.droid.bleBuildScanSetting()
    scan_callback = scan_ad.droid.bleGenScanCallback()
    scan_ad.droid.bleSetScanFilterDeviceName(
        adv_ad.droid.bluetoothGetLocalName())
    scan_ad.droid.bleBuildScanFilter(filter_list)
    scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    try:
        event = scan_ad.ed.pop_event(
            "BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Scanner did not find advertisement {}".format(err))
    mac_address = event['data']['Result']['deviceInfo']['address']
    return mac_address, advertise_callback, scan_callback
def enable_bluetooth(droid, ed):
    """Enable Bluetooth on input Droid object.

    Args:
        droid: The droid object to enable Bluetooth on.
        ed: The event dispatcher used to wait for the Bluetooth-on event.

    Returns:
        True if Bluetooth is on afterwards, false otherwise.
    """
    if droid.bluetoothCheckState() is True:
        return True

    droid.bluetoothToggleState(True)
    expected_bluetooth_on_event_name = bluetooth_on
    try:
        ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout)
    except Exception:
        log.info("Failed to toggle Bluetooth on (no broadcast received)")
        # The broadcast may have been missed; trust the actual adapter state.
        if droid.bluetoothCheckState() is True:
            log.info(".. actual state is ON")
            return True
        log.info(".. actual state is OFF")
        return False

    return True
def disable_bluetooth(droid):
    """Disable Bluetooth on input Droid object.

    The state is re-read immediately after the toggle request; no state change
    event is awaited.

    Args:
        droid: The droid object to disable Bluetooth on.

    Returns:
        True if successful, false if unsuccessful.
    """
    if droid.bluetoothCheckState() is True:
        droid.bluetoothToggleState(False)
        if droid.bluetoothCheckState() is True:
            log.error("Failed to toggle Bluetooth off.")
            return False
    return True


def set_bt_scan_mode(ad, scan_mode_value):
    """Set Android device's Bluetooth scan mode.

    Args:
        ad: The Android device to set the scan mode on.
        scan_mode_value: The value to set the scan mode to. Must be one of the
            'state_off', 'none', 'connectable' or 'connectable_discoverable'
            values of bt_scan_mode_types.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid = ad.droid
    if scan_mode_value == bt_scan_mode_types['state_off']:
        disable_bluetooth(droid)
        # Read the mode while Bluetooth is off, then restore Bluetooth before
        # reporting the result.
        scan_mode = droid.bluetoothGetScanMode()
        reset_bluetooth([ad])
    elif scan_mode_value == bt_scan_mode_types['none']:
        droid.bluetoothMakeUndiscoverable()
        scan_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable']:
        droid.bluetoothMakeUndiscoverable()
        droid.bluetoothMakeConnectable()
        scan_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable_discoverable']:
        droid.bluetoothMakeDiscoverable()
        scan_mode = droid.bluetoothGetScanMode()
    else:
        # invalid scan mode
        return False
    return scan_mode == scan_mode_value


def set_device_name(droid, name):
    """Set and check Bluetooth local name on input droid object.

    Args:
        droid: Droid object to set local name on.
        name: the Bluetooth local name to set.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid.bluetoothSetLocalName(name)
    # Give the stack time to apply the new name before reading it back.
    time.sleep(2)
    return droid.bluetoothGetLocalName() == name
def check_device_supported_profiles(droid):
    """Checks for Android device supported profiles.

    Args:
        droid: The droid object to query.

    Returns:
        A dictionary mapping profile name ('hid', 'hsp', 'a2dp', 'avrcp',
        'a2dp_sink', 'hfp_client', 'pbap_client') to the value returned by the
        matching droid "IsReady" call.
    """
    return {
        'hid': droid.bluetoothHidIsReady(),
        'hsp': droid.bluetoothHspIsReady(),
        'a2dp': droid.bluetoothA2dpIsReady(),
        'avrcp': droid.bluetoothAvrcpIsReady(),
        'a2dp_sink': droid.bluetoothA2dpSinkIsReady(),
        'hfp_client': droid.bluetoothHfpClientIsReady(),
        'pbap_client': droid.bluetoothPbapClientIsReady(),
    }


def log_energy_info(android_devices, state):
    """Builds the header line for an energy info log entry.

    Per-device energy collection is currently disabled (b/31966929), so
    android_devices is not read and only the header is returned.

    Args:
        android_devices: input Android device list to log energy info from.
        state: the input state to log. Usually 'Start' or 'Stop' for logging.

    Returns:
        The logging header string for the given state.
    """
    return_string = "{} Energy info collection:\n".format(state)
    # Bug: b/31966929
    return return_string
def set_profile_priority(host_ad, client_ad, profiles, priority):
    """Sets the priority of said profile(s) on host_ad for client_ad

    Only the a2dp_sink, headset_client and pbap_client profiles are handled;
    any other profile is logged as unsupported and skipped.

    Args:
        host_ad: Android device on which the priority is set.
        client_ad: Android device whose address the priority applies to.
        profiles: Iterable of profile constants to update.
        priority: Object whose `value` attribute is passed as the priority.
    """
    for profile in profiles:
        # Fetch the peer address once per profile instead of once per use.
        client_address = client_ad.droid.bluetoothGetLocalAddress()
        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
            profile, host_ad.droid.bluetoothGetLocalName(), client_address,
            priority.value))
        if bt_profile_constants['a2dp_sink'] == profile:
            host_ad.droid.bluetoothA2dpSinkSetPriority(client_address,
                                                       priority.value)
        elif bt_profile_constants['headset_client'] == profile:
            host_ad.droid.bluetoothHfpClientSetPriority(client_address,
                                                        priority.value)
        elif bt_profile_constants['pbap_client'] == profile:
            host_ad.droid.bluetoothPbapClientSetPriority(client_address,
                                                         priority.value)
        else:
            host_ad.log.error(
                "Profile {} not yet supported for priority settings".format(
                    profile))


def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
    """Pairs pri droid to secondary droid.

    After every failed attempt the bonds on both devices are cleared before
    the next attempt is made.

    Args:
        pri_ad: Android device initiating connection
        sec_ad: Android device accepting connection
        attempts: Number of attempts to try until failure.
        auto_confirm: Auto confirm passkey match for both devices

    Returns:
        Pass if True
        Fail if False
    """
    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
        sec_ad.droid.bluetoothGetLocalAddress())
    for curr_attempts in range(attempts):
        if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
            return True
        # Wait 2 seconds before unbound
        time.sleep(2)
        if not clear_bonded_devices(pri_ad):
            log.error("Failed to clear bond for primary device at attempt {}"
                      .format(str(curr_attempts)))
            return False
        if not clear_bonded_devices(sec_ad):
            log.error("Failed to clear bond for secondary device at attempt {}"
                      .format(str(curr_attempts)))
            return False
        # Wait 2 seconds after unbound
        time.sleep(2)
    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
        str(attempts)))
    return False
def _wait_for_passkey_match(pri_ad, sec_ad):
    """Waits for a pairing request on both devices and confirms the passkey.

    When both devices report the passkey-confirmation pairing variant, the
    two pins are compared and the result is posted to both devices as the
    user confirmation.

    Args:
        pri_ad: Android device initiating the pairing.
        sec_ad: Android device accepting the pairing.

    Returns:
        False if a pairing request times out, the pairing variants differ, or
        the pins differ; True otherwise.
    """
    # Distinct defaults so a missing request can never look like a match.
    pri_pin, sec_pin = -1, 1
    pri_variant, sec_variant = -1, 1
    pri_pairing_req, sec_pairing_req = None, None
    try:
        pri_pairing_req = pri_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        pri_variant = pri_pairing_req["data"]["PairingVariant"]
        pri_pin = pri_pairing_req["data"]["Pin"]
        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
            pri_pin, pri_variant))
        sec_pairing_req = sec_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        sec_variant = sec_pairing_req["data"]["PairingVariant"]
        sec_pin = sec_pairing_req["data"]["Pin"]
        sec_ad.log.info("Secondary device received Pin: {}, Variant: {}"
                        .format(sec_pin, sec_variant))
    except Empty as err:
        log.error("Wait for pin error: {}".format(err))
        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
            pri_pairing_req, sec_pairing_req))
        return False
    if pri_variant == sec_variant == pairing_variant_passkey_confirmation:
        confirmation = pri_pin == sec_pin
        if confirmation:
            log.info("Pairing code matched, accepting connection")
        else:
            log.info("Pairing code mismatched, rejecting connection")
        # Both devices are told the outcome, including a rejection.
        pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                               str(confirmation))
        sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                               str(confirmation))
        if not confirmation:
            return False
    elif pri_variant != sec_variant:
        log.error("Pairing variant mismatched, abort connection")
        return False
    return True
def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
    """Makes one attempt to bond pri_ad to sec_ad.

    Args:
        pri_ad: Android device initiating the bond.
        sec_ad: Android device accepting the bond.
        auto_confirm: Passed to the pairing helper on both devices; when
            false, the passkey is confirmed via _wait_for_passkey_match.

    Returns:
        True if sec_ad shows up in pri_ad's bonded devices within
        bt_default_timeout seconds, false otherwise.
    """
    # Enable discovery on sec_ad so that pri_ad can find it.
    # The timeout here is based on how much time it would take for two devices
    # to pair with each other once pri_ad starts seeing devices.
    pri_droid = pri_ad.droid
    sec_droid = sec_ad.droid
    pri_ad.ed.clear_all_events()
    sec_ad.ed.clear_all_events()
    target_address = sec_droid.bluetoothGetLocalAddress()
    log.info("Bonding device {} to {}".format(
        pri_droid.bluetoothGetLocalAddress(), target_address))
    sec_droid.bluetoothMakeDiscoverable(bt_default_timeout)
    log.debug("Starting paring helper on each device")
    pri_droid.bluetoothStartPairingHelper(auto_confirm)
    sec_droid.bluetoothStartPairingHelper(auto_confirm)
    pri_ad.log.info("Primary device starting discovery and executing bond")
    pri_droid.bluetoothDiscoverAndBond(target_address)
    if not auto_confirm:
        if not _wait_for_passkey_match(pri_ad, sec_ad):
            return False
    # Loop until we have bonded successfully or timeout.
    end_time = time.time() + bt_default_timeout
    pri_ad.log.info("Verifying devices are bonded")
    while time.time() < end_time:
        bonded_devices = pri_droid.bluetoothGetBondedDevices()
        if any(d['address'] == target_address for d in bonded_devices):
            pri_ad.log.info("Successfully bonded to device")
            return True
        time.sleep(0.1)
    # Timed out trying to bond.
    pri_ad.log.info("Failed to bond devices.")
    return False
def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
    """Connects pri droid to secondary droid.

    Args:
        pri_ad: AndroidDroid initiating connection
        sec_ad: AndroidDroid accepting connection
        profiles_set: Set of profiles to be connected
        attempts: Number of attempts to try until failure.

    Returns:
        Pass if True
        Fail if False
    """
    # Allows extra time for the SDP records to be updated.
    time.sleep(2)
    for curr_attempts in range(attempts):
        log.info("connect_pri_to_sec curr attempt {} total {}".format(
            curr_attempts, attempts))
        if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
            return True
    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
        attempts))
    return False


def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
    """Connects pri droid to secondary droid.

    The connection state is first polled through the profile APIs for up to
    10 seconds. Profiles still unconfirmed after that are awaited through
    profile connection state change events.

    Args:
        pri_ad: AndroidDroid initiating connection.
        sec_ad: AndroidDroid accepting connection.
        profiles_set: Set of profiles to be connected.

    Returns:
        True of connection is successful, false if unsuccessful.
    """
    # Check if we support all profiles.
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_set:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not supported list {}".format(
                profile, supported_profiles))
            return False

    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()

    # First check that devices are bonded.
    paired = False
    for paired_device in pri_ad.droid.bluetoothGetBondedDevices():
        if paired_device['address'] == sec_addr:
            paired = True
            break

    if not paired:
        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
        return False

    # Now try to connect them, the following call will try to initiate all
    # connections.
    pri_ad.droid.bluetoothConnectBonded(sec_addr)

    end_time = time.time() + 10
    profile_connected = set()
    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))
    # First use APIs to check profile connection state
    while (time.time() < end_time
           and not profile_connected.issuperset(profiles_set)):
        if (bt_profile_constants['headset_client'] not in profile_connected
                and bt_profile_constants['headset_client'] in profiles_set):
            if is_hfp_client_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['headset_client'])
        if (bt_profile_constants['a2dp'] not in profile_connected
                and bt_profile_constants['a2dp'] in profiles_set):
            if is_a2dp_src_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['a2dp'])
        if (bt_profile_constants['a2dp_sink'] not in profile_connected
                and bt_profile_constants['a2dp_sink'] in profiles_set):
            if is_a2dp_snk_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['a2dp_sink'])
        if (bt_profile_constants['map_mce'] not in profile_connected
                and bt_profile_constants['map_mce'] in profiles_set):
            if is_map_mce_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['map_mce'])
        if (bt_profile_constants['map'] not in profile_connected
                and bt_profile_constants['map'] in profiles_set):
            if is_map_mse_device_connected(pri_ad, sec_addr):
                profile_connected.add(bt_profile_constants['map'])
        time.sleep(0.1)
    # If APIs fail, try to find the connection broadcast receiver.
    while not profile_connected.issuperset(profiles_set):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed,
                bt_default_timeout + 10)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception:
            # Report the profiles that are still missing, not the ones that
            # already connected.
            pri_ad.log.error("Did not get {} profiles left {}".format(
                bluetooth_profile_connection_state_changed,
                set(profiles_set) - profile_connected))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']

        if state == bt_profile_states['connected'] and \
                device_addr == sec_addr:
            profile_connected.add(profile)
        pri_ad.log.info(
            "Profiles connected until now {}".format(profile_connected))
    # Failure happens inside the while loop. If we came here then we already
    # connected.
    return True
def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
    """Disconnect the primary device from the secondary on given profiles.

    Args:
        pri_ad: Primary android_device initiating the disconnection.
        sec_ad: Secondary android_device; it is only queried for its local
            Bluetooth address.
        profiles_list: List of profiles to disconnect from.

    Returns:
        True once a 'disconnected' event from the secondary device has been
        received for every requested profile. False if a profile is not
        supported, the disconnect request raises, or waiting for an event
        fails.
    """
    # Refuse the whole request when any profile is unknown.
    known_profiles = bt_profile_constants.values()
    for requested in profiles_list:
        if requested in known_profiles:
            continue
        pri_ad.log.info("Profile {} is not in supported list {}".format(
            requested, known_profiles))
        return False

    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
    # Disconnecting an already disconnected profile is a no-op, so the
    # current connection state is not checked first.
    try:
        pri_ad.droid.bluetoothDisconnectConnectedProfile(
            sec_ad.droid.bluetoothGetLocalAddress(), profiles_list)
    except Exception as err:
        pri_ad.log.error(
            "Exception while trying to disconnect profile(s) {}: {}".format(
                profiles_list, err))
        return False

    done = set()
    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))

    # Consume state change events until every requested profile has been
    # reported as disconnected from the secondary device.
    while not done.issuperset(profiles_list):
        try:
            event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed, bt_default_timeout)
            pri_ad.log.info("Got event {}".format(event))
        except Exception as e:
            pri_ad.log.error(
                "Did not disconnect from Profiles. Reason {}".format(e))
            return False

        event_profile = event['data']['profile']
        event_state = event['data']['state']
        event_addr = event['data']['addr']

        is_disconnect = event_state == bt_profile_states['disconnected']
        if is_disconnect and \
                event_addr == sec_ad.droid.bluetoothGetLocalAddress():
            done.add(event_profile)
        pri_ad.log.info("Profiles disconnected so far {}".format(done))

    return True
def take_btsnoop_logs(android_devices, testcase, testname):
    """Pull btsnoop logs from every device in a list.

    Args:
        android_devices: the Android devices to pull btsnoop logs from.
        testcase: passed through unchanged to take_btsnoop_log.
        testname: Name of the test case that triggered this snoop log.
    """
    for device in android_devices:
        take_btsnoop_log(device, testcase, testname)


def take_btsnoop_log(ad, testcase, testname):
    """Pull the btsnoop_hci logs of a device into its log directory.

    The current snoop log and the "last" snoop log are pulled over adb into
    a BluetoothSnoopLogs directory under the device's log path. A failure to
    pull the "last" log is only reported, not raised.

    Args:
        ad: The android_device instance to pull the snoop logs from.
        testcase: Object whose ``log`` is used to report that the "last"
            snoop log could not be pulled.
        testname: Name of the test case that triggered this snoop log; only
            its alphanumeric characters are used in the output file name.
    """
    safe_testname = "".join(ch for ch in testname if ch.isalnum())
    serial = ad.serial
    model = ad.droid.getBuildModel().replace(" ", "")
    out_name = ','.join((safe_testname, model, serial))
    snoop_path = ad.log_path + "/BluetoothSnoopLogs"
    utils.create_dir(snoop_path)

    # adb -s <serial> pull <path on device> <snoop dir>/<out name><suffix>
    pull_template = "adb -s {} pull {} {}/{}{}"
    exe_cmd(
        pull_template.format(serial, btsnoop_log_path_on_device, snoop_path,
                             out_name, ".btsnoop_hci.log"))
    try:
        exe_cmd(
            pull_template.format(serial, btsnoop_last_log_path_on_device,
                                 snoop_path, out_name,
                                 ".btsnoop_hci.log.last"))
    except Exception:
        testcase.log.info(
            "File does not exist {}".format(btsnoop_last_log_path_on_device))
def kill_bluetooth_process(ad):
    """Kill the Bluetooth process on an Android device.

    Args:
        ad: Android device to kill BT process on.
    """
    ad.log.info("Killing Bluetooth process.")
    ps_output = ad.adb.shell(
        "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
    # The pipeline prints one PID per matching line. Splitting drops the
    # trailing newline and keeps several PIDs as separate arguments.
    pids = ps_output.split()
    if not pids:
        ad.log.info("No Bluetooth process found to kill.")
        return
    # Argument list without a shell: the device output is never interpreted
    # by the host shell.
    call(["adb", "-s", ad.serial, "shell", "kill"] + pids)


def orchestrate_rfcomm_connection(client_ad,
                                  server_ad,
                                  accept_timeout_ms=default_rfcomm_timeout_ms,
                                  uuid=None):
    """Sets up the RFCOMM connection between two Android devices.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: accept timeout handed on to
            orchestrate_bluetooth_socket_connection.
        uuid: UUID to connect on; bt_rfcomm_uuids['default_uuid'] is used
            when None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    if uuid is None:
        uuid = bt_rfcomm_uuids['default_uuid']
    return orchestrate_bluetooth_socket_connection(
        client_ad, server_ad, accept_timeout_ms, uuid)
def orchestrate_bluetooth_socket_connection(
        client_ad,
        server_ad,
        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
        uuid=None):
    """Sets up the Bluetooth Socket connection between two Android devices.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: timeout passed to the server's accept thread.
        uuid: UUID used by both sides; bluetooth_socket_conn_test_uuid is
            used when None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    server_ad.droid.bluetoothStartPairingHelper()
    client_ad.droid.bluetoothStartPairingHelper()

    conn_uuid = bluetooth_socket_conn_test_uuid if uuid is None else uuid
    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
        conn_uuid, accept_timeout_ms)
    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
        server_ad.droid.bluetoothGetLocalAddress(), conn_uuid)

    # Poll the client once per second until it reports an active connection.
    # Success is only reported from inside the loop, so a loop that never
    # runs cannot be mistaken for an established connection.
    end_time = time.time() + bt_default_timeout
    while time.time() < end_time:
        if client_ad.droid.bluetoothSocketConnActiveConnections():
            client_ad.log.info("Bluetooth socket Client Connection Active")
            return True
        time.sleep(1)
    client_ad.log.error("Failed to establish a Bluetooth socket connection")
    return False


def write_read_verify_data(client_ad, server_ad, msg, binary=False):
    """Verify that the client wrote data to the server Android device correctly.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: if the msg arg is binary or not.

    Returns:
        True if the data written matches the data read, false if not or if
        the write or the read raised.
    """
    client_ad.log.info("Write message.")
    try:
        if binary:
            client_ad.droid.bluetoothSocketConnWriteBinary(msg)
        else:
            client_ad.droid.bluetoothSocketConnWrite(msg)
    except Exception as err:
        client_ad.log.error("Failed to write data: {}".format(err))
        return False

    server_ad.log.info("Read message.")
    try:
        if binary:
            # Trailing carriage returns / newlines are stripped from the
            # binary read before comparing.
            read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip(
                "\r\n")
        else:
            read_msg = server_ad.droid.bluetoothSocketConnRead()
    except Exception as err:
        server_ad.log.error("Failed to read data: {}".format(err))
        return False

    log.info("Verify message.")
    if msg != read_msg:
        log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
        return False
    return True
def clear_bonded_devices(ad):
    """Unbond every device currently bonded to the input Android device.

    Args:
        ad: the Android device whose bonded devices are removed.
    Returns:
        True if every bonded device was unbonded, False as soon as one
        unbond request fails.
    """
    for bonded in ad.droid.bluetoothGetBondedDevices():
        address = bonded['address']
        if ad.droid.bluetoothUnbond(address):
            log.info("Successfully unbonded {}".format(address))
            continue
        log.error("Failed to unbond {}".format(address))
        return False
    return True


def verify_server_and_client_connected(client_ad, server_ad, log=True):
    """Check that both server and client have an active socket connection.

    This code is under the assumption that there will only be
    a single connection.

    Args:
        client_ad: the Android device to check number of active connections.
        server_ad: the Android device to check number of active connections.
        log: whether a missing connection is reported as an error on the
            affected device's logger.

    Returns:
        True if both server and client have at least 1 active connection,
        False otherwise.
    """
    # The server is always checked (and reported) before the client.
    server_connections = server_ad.droid.bluetoothSocketConnActiveConnections()
    server_ok = len(server_connections) != 0
    if log and not server_ok:
        server_ad.log.error("No socket connections found on server.")

    client_connections = client_ad.droid.bluetoothSocketConnActiveConnections()
    client_ok = len(client_connections) != 0
    if log and not client_ok:
        client_ad.log.error("No socket connections found on client.")

    return server_ok and client_ok
def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
    """Sets up a PAN connection between two android devices and verifies it.

    Args:
        pan_dut: the Android device providing tethering services
        panu_dut: the Android device using the internet connection from the
            pan_dut
    Returns:
        True if PAN connection and verification is successful,
        false if unsuccessful.
    """
    # Cycle airplane mode on the PANU device: on, then off again.
    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
        panu_dut.log.error("Failed to toggle airplane mode on")
        return False
    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
        # The toggle ran on the PANU device, so report it on that device.
        panu_dut.log.error("Failed to toggle airplane mode off")
        return False
    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    if not bluetooth_enabled_check(panu_dut):
        return False
    if not bluetooth_enabled_check(pan_dut):
        return False
    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
    if not pair_pri_to_sec(pan_dut, panu_dut):
        return False
    if not pan_dut.droid.bluetoothPanIsTetheringOn():
        pan_dut.log.error("Failed to enable Bluetooth tethering.")
        return False
    # Magic sleep needed to give the stack time in between bonding and
    # connecting the PAN profile.
    time.sleep(pan_connect_timeout)
    panu_dut.droid.bluetoothConnectBonded(
        pan_dut.droid.bluetoothGetLocalAddress())
    if not verify_http_connection(log, panu_dut):
        panu_dut.log.error("Can't verify http connection on PANU device.")
        # Extra diagnostic only: also check the PAN side. The result does
        # not change the outcome, which is a failure either way.
        if not verify_http_connection(log, pan_dut):
            pan_dut.log.info(
                "Can't verify http connection on PAN service device")
        return False
    return True
def is_hfp_client_device_connected(ad, addr):
    """Report whether the device has an HFP client connection to an address.

    Args:
        ad: the Android device to query
        addr: the address expected among the connected devices
    Returns:
        True if addr is among the connected HFP client devices, False if not.
    """
    hfp_devices = ad.droid.bluetoothHfpClientGetConnectedDevices()
    ad.log.info("Connected HFP Client devices: {}".format(hfp_devices))
    connected_addresses = {entry['address'] for entry in hfp_devices}
    return addr in connected_addresses


def is_a2dp_src_device_connected(ad, addr):
    """Report whether the device has an A2DP connection to an address.

    Args:
        ad: the Android device to query
        addr: the address expected among the connected devices
    Returns:
        True if addr is among the connected A2DP devices, False if not.
    """
    a2dp_devices = ad.droid.bluetoothA2dpGetConnectedDevices()
    ad.log.info("Connected A2DP Source devices: {}".format(a2dp_devices))
    connected_addresses = {entry['address'] for entry in a2dp_devices}
    return addr in connected_addresses


def is_a2dp_snk_device_connected(ad, addr):
    """Report whether the device has an A2DP sink connection to an address.

    Args:
        ad: the Android device to query
        addr: the address expected among the connected devices
    Returns:
        True if addr is among the connected A2DP sink devices, False if not.
    """
    sink_devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
    ad.log.info("Connected A2DP Sink devices: {}".format(sink_devices))
    connected_addresses = {entry['address'] for entry in sink_devices}
    return addr in connected_addresses
def is_map_mce_device_connected(ad, addr):
    """Report whether the device has a MAP MCE connection to an address.

    Args:
        ad: the Android device to query
        addr: the address expected among the connected devices
    Returns:
        True if addr is among the connected MAP client devices, False if not.
    """
    mce_devices = ad.droid.bluetoothMapClientGetConnectedDevices()
    ad.log.info("Connected MAP MCE devices: {}".format(mce_devices))
    connected_addresses = {entry['address'] for entry in mce_devices}
    return addr in connected_addresses


def is_map_mse_device_connected(ad, addr):
    """Report whether the device has a MAP MSE connection to an address.

    Args:
        ad: the Android device to query
        addr: the address expected among the connected devices
    Returns:
        True if addr is among the connected MAP devices, False if not.
    """
    mse_devices = ad.droid.bluetoothMapGetConnectedDevices()
    ad.log.info("Connected MAP MSE devices: {}".format(mse_devices))
    connected_addresses = {entry['address'] for entry in mse_devices}
    return addr in connected_addresses


def hid_keyboard_report(key, modifier="00"):
    """Build the HID keyboard report for the given key.

    The report is the eight bytes: modifier, 00, key, then five 00 bytes.

    Args:
        key: the key we want, as a two-digit hex string
        modifier: HID keyboard modifier byte, as a two-digit hex string
    Returns:
        The report bytes decoded as a UTF-8 string.
    """
    hex_fields = [modifier, "00", key] + ["00"] * 5
    report = bytearray.fromhex(" ".join(hex_fields))
    return report.decode("utf-8")
def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
    """Send a key press followed by a key release as HID keyboard reports.

    device_ad sends the report for the key to the host, waits for the given
    interval, then sends the report for key "00".

    Args:
        host_id: the Bluetooth MAC address or name of the HID host
        device_ad: HID device sending the reports
        key: the key we want to send
        interval: seconds to wait between key press and key release
    """
    press_report = hid_keyboard_report(key)
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 press_report)
    time.sleep(interval)
    release_report = hid_keyboard_report("00")
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 release_report)


def is_a2dp_connected(sink, source):
    """Check whether two devices are connected over A2DP.

    Args:
        sink: Audio Sink
        source: Audio Source
    Returns:
        True if the source's local address is among the sink's connected
        A2DP sink devices, False otherwise.
    """
    for connected in sink.droid.bluetoothA2dpSinkGetConnectedDevices():
        sink.log.info("A2dp Connected device {}".format(connected["name"]))
        source_addr = source.droid.bluetoothGetLocalAddress()
        if connected["address"] == source_addr:
            return True
    return False