1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18import os 19import random 20import re 21import string 22import threading 23import time 24from queue import Empty 25from subprocess import call 26 27from acts import asserts 28from acts.test_utils.bt.bt_constants import adv_fail 29from acts.test_utils.bt.bt_constants import adv_succ 30from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list 31from acts.test_utils.bt.bt_constants import bits_per_samples 32from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes 33from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers 34from acts.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed 35from acts.test_utils.bt.bt_constants import bluetooth_off 36from acts.test_utils.bt.bt_constants import bluetooth_on 37from acts.test_utils.bt.bt_constants import \ 38 bluetooth_profile_connection_state_changed 39from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid 40from acts.test_utils.bt.bt_constants import bt_default_timeout 41from acts.test_utils.bt.bt_constants import bt_profile_constants 42from acts.test_utils.bt.bt_constants import bt_profile_states 43from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids 44from acts.test_utils.bt.bt_constants import bt_scan_mode_types 45from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device 46from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device 47from acts.test_utils.bt.bt_constants import channel_modes 48from acts.test_utils.bt.bt_constants import codec_types 49from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms 50from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms 51from acts.test_utils.bt.bt_constants import hid_id_keyboard 52from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation 53from acts.test_utils.bt.bt_constants import pan_connect_timeout 54from acts.test_utils.bt.bt_constants import sample_rates 55from acts.test_utils.bt.bt_constants import scan_result 56from acts.test_utils.bt.bt_constants import sig_uuid_constants 57from acts.test_utils.bt.bt_constants import small_timeout 58from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb 59from acts.test_utils.tel.tel_test_utils import verify_http_connection 60from acts.utils import exe_cmd 61 62from acts import utils 63 64log = logging 65 66advertisements_to_devices = {} 67 68 69class BtTestUtilsError(Exception): 70 pass 71 72 73def _add_android_device_to_dictionary(android_device, profile_list, 74 selector_dict): 75 """Adds the AndroidDevice and supported features to the selector dictionary 76 77 Args: 78 android_device: The Android device. 79 profile_list: The list of profiles the Android device supports. 80 """ 81 for profile in profile_list: 82 if profile in selector_dict and android_device not in selector_dict[ 83 profile]: 84 selector_dict[profile].append(android_device) 85 else: 86 selector_dict[profile] = [android_device] 87 88 89def bluetooth_enabled_check(ad, timeout_sec=5): 90 """Checks if the Bluetooth state is enabled, if not it will attempt to 91 enable it. 92 93 Args: 94 ad: The Android device list to enable Bluetooth on. 95 timeout_sec: number of seconds to wait for toggle to take effect. 96 97 Returns: 98 True if successful, false if unsuccessful. 99 """ 100 if not ad.droid.bluetoothCheckState(): 101 ad.droid.bluetoothToggleState(True) 102 expected_bluetooth_on_event_name = bluetooth_on 103 try: 104 ad.ed.pop_event(expected_bluetooth_on_event_name, 105 bt_default_timeout) 106 except Empty: 107 ad.log.info( 108 "Failed to toggle Bluetooth on(no broadcast received).") 109 # Try one more time to poke at the actual state. 110 if ad.droid.bluetoothCheckState(): 111 ad.log.info(".. actual state is ON") 112 return True 113 ad.log.error(".. actual state is OFF") 114 return False 115 end_time = time.time() + timeout_sec 116 while not ad.droid.bluetoothCheckState() and time.time() < end_time: 117 time.sleep(1) 118 return ad.droid.bluetoothCheckState() 119 120 121def check_device_supported_profiles(droid): 122 """Checks for Android device supported profiles. 123 124 Args: 125 droid: The droid object to query. 126 127 Returns: 128 A dictionary of supported profiles. 129 """ 130 profile_dict = {} 131 profile_dict['hid'] = droid.bluetoothHidIsReady() 132 profile_dict['hsp'] = droid.bluetoothHspIsReady() 133 profile_dict['a2dp'] = droid.bluetoothA2dpIsReady() 134 profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady() 135 profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady() 136 profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady() 137 profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady() 138 return profile_dict 139 140 141def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list, 142 adv_android_device, adv_callback_list): 143 """Try to gracefully stop all scanning and advertising instances. 144 145 Args: 146 scn_android_device: The Android device that is actively scanning. 147 scn_callback_list: The scan callback id list that needs to be stopped. 148 adv_android_device: The Android device that is actively advertising. 149 adv_callback_list: The advertise callback id list that needs to be 150 stopped. 151 """ 152 scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed 153 adv_droid = adv_android_device.droid 154 try: 155 for scan_callback in scn_callback_list: 156 scan_droid.bleStopBleScan(scan_callback) 157 except Exception as err: 158 scn_android_device.log.debug( 159 "Failed to stop LE scan... reseting Bluetooth. Error {}".format( 160 err)) 161 reset_bluetooth([scn_android_device]) 162 try: 163 for adv_callback in adv_callback_list: 164 adv_droid.bleStopBleAdvertising(adv_callback) 165 except Exception as err: 166 adv_android_device.log.debug( 167 "Failed to stop LE advertisement... reseting Bluetooth. Error {}". 168 format(err)) 169 reset_bluetooth([adv_android_device]) 170 171 172def clear_bonded_devices(ad): 173 """Clear bonded devices from the input Android device. 174 175 Args: 176 ad: the Android device performing the connection. 177 Returns: 178 True if clearing bonded devices was successful, false if unsuccessful. 179 """ 180 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 181 while bonded_device_list: 182 device_address = bonded_device_list[0]['address'] 183 if not ad.droid.bluetoothUnbond(device_address): 184 log.error("Failed to unbond {} from {}".format( 185 device_address, ad.serial)) 186 return False 187 log.info("Successfully unbonded {} from {}".format( 188 device_address, ad.serial)) 189 #TODO: wait for BOND_STATE_CHANGED intent instead of waiting 190 time.sleep(1) 191 192 # If device was first connected using LE transport, after bonding it is 193 # accessible through it's LE address, and through it classic address. 194 # Unbonding it will unbond two devices representing different 195 # "addresses". Attempt to unbond such already unbonded devices will 196 # result in bluetoothUnbond returning false. 197 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 198 return True 199 200 201def connect_phone_to_headset(android, 202 headset, 203 timeout=bt_default_timeout, 204 connection_check_period=10): 205 """Connects android phone to bluetooth headset. 206 Headset object must have methods power_on and enter_pairing_mode, 207 and attribute mac_address. 208 209 Args: 210 android: AndroidDevice object with SL4A installed. 211 headset: Object with attribute mac_address and methods power_on and 212 enter_pairing_mode. 213 timeout: Seconds to wait for devices to connect. 214 connection_check_period: how often to check for connection once the 215 SL4A connect RPC has been sent. 216 Returns: 217 connected (bool): True if devices are paired and connected by end of 218 method. False otherwise. 219 """ 220 headset_mac_address = headset.mac_address 221 connected = is_a2dp_src_device_connected(android, headset_mac_address) 222 log.info('Devices connected before pair attempt: %s' % connected) 223 if not connected: 224 # Turn on headset and initiate pairing mode. 225 headset.enter_pairing_mode() 226 android.droid.bluetoothStartPairingHelper() 227 start_time = time.time() 228 # If already connected, skip pair and connect attempt. 229 while not connected and (time.time() - start_time < timeout): 230 bonded_info = android.droid.bluetoothGetBondedDevices() 231 if headset.mac_address not in [ 232 info["address"] for info in bonded_info 233 ]: 234 # Use SL4A to pair and connect with headset. 235 headset.enter_pairing_mode() 236 android.droid.bluetoothDiscoverAndBond(headset_mac_address) 237 else: # Device is bonded but not connected 238 android.droid.bluetoothConnectBonded(headset_mac_address) 239 log.info('Waiting for connection...') 240 time.sleep(connection_check_period) 241 # Check for connection. 242 connected = is_a2dp_src_device_connected(android, headset_mac_address) 243 log.info('Devices connected after pair attempt: %s' % connected) 244 return connected 245 246 247def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2): 248 """Connects pri droid to secondary droid. 249 250 Args: 251 pri_ad: AndroidDroid initiating connection 252 sec_ad: AndroidDroid accepting connection 253 profiles_set: Set of profiles to be connected 254 attempts: Number of attempts to try until failure. 255 256 Returns: 257 Pass if True 258 Fail if False 259 """ 260 device_addr = sec_ad.droid.bluetoothGetLocalAddress() 261 # Allows extra time for the SDP records to be updated. 262 time.sleep(2) 263 curr_attempts = 0 264 while curr_attempts < attempts: 265 log.info("connect_pri_to_sec curr attempt {} total {}".format( 266 curr_attempts, attempts)) 267 if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 268 return True 269 curr_attempts += 1 270 log.error("connect_pri_to_sec failed to connect after {} attempts".format( 271 attempts)) 272 return False 273 274 275def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 276 """Connects pri droid to secondary droid. 277 278 Args: 279 pri_ad: AndroidDroid initiating connection. 280 sec_ad: AndroidDroid accepting connection. 281 profiles_set: Set of profiles to be connected. 282 283 Returns: 284 True of connection is successful, false if unsuccessful. 285 """ 286 # Check if we support all profiles. 287 supported_profiles = bt_profile_constants.values() 288 for profile in profiles_set: 289 if profile not in supported_profiles: 290 pri_ad.log.info("Profile {} is not supported list {}".format( 291 profile, supported_profiles)) 292 return False 293 294 # First check that devices are bonded. 295 paired = False 296 for paired_device in pri_ad.droid.bluetoothGetBondedDevices(): 297 if paired_device['address'] == \ 298 sec_ad.droid.bluetoothGetLocalAddress(): 299 paired = True 300 break 301 302 if not paired: 303 pri_ad.log.error("Not paired to {}".format(sec_ad.serial)) 304 return False 305 306 # Now try to connect them, the following call will try to initiate all 307 # connections. 308 pri_ad.droid.bluetoothConnectBonded( 309 sec_ad.droid.bluetoothGetLocalAddress()) 310 311 end_time = time.time() + 10 312 profile_connected = set() 313 sec_addr = sec_ad.droid.bluetoothGetLocalAddress() 314 pri_ad.log.info("Profiles to be connected {}".format(profiles_set)) 315 # First use APIs to check profile connection state 316 while (time.time() < end_time 317 and not profile_connected.issuperset(profiles_set)): 318 if (bt_profile_constants['headset_client'] not in profile_connected 319 and bt_profile_constants['headset_client'] in profiles_set): 320 if is_hfp_client_device_connected(pri_ad, sec_addr): 321 profile_connected.add(bt_profile_constants['headset_client']) 322 if (bt_profile_constants['a2dp'] not in profile_connected 323 and bt_profile_constants['a2dp'] in profiles_set): 324 if is_a2dp_src_device_connected(pri_ad, sec_addr): 325 profile_connected.add(bt_profile_constants['a2dp']) 326 if (bt_profile_constants['a2dp_sink'] not in profile_connected 327 and bt_profile_constants['a2dp_sink'] in profiles_set): 328 if is_a2dp_snk_device_connected(pri_ad, sec_addr): 329 profile_connected.add(bt_profile_constants['a2dp_sink']) 330 if (bt_profile_constants['map_mce'] not in profile_connected 331 and bt_profile_constants['map_mce'] in profiles_set): 332 if is_map_mce_device_connected(pri_ad, sec_addr): 333 profile_connected.add(bt_profile_constants['map_mce']) 334 if (bt_profile_constants['map'] not in profile_connected 335 and bt_profile_constants['map'] in profiles_set): 336 if is_map_mse_device_connected(pri_ad, sec_addr): 337 profile_connected.add(bt_profile_constants['map']) 338 time.sleep(0.1) 339 # If APIs fail, try to find the connection broadcast receiver. 340 while not profile_connected.issuperset(profiles_set): 341 try: 342 profile_event = pri_ad.ed.pop_event( 343 bluetooth_profile_connection_state_changed, 344 bt_default_timeout + 10) 345 pri_ad.log.info("Got event {}".format(profile_event)) 346 except Exception: 347 pri_ad.log.error("Did not get {} profiles left {}".format( 348 bluetooth_profile_connection_state_changed, profile_connected)) 349 return False 350 351 profile = profile_event['data']['profile'] 352 state = profile_event['data']['state'] 353 device_addr = profile_event['data']['addr'] 354 if state == bt_profile_states['connected'] and \ 355 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 356 profile_connected.add(profile) 357 pri_ad.log.info( 358 "Profiles connected until now {}".format(profile_connected)) 359 # Failure happens inside the while loop. If we came here then we already 360 # connected. 361 return True 362 363 364def determine_max_advertisements(android_device): 365 """Determines programatically how many advertisements the Android device 366 supports. 367 368 Args: 369 android_device: The Android device to determine max advertisements of. 370 371 Returns: 372 The maximum advertisement count. 373 """ 374 android_device.log.info( 375 "Determining number of maximum concurrent advertisements...") 376 advertisement_count = 0 377 bt_enabled = False 378 expected_bluetooth_on_event_name = bluetooth_on 379 if not android_device.droid.bluetoothCheckState(): 380 android_device.droid.bluetoothToggleState(True) 381 try: 382 android_device.ed.pop_event(expected_bluetooth_on_event_name, 383 bt_default_timeout) 384 except Exception: 385 android_device.log.info( 386 "Failed to toggle Bluetooth on(no broadcast received).") 387 # Try one more time to poke at the actual state. 388 if android_device.droid.bluetoothCheckState() is True: 389 android_device.log.info(".. actual state is ON") 390 else: 391 android_device.log.error( 392 "Failed to turn Bluetooth on. Setting default advertisements to 1" 393 ) 394 advertisement_count = -1 395 return advertisement_count 396 advertise_callback_list = [] 397 advertise_data = android_device.droid.bleBuildAdvertiseData() 398 advertise_settings = android_device.droid.bleBuildAdvertiseSettings() 399 while (True): 400 advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() 401 advertise_callback_list.append(advertise_callback) 402 403 android_device.droid.bleStartBleAdvertising(advertise_callback, 404 advertise_data, 405 advertise_settings) 406 407 regex = "(" + adv_succ.format( 408 advertise_callback) + "|" + adv_fail.format( 409 advertise_callback) + ")" 410 # wait for either success or failure event 411 evt = android_device.ed.pop_events(regex, bt_default_timeout, 412 small_timeout) 413 if evt[0]["name"] == adv_succ.format(advertise_callback): 414 advertisement_count += 1 415 android_device.log.info( 416 "Advertisement {} started.".format(advertisement_count)) 417 else: 418 error = evt[0]["data"]["Error"] 419 if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS": 420 android_device.log.info( 421 "Advertisement failed to start. Reached max " + 422 "advertisements at {}".format(advertisement_count)) 423 break 424 else: 425 raise BtTestUtilsError( 426 "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," + 427 " but received bad error code {}".format(error)) 428 try: 429 for adv in advertise_callback_list: 430 android_device.droid.bleStopBleAdvertising(adv) 431 except Exception: 432 android_device.log.error( 433 "Failed to stop advertisingment, resetting Bluetooth.") 434 reset_bluetooth([android_device]) 435 return advertisement_count 436 437 438def disable_bluetooth(droid): 439 """Disable Bluetooth on input Droid object. 440 441 Args: 442 droid: The droid object to disable Bluetooth on. 443 444 Returns: 445 True if successful, false if unsuccessful. 446 """ 447 if droid.bluetoothCheckState() is True: 448 droid.bluetoothToggleState(False) 449 if droid.bluetoothCheckState() is True: 450 log.error("Failed to toggle Bluetooth off.") 451 return False 452 return True 453 454 455def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): 456 """ 457 Disconnect primary from secondary on a specific set of profiles 458 Args: 459 pri_ad - Primary android_device initiating disconnection 460 sec_ad - Secondary android droid (sl4a interface to keep the 461 method signature the same connect_pri_to_sec above) 462 profiles_list - List of profiles we want to disconnect from 463 464 Returns: 465 True on Success 466 False on Failure 467 """ 468 # Sanity check to see if all the profiles in the given set is supported 469 supported_profiles = bt_profile_constants.values() 470 for profile in profiles_list: 471 if profile not in supported_profiles: 472 pri_ad.log.info("Profile {} is not in supported list {}".format( 473 profile, supported_profiles)) 474 return False 475 476 pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices()) 477 # Disconnecting on a already disconnected profile is a nop, 478 # so not checking for the connection state 479 try: 480 pri_ad.droid.bluetoothDisconnectConnectedProfile( 481 sec_ad.droid.bluetoothGetLocalAddress(), profiles_list) 482 except Exception as err: 483 pri_ad.log.error( 484 "Exception while trying to disconnect profile(s) {}: {}".format( 485 profiles_list, err)) 486 return False 487 488 profile_disconnected = set() 489 pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list)) 490 491 while not profile_disconnected.issuperset(profiles_list): 492 try: 493 profile_event = pri_ad.ed.pop_event( 494 bluetooth_profile_connection_state_changed, bt_default_timeout) 495 pri_ad.log.info("Got event {}".format(profile_event)) 496 except Exception as e: 497 pri_ad.log.error( 498 "Did not disconnect from Profiles. Reason {}".format(e)) 499 return False 500 501 profile = profile_event['data']['profile'] 502 state = profile_event['data']['state'] 503 device_addr = profile_event['data']['addr'] 504 505 if state == bt_profile_states['disconnected'] and \ 506 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 507 profile_disconnected.add(profile) 508 pri_ad.log.info( 509 "Profiles disconnected so far {}".format(profile_disconnected)) 510 511 return True 512 513 514def enable_bluetooth(droid, ed): 515 if droid.bluetoothCheckState() is True: 516 return True 517 518 droid.bluetoothToggleState(True) 519 expected_bluetooth_on_event_name = bluetooth_on 520 try: 521 ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout) 522 except Exception: 523 log.info("Failed to toggle Bluetooth on (no broadcast received)") 524 if droid.bluetoothCheckState() is True: 525 log.info(".. actual state is ON") 526 return True 527 log.info(".. actual state is OFF") 528 return False 529 530 return True 531 532 533def factory_reset_bluetooth(android_devices): 534 """Clears Bluetooth stack of input Android device list. 535 536 Args: 537 android_devices: The Android device list to reset Bluetooth 538 539 Returns: 540 True if successful, false if unsuccessful. 541 """ 542 for a in android_devices: 543 droid, ed = a.droid, a.ed 544 a.log.info("Reset state of bluetooth on device.") 545 if not bluetooth_enabled_check(a): 546 return False 547 # TODO: remove device unbond b/79418045 548 # Temporary solution to ensure all devices are unbonded 549 bonded_devices = droid.bluetoothGetBondedDevices() 550 for b in bonded_devices: 551 a.log.info("Removing bond for device {}".format(b['address'])) 552 droid.bluetoothUnbond(b['address']) 553 554 droid.bluetoothFactoryReset() 555 wait_for_bluetooth_manager_state(droid) 556 if not enable_bluetooth(droid, ed): 557 return False 558 return True 559 560 561def generate_ble_advertise_objects(droid): 562 """Generate generic LE advertise objects. 563 564 Args: 565 droid: The droid object to generate advertise LE objects from. 566 567 Returns: 568 advertise_callback: The generated advertise callback id. 569 advertise_data: The generated advertise data id. 570 advertise_settings: The generated advertise settings id. 571 """ 572 advertise_callback = droid.bleGenBleAdvertiseCallback() 573 advertise_data = droid.bleBuildAdvertiseData() 574 advertise_settings = droid.bleBuildAdvertiseSettings() 575 return advertise_callback, advertise_data, advertise_settings 576 577 578def generate_ble_scan_objects(droid): 579 """Generate generic LE scan objects. 580 581 Args: 582 droid: The droid object to generate LE scan objects from. 583 584 Returns: 585 filter_list: The generated scan filter list id. 586 scan_settings: The generated scan settings id. 587 scan_callback: The generated scan callback id. 588 """ 589 filter_list = droid.bleGenFilterList() 590 scan_settings = droid.bleBuildScanSetting() 591 scan_callback = droid.bleGenScanCallback() 592 return filter_list, scan_settings, scan_callback 593 594 595def generate_id_by_size(size, 596 chars=(string.ascii_lowercase + 597 string.ascii_uppercase + string.digits)): 598 """Generate random ascii characters of input size and input char types 599 600 Args: 601 size: Input size of string. 602 chars: (Optional) Chars to use in generating a random string. 603 604 Returns: 605 String of random input chars at the input size. 606 """ 607 return ''.join(random.choice(chars) for _ in range(size)) 608 609 610def get_advanced_droid_list(android_devices): 611 """Add max_advertisement and batch_scan_supported attributes to input 612 Android devices 613 614 This will programatically determine maximum LE advertisements of each 615 input Android device. 616 617 Args: 618 android_devices: The Android devices to setup. 619 620 Returns: 621 List of Android devices with new attribtues. 622 """ 623 droid_list = [] 624 for a in android_devices: 625 d, e = a.droid, a.ed 626 model = d.getBuildModel() 627 max_advertisements = 1 628 batch_scan_supported = True 629 if model in advertisements_to_devices.keys(): 630 max_advertisements = advertisements_to_devices[model] 631 else: 632 max_advertisements = determine_max_advertisements(a) 633 max_tries = 3 634 # Retry to calculate max advertisements 635 while max_advertisements == -1 and max_tries > 0: 636 a.log.info( 637 "Attempts left to determine max advertisements: {}".format( 638 max_tries)) 639 max_advertisements = determine_max_advertisements(a) 640 max_tries -= 1 641 advertisements_to_devices[model] = max_advertisements 642 if model in batch_scan_not_supported_list: 643 batch_scan_supported = False 644 role = { 645 'droid': d, 646 'ed': e, 647 'max_advertisements': max_advertisements, 648 'batch_scan_supported': batch_scan_supported 649 } 650 droid_list.append(role) 651 return droid_list 652 653 654def get_bluetooth_crash_count(android_device): 655 out = android_device.adb.shell("dumpsys bluetooth_manager") 656 return int(re.search("crashed(.*\d)", out).group(1)) 657 658 659def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True): 660 """ Function to get the bt metric from logcat. 661 662 Captures logcat for the specified duration and returns the bqr results. 663 Takes list of android objects as input. If a single android object is given, 664 converts it into a list. 665 666 Args: 667 ad_list: list of android_device objects 668 duration: time duration (seconds) for which the logcat is parsed. 669 tag: tag to be appended to the logcat dump. 670 processed: flag to process bqr output. 671 672 Returns: 673 metrics_dict: dict of metrics for each android device. 674 """ 675 676 # Defining bqr quantitites and their regex to extract 677 regex_dict = {"pwlv": "PwLv:\s(\S+)", "rssi": "RSSI:\s[-](\d+)"} 678 metrics_dict = {"rssi": {}, "pwlv": {}} 679 680 # Converting a single android device object to list 681 if not isinstance(ad_list, list): 682 ad_list = [ad_list] 683 684 #Time sync with the test machine 685 for ad in ad_list: 686 ad.droid.setTime(int(round(time.time() * 1000))) 687 time.sleep(0.5) 688 689 begin_time = utils.get_current_epoch_time() 690 time.sleep(duration) 691 end_time = utils.get_current_epoch_time() 692 693 for ad in ad_list: 694 bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time) 695 bqr_tag = "Monitoring , Handle:" 696 697 # Extracting supporting bqr quantities 698 for metric, regex in regex_dict.items(): 699 bqr_metric = [] 700 file_bt_log = open(bt_rssi_log, "r") 701 for line in file_bt_log: 702 if bqr_tag in line: 703 if re.findall(regex, line): 704 m = re.findall(regex, line)[0].strip(",") 705 bqr_metric.append(m) 706 metrics_dict[metric][ad.serial] = bqr_metric 707 708 # Formatting the raw data 709 metrics_dict["rssi"][ad.serial] = [ 710 (-1) * int(x) for x in metrics_dict["rssi"][ad.serial] 711 ] 712 metrics_dict["pwlv"][ad.serial] = [ 713 int(x, 16) for x in metrics_dict["pwlv"][ad.serial] 714 ] 715 716 # Processing formatted data if processing is required 717 if processed: 718 # Computes the average RSSI 719 metrics_dict["rssi"][ad.serial] = round( 720 sum(metrics_dict["rssi"][ad.serial]) / 721 len(metrics_dict["rssi"][ad.serial]), 2) 722 # Returns last noted value for power level 723 metrics_dict["pwlv"][ad.serial] = metrics_dict["pwlv"][ 724 ad.serial][-1] 725 726 return metrics_dict 727 728 729def get_bt_rssi(ad, duration=1, processed=True): 730 """Function to get average bt rssi from logcat. 731 732 This function returns the average RSSI for the given duration. RSSI values are 733 extracted from BQR. 734 735 Args: 736 ad: (list of) android_device object. 737 duration: time duration(seconds) for which logcat is parsed. 738 739 Returns: 740 avg_rssi: average RSSI on each android device for the given duration. 741 """ 742 function_tag = "get_bt_rssi" 743 bqr_results = get_bt_metric(ad, 744 duration, 745 tag=function_tag, 746 processed=processed) 747 return bqr_results["rssi"] 748 749 750def enable_bqr( 751 ad_list, 752 bqr_interval=10, 753 bqr_event_mask=15, 754): 755 """Sets up BQR reporting. 756 757 Sets up BQR to report BT metrics at the requested frequency and toggles 758 airplane mode for the bqr settings to take effect. 759 760 Args: 761 ad_list: an android_device or list of android devices. 762 """ 763 # Converting a single android device object to list 764 if not isinstance(ad_list, list): 765 ad_list = [ad_list] 766 767 for ad in ad_list: 768 #Setting BQR parameters 769 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 770 bqr_event_mask)) 771 ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format( 772 bqr_interval)) 773 774 ## Toggle airplane mode 775 ad.droid.connectivityToggleAirplaneMode(True) 776 ad.droid.connectivityToggleAirplaneMode(False) 777 778 779def disable_bqr(ad_list): 780 """Disables BQR reporting. 781 782 Args: 783 ad_list: an android_device or list of android devices. 784 """ 785 # Converting a single android device object to list 786 if not isinstance(ad_list, list): 787 ad_list = [ad_list] 788 789 DISABLE_BQR_MASK = 0 790 791 for ad in ad_list: 792 #Disabling BQR 793 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 794 DISABLE_BQR_MASK)) 795 796 ## Toggle airplane mode 797 ad.droid.connectivityToggleAirplaneMode(True) 798 ad.droid.connectivityToggleAirplaneMode(False) 799 800 801def get_device_selector_dictionary(android_device_list): 802 """Create a dictionary of Bluetooth features vs Android devices. 803 804 Args: 805 android_device_list: The list of Android devices. 806 Returns: 807 A dictionary of profiles/features to Android devices. 808 """ 809 selector_dict = {} 810 for ad in android_device_list: 811 uuids = ad.droid.bluetoothGetLocalUuids() 812 813 for profile, uuid_const in sig_uuid_constants.items(): 814 uuid_check = sig_uuid_constants['BASE_UUID'].format( 815 uuid_const).lower() 816 if uuids and uuid_check in uuids: 817 if profile in selector_dict: 818 selector_dict[profile].append(ad) 819 else: 820 selector_dict[profile] = [ad] 821 822 # Various services may not be active during BT startup. 823 # If the device can be identified through adb shell pm list features 824 # then try to add them to the appropriate profiles / features. 825 826 # Android TV. 827 if "feature:com.google.android.tv.installed" in ad.features: 828 ad.log.info("Android TV device found.") 829 supported_profiles = ['AudioSink'] 830 _add_android_device_to_dictionary(ad, supported_profiles, 831 selector_dict) 832 833 # Android Auto 834 elif "feature:android.hardware.type.automotive" in ad.features: 835 ad.log.info("Android Auto device found.") 836 # Add: AudioSink , A/V_RemoteControl, 837 supported_profiles = [ 838 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' 839 ] 840 _add_android_device_to_dictionary(ad, supported_profiles, 841 selector_dict) 842 # Android Wear 843 elif "feature:android.hardware.type.watch" in ad.features: 844 ad.log.info("Android Wear device found.") 845 supported_profiles = [] 846 _add_android_device_to_dictionary(ad, supported_profiles, 847 selector_dict) 848 # Android Phone 849 elif "feature:android.hardware.telephony" in ad.features: 850 ad.log.info("Android Phone device found.") 851 # Add: AudioSink 852 supported_profiles = [ 853 'AudioSource', 'A/V_RemoteControlTarget', 854 'Message Access Server' 855 ] 856 _add_android_device_to_dictionary(ad, supported_profiles, 857 selector_dict) 858 return selector_dict 859 860 861def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): 862 """Start generic advertisement and get it's mac address by LE scanning. 863 864 Args: 865 scan_ad: The Android device to use as the scanner. 866 adv_ad: The Android device to use as the advertiser. 867 868 Returns: 869 mac_address: The mac address of the advertisement. 870 advertise_callback: The advertise callback id of the active 871 advertisement. 872 """ 873 adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 874 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 875 ble_advertise_settings_modes['low_latency']) 876 adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True) 877 adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel( 878 ble_advertise_settings_tx_powers['high']) 879 advertise_callback, advertise_data, advertise_settings = ( 880 generate_ble_advertise_objects(adv_ad.droid)) 881 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 882 advertise_settings) 883 try: 884 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 885 bt_default_timeout) 886 except Empty as err: 887 raise BtTestUtilsError( 888 "Advertiser did not start successfully {}".format(err)) 889 filter_list = scan_ad.droid.bleGenFilterList() 890 scan_settings = scan_ad.droid.bleBuildScanSetting() 891 scan_callback = scan_ad.droid.bleGenScanCallback() 892 scan_ad.droid.bleSetScanFilterDeviceName( 893 adv_ad.droid.bluetoothGetLocalName()) 894 scan_ad.droid.bleBuildScanFilter(filter_list) 895 scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 896 try: 897 event = scan_ad.ed.pop_event( 898 "BleScan{}onScanResults".format(scan_callback), bt_default_timeout) 899 except Empty as err: 900 raise BtTestUtilsError( 901 "Scanner did not find advertisement {}".format(err)) 902 mac_address = event['data']['Result']['deviceInfo']['address'] 903 return mac_address, advertise_callback, scan_callback 904 905 906def hid_device_send_key_data_report(host_id, device_ad, key, interval=1): 907 """Send a HID report simulating a 1-second keyboard press from host_ad to 908 device_ad 909 910 Args: 911 host_id: the Bluetooth MAC address or name of the HID host 912 device_ad: HID device 913 key: the key we want to send 914 interval: the interval between key press and key release 915 """ 916 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 917 hid_keyboard_report(key)) 918 time.sleep(interval) 919 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 920 hid_keyboard_report("00")) 921 922 923def hid_keyboard_report(key, modifier="00"): 924 """Get the HID keyboard report for the given key 925 926 Args: 927 key: the key we want 928 modifier: HID keyboard modifier bytes 929 Returns: 930 The byte array for the HID report. 931 """ 932 return str( 933 bytearray.fromhex(" ".join( 934 [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8") 935 936 937def is_a2dp_connected(sink, source): 938 """ 939 Convenience Function to see if the 2 devices are connected on 940 A2dp. 941 Args: 942 sink: Audio Sink 943 source: Audio Source 944 Returns: 945 True if Connected 946 False if Not connected 947 """ 948 949 devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices() 950 for device in devices: 951 sink.log.info("A2dp Connected device {}".format(device["name"])) 952 if (device["address"] == source.droid.bluetoothGetLocalAddress()): 953 return True 954 return False 955 956 957def is_a2dp_snk_device_connected(ad, addr): 958 """Determines if an AndroidDevice has A2DP snk connectivity to input address 959 960 Args: 961 ad: the Android device 962 addr: the address that's expected 963 Returns: 964 True if connection was successful, false if unsuccessful. 965 """ 966 devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices() 967 ad.log.info("Connected A2DP Sink devices: {}".format(devices)) 968 if addr in {d['address'] for d in devices}: 969 return True 970 return False 971 972 973def is_a2dp_src_device_connected(ad, addr): 974 """Determines if an AndroidDevice has A2DP connectivity to input address 975 976 Args: 977 ad: the Android device 978 addr: the address that's expected 979 Returns: 980 True if connection was successful, false if unsuccessful. 981 """ 982 devices = ad.droid.bluetoothA2dpGetConnectedDevices() 983 ad.log.info("Connected A2DP Source devices: {}".format(devices)) 984 if addr in {d['address'] for d in devices}: 985 return True 986 return False 987 988 989def is_hfp_client_device_connected(ad, addr): 990 """Determines if an AndroidDevice has HFP connectivity to input address 991 992 Args: 993 ad: the Android device 994 addr: the address that's expected 995 Returns: 996 True if connection was successful, false if unsuccessful. 997 """ 998 devices = ad.droid.bluetoothHfpClientGetConnectedDevices() 999 ad.log.info("Connected HFP Client devices: {}".format(devices)) 1000 if addr in {d['address'] for d in devices}: 1001 return True 1002 return False 1003 1004 1005def is_map_mce_device_connected(ad, addr): 1006 """Determines if an AndroidDevice has MAP MCE connectivity to input address 1007 1008 Args: 1009 ad: the Android device 1010 addr: the address that's expected 1011 Returns: 1012 True if connection was successful, false if unsuccessful. 1013 """ 1014 devices = ad.droid.bluetoothMapClientGetConnectedDevices() 1015 ad.log.info("Connected MAP MCE devices: {}".format(devices)) 1016 if addr in {d['address'] for d in devices}: 1017 return True 1018 return False 1019 1020 1021def is_map_mse_device_connected(ad, addr): 1022 """Determines if an AndroidDevice has MAP MSE connectivity to input address 1023 1024 Args: 1025 ad: the Android device 1026 addr: the address that's expected 1027 Returns: 1028 True if connection was successful, false if unsuccessful. 1029 """ 1030 devices = ad.droid.bluetoothMapGetConnectedDevices() 1031 ad.log.info("Connected MAP MSE devices: {}".format(devices)) 1032 if addr in {d['address'] for d in devices}: 1033 return True 1034 return False 1035 1036 1037def kill_bluetooth_process(ad): 1038 """Kill Bluetooth process on Android device. 1039 1040 Args: 1041 ad: Android device to kill BT process on. 1042 """ 1043 ad.log.info("Killing Bluetooth process.") 1044 pid = ad.adb.shell( 1045 "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii') 1046 call(["adb -s " + ad.serial + " shell kill " + pid], shell=True) 1047 1048 1049def log_energy_info(android_devices, state): 1050 """Logs energy info of input Android devices. 1051 1052 Args: 1053 android_devices: input Android device list to log energy info from. 1054 state: the input state to log. Usually 'Start' or 'Stop' for logging. 1055 1056 Returns: 1057 A logging string of the Bluetooth energy info reported. 1058 """ 1059 return_string = "{} Energy info collection:\n".format(state) 1060 # Bug: b/31966929 1061 return return_string 1062 1063 1064def orchestrate_and_verify_pan_connection(pan_dut, panu_dut): 1065 """Setups up a PAN conenction between two android devices. 1066 1067 Args: 1068 pan_dut: the Android device providing tethering services 1069 panu_dut: the Android device using the internet connection from the 1070 pan_dut 1071 Returns: 1072 True if PAN connection and verification is successful, 1073 false if unsuccessful. 1074 """ 1075 if not toggle_airplane_mode_by_adb(log, panu_dut, True): 1076 panu_dut.log.error("Failed to toggle airplane mode on") 1077 return False 1078 if not toggle_airplane_mode_by_adb(log, panu_dut, False): 1079 pan_dut.log.error("Failed to toggle airplane mode off") 1080 return False 1081 pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1082 panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1083 if not bluetooth_enabled_check(panu_dut): 1084 return False 1085 if not bluetooth_enabled_check(pan_dut): 1086 return False 1087 pan_dut.droid.bluetoothPanSetBluetoothTethering(True) 1088 if not (pair_pri_to_sec(pan_dut, panu_dut)): 1089 return False 1090 if not pan_dut.droid.bluetoothPanIsTetheringOn(): 1091 pan_dut.log.error("Failed to enable Bluetooth tethering.") 1092 return False 1093 # Magic sleep needed to give the stack time in between bonding and 1094 # connecting the PAN profile. 1095 time.sleep(pan_connect_timeout) 1096 panu_dut.droid.bluetoothConnectBonded( 1097 pan_dut.droid.bluetoothGetLocalAddress()) 1098 if not verify_http_connection(log, panu_dut): 1099 panu_dut.log.error("Can't verify http connection on PANU device.") 1100 if not verify_http_connection(log, pan_dut): 1101 pan_dut.log.info( 1102 "Can't verify http connection on PAN service device") 1103 return False 1104 return True 1105 1106 1107def orchestrate_bluetooth_socket_connection( 1108 client_ad, 1109 server_ad, 1110 accept_timeout_ms=default_bluetooth_socket_timeout_ms, 1111 uuid=None): 1112 """Sets up the Bluetooth Socket connection between two Android devices. 1113 1114 Args: 1115 client_ad: the Android device performing the connection. 1116 server_ad: the Android device accepting the connection. 1117 Returns: 1118 True if connection was successful, false if unsuccessful. 1119 """ 1120 server_ad.droid.bluetoothStartPairingHelper() 1121 client_ad.droid.bluetoothStartPairingHelper() 1122 1123 server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid( 1124 (bluetooth_socket_conn_test_uuid if uuid is None else uuid), 1125 accept_timeout_ms) 1126 client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid( 1127 server_ad.droid.bluetoothGetLocalAddress(), 1128 (bluetooth_socket_conn_test_uuid if uuid is None else uuid)) 1129 1130 end_time = time.time() + bt_default_timeout 1131 result = False 1132 test_result = True 1133 while time.time() < end_time: 1134 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0: 1135 test_result = True 1136 client_ad.log.info("Bluetooth socket Client Connection Active") 1137 break 1138 else: 1139 test_result = False 1140 time.sleep(1) 1141 if not test_result: 1142 client_ad.log.error( 1143 "Failed to establish a Bluetooth socket connection") 1144 return False 1145 return True 1146 1147 1148def orchestrate_rfcomm_connection(client_ad, 1149 server_ad, 1150 accept_timeout_ms=default_rfcomm_timeout_ms, 1151 uuid=None): 1152 """Sets up the RFCOMM connection between two Android devices. 1153 1154 Args: 1155 client_ad: the Android device performing the connection. 1156 server_ad: the Android device accepting the connection. 1157 Returns: 1158 True if connection was successful, false if unsuccessful. 1159 """ 1160 result = orchestrate_bluetooth_socket_connection( 1161 client_ad, server_ad, accept_timeout_ms, 1162 (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid)) 1163 1164 return result 1165 1166 1167def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True): 1168 """Pairs pri droid to secondary droid. 1169 1170 Args: 1171 pri_ad: Android device initiating connection 1172 sec_ad: Android device accepting connection 1173 attempts: Number of attempts to try until failure. 1174 auto_confirm: Auto confirm passkey match for both devices 1175 1176 Returns: 1177 Pass if True 1178 Fail if False 1179 """ 1180 pri_ad.droid.bluetoothStartConnectionStateChangeMonitor( 1181 sec_ad.droid.bluetoothGetLocalAddress()) 1182 curr_attempts = 0 1183 while curr_attempts < attempts: 1184 if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1185 return True 1186 # Wait 2 seconds before unbound 1187 time.sleep(2) 1188 if not clear_bonded_devices(pri_ad): 1189 log.error( 1190 "Failed to clear bond for primary device at attempt {}".format( 1191 str(curr_attempts))) 1192 return False 1193 if not clear_bonded_devices(sec_ad): 1194 log.error( 1195 "Failed to clear bond for secondary device at attempt {}". 1196 format(str(curr_attempts))) 1197 return False 1198 # Wait 2 seconds after unbound 1199 time.sleep(2) 1200 curr_attempts += 1 1201 log.error("pair_pri_to_sec failed to connect after {} attempts".format( 1202 str(attempts))) 1203 return False 1204 1205 1206def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1207 # Enable discovery on sec_ad so that pri_ad can find it. 1208 # The timeout here is based on how much time it would take for two devices 1209 # to pair with each other once pri_ad starts seeing devices. 1210 pri_droid = pri_ad.droid 1211 sec_droid = sec_ad.droid 1212 pri_ad.ed.clear_all_events() 1213 sec_ad.ed.clear_all_events() 1214 log.info("Bonding device {} to {}".format( 1215 pri_droid.bluetoothGetLocalAddress(), 1216 sec_droid.bluetoothGetLocalAddress())) 1217 sec_droid.bluetoothMakeDiscoverable(bt_default_timeout) 1218 target_address = sec_droid.bluetoothGetLocalAddress() 1219 log.debug("Starting paring helper on each device") 1220 pri_droid.bluetoothStartPairingHelper(auto_confirm) 1221 sec_droid.bluetoothStartPairingHelper(auto_confirm) 1222 pri_ad.log.info("Primary device starting discovery and executing bond") 1223 result = pri_droid.bluetoothDiscoverAndBond(target_address) 1224 if not auto_confirm: 1225 if not _wait_for_passkey_match(pri_ad, sec_ad): 1226 return False 1227 # Loop until we have bonded successfully or timeout. 1228 end_time = time.time() + bt_default_timeout 1229 pri_ad.log.info("Verifying devices are bonded") 1230 while time.time() < end_time: 1231 bonded_devices = pri_droid.bluetoothGetBondedDevices() 1232 bonded = False 1233 for d in bonded_devices: 1234 if d['address'] == target_address: 1235 pri_ad.log.info("Successfully bonded to device") 1236 return True 1237 time.sleep(0.1) 1238 # Timed out trying to bond. 1239 pri_ad.log.info("Failed to bond devices.") 1240 return False 1241 1242 1243def reset_bluetooth(android_devices): 1244 """Resets Bluetooth state of input Android device list. 1245 1246 Args: 1247 android_devices: The Android device list to reset Bluetooth state on. 1248 1249 Returns: 1250 True if successful, false if unsuccessful. 1251 """ 1252 for a in android_devices: 1253 droid, ed = a.droid, a.ed 1254 a.log.info("Reset state of bluetooth on device.") 1255 if droid.bluetoothCheckState() is True: 1256 droid.bluetoothToggleState(False) 1257 expected_bluetooth_off_event_name = bluetooth_off 1258 try: 1259 ed.pop_event(expected_bluetooth_off_event_name, 1260 bt_default_timeout) 1261 except Exception: 1262 a.log.error("Failed to toggle Bluetooth off.") 1263 return False 1264 # temp sleep for b/17723234 1265 time.sleep(3) 1266 if not bluetooth_enabled_check(a): 1267 return False 1268 return True 1269 1270 1271def scan_and_verify_n_advertisements(scn_ad, max_advertisements): 1272 """Verify that input number of advertisements can be found from the scanning 1273 Android device. 1274 1275 Args: 1276 scn_ad: The Android device to start LE scanning on. 1277 max_advertisements: The number of advertisements the scanner expects to 1278 find. 1279 1280 Returns: 1281 True if successful, false if unsuccessful. 1282 """ 1283 test_result = False 1284 address_list = [] 1285 filter_list = scn_ad.droid.bleGenFilterList() 1286 scn_ad.droid.bleBuildScanFilter(filter_list) 1287 scan_settings = scn_ad.droid.bleBuildScanSetting() 1288 scan_callback = scn_ad.droid.bleGenScanCallback() 1289 scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 1290 start_time = time.time() 1291 while (start_time + bt_default_timeout) > time.time(): 1292 event = None 1293 try: 1294 event = scn_ad.ed.pop_event(scan_result.format(scan_callback), 1295 bt_default_timeout) 1296 except Empty as error: 1297 raise BtTestUtilsError( 1298 "Failed to find scan event: {}".format(error)) 1299 address = event['data']['Result']['deviceInfo']['address'] 1300 if address not in address_list: 1301 address_list.append(address) 1302 if len(address_list) == max_advertisements: 1303 test_result = True 1304 break 1305 scn_ad.droid.bleStopBleScan(scan_callback) 1306 return test_result 1307 1308 1309def set_bluetooth_codec(android_device, 1310 codec_type, 1311 sample_rate, 1312 bits_per_sample, 1313 channel_mode, 1314 codec_specific_1=0): 1315 """Sets the A2DP codec configuration on the AndroidDevice. 1316 1317 Args: 1318 android_device (acts.controllers.android_device.AndroidDevice): the 1319 android device for which to switch the codec. 1320 codec_type (str): the desired codec type. Must be a key in 1321 bt_constants.codec_types. 1322 sample_rate (str): the desired sample rate. Must be a key in 1323 bt_constants.sample_rates. 1324 bits_per_sample (str): the desired bits per sample. Must be a key in 1325 bt_constants.bits_per_samples. 1326 channel_mode (str): the desired channel mode. Must be a key in 1327 bt_constants.channel_modes. 1328 codec_specific_1 (int): the desired bit rate (quality) for LDAC codec. 1329 Returns: 1330 bool: True if the codec config was successfully changed to the desired 1331 values. Else False. 1332 """ 1333 message = ("Set Android Device A2DP Bluetooth codec configuration:\n" 1334 "\tCodec: {codec_type}\n" 1335 "\tSample Rate: {sample_rate}\n" 1336 "\tBits per Sample: {bits_per_sample}\n" 1337 "\tChannel Mode: {channel_mode}".format( 1338 codec_type=codec_type, 1339 sample_rate=sample_rate, 1340 bits_per_sample=bits_per_sample, 1341 channel_mode=channel_mode)) 1342 android_device.log.info(message) 1343 1344 # Send SL4A command 1345 droid, ed = android_device.droid, android_device.ed 1346 if not droid.bluetoothA2dpSetCodecConfigPreference( 1347 codec_types[codec_type], sample_rates[str(sample_rate)], 1348 bits_per_samples[str(bits_per_sample)], 1349 channel_modes[channel_mode], codec_specific_1): 1350 android_device.log.warning( 1351 "SL4A command returned False. Codec was not " 1352 "changed.") 1353 else: 1354 try: 1355 ed.pop_event(bluetooth_a2dp_codec_config_changed, 1356 bt_default_timeout) 1357 except Exception: 1358 android_device.log.warning("SL4A event not registered. Codec " 1359 "may not have been changed.") 1360 1361 # Validate codec value through ADB 1362 # TODO (aidanhb): validate codec more robustly using SL4A 1363 command = "dumpsys bluetooth_manager | grep -i 'current codec'" 1364 out = android_device.adb.shell(command) 1365 split_out = out.split(": ") 1366 if len(split_out) != 2: 1367 android_device.log.warning("Could not verify codec config change " 1368 "through ADB.") 1369 elif split_out[1].strip().upper() != codec_type: 1370 android_device.log.error("Codec config was not changed.\n" 1371 "\tExpected codec: {exp}\n" 1372 "\tActual codec: {act}".format( 1373 exp=codec_type, act=split_out[1].strip())) 1374 return False 1375 android_device.log.info("Bluetooth codec successfully changed.") 1376 return True 1377 1378 1379def set_bt_scan_mode(ad, scan_mode_value): 1380 """Set Android device's Bluetooth scan mode. 1381 1382 Args: 1383 ad: The Android device to set the scan mode on. 1384 scan_mode_value: The value to set the scan mode to. 1385 1386 Returns: 1387 True if successful, false if unsuccessful. 1388 """ 1389 droid, ed = ad.droid, ad.ed 1390 if scan_mode_value == bt_scan_mode_types['state_off']: 1391 disable_bluetooth(droid) 1392 scan_mode = droid.bluetoothGetScanMode() 1393 reset_bluetooth([ad]) 1394 if scan_mode != scan_mode_value: 1395 return False 1396 elif scan_mode_value == bt_scan_mode_types['none']: 1397 droid.bluetoothMakeUndiscoverable() 1398 scan_mode = droid.bluetoothGetScanMode() 1399 if scan_mode != scan_mode_value: 1400 return False 1401 elif scan_mode_value == bt_scan_mode_types['connectable']: 1402 droid.bluetoothMakeUndiscoverable() 1403 droid.bluetoothMakeConnectable() 1404 scan_mode = droid.bluetoothGetScanMode() 1405 if scan_mode != scan_mode_value: 1406 return False 1407 elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']): 1408 droid.bluetoothMakeDiscoverable() 1409 scan_mode = droid.bluetoothGetScanMode() 1410 if scan_mode != scan_mode_value: 1411 return False 1412 else: 1413 # invalid scan mode 1414 return False 1415 return True 1416 1417 1418def set_device_name(droid, name): 1419 """Set and check Bluetooth local name on input droid object. 1420 1421 Args: 1422 droid: Droid object to set local name on. 1423 name: the Bluetooth local name to set. 1424 1425 Returns: 1426 True if successful, false if unsuccessful. 1427 """ 1428 droid.bluetoothSetLocalName(name) 1429 time.sleep(2) 1430 droid_name = droid.bluetoothGetLocalName() 1431 if droid_name != name: 1432 return False 1433 return True 1434 1435 1436def set_profile_priority(host_ad, client_ad, profiles, priority): 1437 """Sets the priority of said profile(s) on host_ad for client_ad""" 1438 for profile in profiles: 1439 host_ad.log.info("Profile {} on {} for {} set to priority {}".format( 1440 profile, host_ad.droid.bluetoothGetLocalName(), 1441 client_ad.droid.bluetoothGetLocalAddress(), priority.value)) 1442 if bt_profile_constants['a2dp_sink'] == profile: 1443 host_ad.droid.bluetoothA2dpSinkSetPriority( 1444 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1445 elif bt_profile_constants['headset_client'] == profile: 1446 host_ad.droid.bluetoothHfpClientSetPriority( 1447 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1448 elif bt_profile_constants['pbap_client'] == profile: 1449 host_ad.droid.bluetoothPbapClientSetPriority( 1450 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1451 else: 1452 host_ad.log.error( 1453 "Profile {} not yet supported for priority settings".format( 1454 profile)) 1455 1456 1457def setup_multiple_devices_for_bt_test(android_devices): 1458 """A common setup routine for Bluetooth on input Android device list. 1459 1460 Things this function sets up: 1461 1. Resets Bluetooth 1462 2. Set Bluetooth local name to random string of size 4 1463 3. Disable BLE background scanning. 1464 4. Enable Bluetooth snoop logging. 1465 1466 Args: 1467 android_devices: Android device list to setup Bluetooth on. 1468 1469 Returns: 1470 True if successful, false if unsuccessful. 1471 """ 1472 log.info("Setting up Android Devices") 1473 # TODO: Temp fix for an selinux error. 1474 for ad in android_devices: 1475 ad.adb.shell("setenforce 0") 1476 threads = [] 1477 try: 1478 for a in android_devices: 1479 thread = threading.Thread(target=factory_reset_bluetooth, 1480 args=([[a]])) 1481 threads.append(thread) 1482 thread.start() 1483 for t in threads: 1484 t.join() 1485 1486 for a in android_devices: 1487 d = a.droid 1488 # TODO: Create specific RPC command to instantiate 1489 # BluetoothConnectionFacade. This is just a workaround. 1490 d.bluetoothStartConnectionStateChangeMonitor("") 1491 setup_result = d.bluetoothSetLocalName(generate_id_by_size(4)) 1492 if not setup_result: 1493 a.log.error("Failed to set device name.") 1494 return setup_result 1495 d.bluetoothDisableBLE() 1496 utils.set_location_service(a, True) 1497 bonded_devices = d.bluetoothGetBondedDevices() 1498 for b in bonded_devices: 1499 a.log.info("Removing bond for device {}".format(b['address'])) 1500 d.bluetoothUnbond(b['address']) 1501 for a in android_devices: 1502 a.adb.shell("setprop persist.bluetooth.btsnooplogmode full") 1503 getprop_result = a.adb.shell( 1504 "getprop persist.bluetooth.btsnooplogmode") == "full" 1505 if not getprop_result: 1506 a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.") 1507 except Exception as err: 1508 log.error("Something went wrong in multi device setup: {}".format(err)) 1509 return False 1510 return setup_result 1511 1512 1513def setup_n_advertisements(adv_ad, num_advertisements): 1514 """Setup input number of advertisements on input Android device. 1515 1516 Args: 1517 adv_ad: The Android device to start LE advertisements on. 1518 num_advertisements: The number of advertisements to start. 1519 1520 Returns: 1521 advertise_callback_list: List of advertisement callback ids. 1522 """ 1523 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 1524 ble_advertise_settings_modes['low_latency']) 1525 advertise_data = adv_ad.droid.bleBuildAdvertiseData() 1526 advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings() 1527 advertise_callback_list = [] 1528 for i in range(num_advertisements): 1529 advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback() 1530 advertise_callback_list.append(advertise_callback) 1531 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 1532 advertise_settings) 1533 try: 1534 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 1535 bt_default_timeout) 1536 adv_ad.log.info("Advertisement {} started.".format(i + 1)) 1537 except Empty as error: 1538 adv_ad.log.error("Advertisement {} failed to start.".format(i + 1)) 1539 raise BtTestUtilsError( 1540 "Test failed with Empty error: {}".format(error)) 1541 return advertise_callback_list 1542 1543 1544def take_btsnoop_log(ad, testcase, testname): 1545 """Grabs the btsnoop_hci log on a device and stores it in the log directory 1546 of the test class. 1547 1548 If you want grab the btsnoop_hci log, call this function with android_device 1549 objects in on_fail. Bug report takes a relative long time to take, so use 1550 this cautiously. 1551 1552 Args: 1553 ad: The android_device instance to take bugreport on. 1554 testcase: Name of the test calss that triggered this snoop log. 1555 testname: Name of the test case that triggered this bug report. 1556 """ 1557 testname = "".join(x for x in testname if x.isalnum()) 1558 serial = ad.serial 1559 device_model = ad.droid.getBuildModel() 1560 device_model = device_model.replace(" ", "") 1561 out_name = ','.join((testname, device_model, serial)) 1562 snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs') 1563 os.makedirs(snoop_path, exist_ok=True) 1564 cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, 1565 " ", snoop_path + '/' + out_name, ".btsnoop_hci.log")) 1566 exe_cmd(cmd) 1567 try: 1568 cmd = ''.join( 1569 ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ", 1570 snoop_path + '/' + out_name, ".btsnoop_hci.log.last")) 1571 exe_cmd(cmd) 1572 except Exception as err: 1573 testcase.log.info( 1574 "File does not exist {}".format(btsnoop_last_log_path_on_device)) 1575 1576 1577def take_btsnoop_logs(android_devices, testcase, testname): 1578 """Pull btsnoop logs from an input list of android devices. 1579 1580 Args: 1581 android_devices: the list of Android devices to pull btsnoop logs from. 1582 testcase: Name of the test calss that triggered this snoop log. 1583 testname: Name of the test case that triggered this bug report. 1584 """ 1585 for a in android_devices: 1586 take_btsnoop_log(a, testcase, testname) 1587 1588 1589def teardown_n_advertisements(adv_ad, num_advertisements, 1590 advertise_callback_list): 1591 """Stop input number of advertisements on input Android device. 1592 1593 Args: 1594 adv_ad: The Android device to stop LE advertisements on. 1595 num_advertisements: The number of advertisements to stop. 1596 advertise_callback_list: The list of advertisement callbacks to stop. 1597 1598 Returns: 1599 True if successful, false if unsuccessful. 1600 """ 1601 for n in range(num_advertisements): 1602 adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n]) 1603 return True 1604 1605 1606def verify_server_and_client_connected(client_ad, server_ad, log=True): 1607 """Verify that input server and client Android devices are connected. 1608 1609 This code is under the assumption that there will only be 1610 a single connection. 1611 1612 Args: 1613 client_ad: the Android device to check number of active connections. 1614 server_ad: the Android device to check number of active connections. 1615 1616 Returns: 1617 True both server and client have at least 1 active connection, 1618 false if unsuccessful. 1619 """ 1620 test_result = True 1621 if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1622 if log: 1623 server_ad.log.error("No socket connections found on server.") 1624 test_result = False 1625 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1626 if log: 1627 client_ad.log.error("No socket connections found on client.") 1628 test_result = False 1629 return test_result 1630 1631 1632def wait_for_bluetooth_manager_state(droid, 1633 state=None, 1634 timeout=10, 1635 threshold=5): 1636 """ Waits for BlueTooth normalized state or normalized explicit state 1637 args: 1638 droid: droid device object 1639 state: expected BlueTooth state 1640 timeout: max timeout threshold 1641 threshold: list len of bt state 1642 Returns: 1643 True if successful, false if unsuccessful. 1644 """ 1645 all_states = [] 1646 get_state = lambda: droid.bluetoothGetLeState() 1647 start_time = time.time() 1648 while time.time() < start_time + timeout: 1649 all_states.append(get_state()) 1650 if len(all_states) >= threshold: 1651 # for any normalized state 1652 if state is None: 1653 if len(set(all_states[-threshold:])) == 1: 1654 log.info("State normalized {}".format( 1655 set(all_states[-threshold:]))) 1656 return True 1657 else: 1658 # explicit check against normalized state 1659 if set([state]).issubset(all_states[-threshold:]): 1660 return True 1661 time.sleep(0.5) 1662 log.error( 1663 "Bluetooth state fails to normalize" if state is None else 1664 "Failed to match bluetooth state, current state {} expected state {}". 1665 format(get_state(), state)) 1666 return False 1667 1668 1669def _wait_for_passkey_match(pri_ad, sec_ad): 1670 pri_pin, sec_pin = -1, 1 1671 pri_variant, sec_variant = -1, 1 1672 pri_pairing_req, sec_pairing_req = None, None 1673 try: 1674 pri_pairing_req = pri_ad.ed.pop_event( 1675 event_name="BluetoothActionPairingRequest", 1676 timeout=bt_default_timeout) 1677 pri_variant = pri_pairing_req["data"]["PairingVariant"] 1678 pri_pin = pri_pairing_req["data"]["Pin"] 1679 pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format( 1680 pri_pin, pri_variant)) 1681 sec_pairing_req = sec_ad.ed.pop_event( 1682 event_name="BluetoothActionPairingRequest", 1683 timeout=bt_default_timeout) 1684 sec_variant = sec_pairing_req["data"]["PairingVariant"] 1685 sec_pin = sec_pairing_req["data"]["Pin"] 1686 sec_ad.log.info( 1687 "Secondary device received Pin: {}, Variant: {}".format( 1688 sec_pin, sec_variant)) 1689 except Empty as err: 1690 log.error("Wait for pin error: {}".format(err)) 1691 log.error("Pairing request state, Primary: {}, Secondary: {}".format( 1692 pri_pairing_req, sec_pairing_req)) 1693 return False 1694 if pri_variant == sec_variant == pairing_variant_passkey_confirmation: 1695 confirmation = pri_pin == sec_pin 1696 if confirmation: 1697 log.info("Pairing code matched, accepting connection") 1698 else: 1699 log.info("Pairing code mismatched, rejecting connection") 1700 pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1701 str(confirmation)) 1702 sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1703 str(confirmation)) 1704 if not confirmation: 1705 return False 1706 elif pri_variant != sec_variant: 1707 log.error("Pairing variant mismatched, abort connection") 1708 return False 1709 return True 1710 1711 1712def write_read_verify_data(client_ad, server_ad, msg, binary=False): 1713 """Verify that the client wrote data to the server Android device correctly. 1714 1715 Args: 1716 client_ad: the Android device to perform the write. 1717 server_ad: the Android device to read the data written. 1718 msg: the message to write. 1719 binary: if the msg arg is binary or not. 1720 1721 Returns: 1722 True if the data written matches the data read, false if not. 1723 """ 1724 client_ad.log.info("Write message.") 1725 try: 1726 if binary: 1727 client_ad.droid.bluetoothSocketConnWriteBinary(msg) 1728 else: 1729 client_ad.droid.bluetoothSocketConnWrite(msg) 1730 except Exception as err: 1731 client_ad.log.error("Failed to write data: {}".format(err)) 1732 return False 1733 server_ad.log.info("Read message.") 1734 try: 1735 if binary: 1736 read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip( 1737 "\r\n") 1738 else: 1739 read_msg = server_ad.droid.bluetoothSocketConnRead() 1740 except Exception as err: 1741 server_ad.log.error("Failed to read data: {}".format(err)) 1742 return False 1743 log.info("Verify message.") 1744 if msg != read_msg: 1745 log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg)) 1746 return False 1747 return True 1748 1749 1750class MediaControlOverSl4a(object): 1751 """Media control using sl4a facade for general purpose. 1752 1753 """ 1754 def __init__(self, android_device, music_file): 1755 """Initialize the media_control class. 1756 1757 Args: 1758 android_dut: android_device object 1759 music_file: location of the music file 1760 """ 1761 self.android_device = android_device 1762 self.music_file = music_file 1763 1764 def play(self): 1765 """Play media. 1766 1767 """ 1768 self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file, 1769 'default', True) 1770 playing = self.android_device.droid.mediaIsPlaying() 1771 asserts.assert_true(playing, 1772 'Failed to play music %s' % self.music_file) 1773 1774 def pause(self): 1775 """Pause media. 1776 1777 """ 1778 self.android_device.droid.mediaPlayPause('default') 1779 paused = not self.android_device.droid.mediaIsPlaying() 1780 asserts.assert_true(paused, 1781 'Failed to pause music %s' % self.music_file) 1782 1783 def resume(self): 1784 """Resume media. 1785 1786 """ 1787 self.android_device.droid.mediaPlayStart('default') 1788 playing = self.android_device.droid.mediaIsPlaying() 1789 asserts.assert_true(playing, 1790 'Failed to play music %s' % self.music_file) 1791 1792 def stop(self): 1793 """Stop media. 1794 1795 """ 1796 self.android_device.droid.mediaPlayStop('default') 1797 stopped = not self.android_device.droid.mediaIsPlaying() 1798 asserts.assert_true(stopped, 1799 'Failed to stop music %s' % self.music_file) 1800