1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Generic telephony utility functions. Cloned from test_utils.tel.""" 18 19import re 20import struct 21import time 22from queue import Empty 23 24from acts.logger import epoch_to_log_line_timestamp 25from acts.controllers.adb_lib.error import AdbCommandError 26 27INCALL_UI_DISPLAY_FOREGROUND = "foreground" 28INCALL_UI_DISPLAY_BACKGROUND = "background" 29INCALL_UI_DISPLAY_DEFAULT = "default" 30 31# Max time to wait after caller make a call and before 32# callee start ringing 33MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT = 30 34 35# Max time to wait after toggle airplane mode and before 36# get expected event 37MAX_WAIT_TIME_AIRPLANEMODE_EVENT = 90 38 39# Wait time between state check retry 40WAIT_TIME_BETWEEN_STATE_CHECK = 5 41 42# Constant for Data Roaming State 43DATA_ROAMING_ENABLE = 1 44DATA_ROAMING_DISABLE = 0 45 46# Constant for Telephony Manager Call State 47TELEPHONY_STATE_RINGING = "RINGING" 48TELEPHONY_STATE_IDLE = "IDLE" 49TELEPHONY_STATE_OFFHOOK = "OFFHOOK" 50TELEPHONY_STATE_UNKNOWN = "UNKNOWN" 51 52# Constant for Service State 53SERVICE_STATE_EMERGENCY_ONLY = "EMERGENCY_ONLY" 54SERVICE_STATE_IN_SERVICE = "IN_SERVICE" 55SERVICE_STATE_OUT_OF_SERVICE = "OUT_OF_SERVICE" 56SERVICE_STATE_POWER_OFF = "POWER_OFF" 57SERVICE_STATE_UNKNOWN = "UNKNOWN" 58 59# Constant for Network Mode 60NETWORK_MODE_GSM_ONLY = "NETWORK_MODE_GSM_ONLY" 
NETWORK_MODE_WCDMA_ONLY = "NETWORK_MODE_WCDMA_ONLY"
NETWORK_MODE_LTE_ONLY = "NETWORK_MODE_LTE_ONLY"

# Constant for Events
EVENT_CALL_STATE_CHANGED = "CallStateChanged"
EVENT_SERVICE_STATE_CHANGED = "ServiceStateChanged"


class CallStateContainer:
    """Field names used to look up values in a call state event's data.

    CALL_STATE is the field matched against the expected telephony state
    when waiting for EVENT_CALL_STATE_CHANGED events in this file.
    """
    INCOMING_NUMBER = "incomingNumber"
    SUBSCRIPTION_ID = "subscriptionId"
    CALL_STATE = "callState"


class ServiceStateContainer:
    """Field names used to look up values in a service state event's data.

    SERVICE_STATE is the field matched against the expected service states
    when waiting for EVENT_SERVICE_STATE_CHANGED events in this file.
    """
    VOICE_REG_STATE = "voiceRegState"
    VOICE_NETWORK_TYPE = "voiceNetworkType"
    DATA_REG_STATE = "dataRegState"
    DATA_NETWORK_TYPE = "dataNetworkType"
    OPERATOR_NAME = "operatorName"
    OPERATOR_ID = "operatorId"
    IS_MANUAL_NW_SELECTION = "isManualNwSelection"
    ROAMING = "roaming"
    IS_EMERGENCY_ONLY = "isEmergencyOnly"
    NETWORK_ID = "networkId"
    SYSTEM_ID = "systemId"
    SUBSCRIPTION_ID = "subscriptionId"
    SERVICE_STATE = "serviceState"


def dumpsys_last_call_info(ad):
    """Get information about the last call listed by "dumpsys telecom".

    Args:
        ad: android device object.

    Returns:
        A dict that always contains "TC" (the last call number, 0 when no
        call was found) plus every known attribute found in the dump for
        that call. "startTime" and "endTime" are converted with
        epoch_to_log_line_timestamp; other attributes are raw strings.
    """
    num = dumpsys_last_call_number(ad)
    output = ad.adb.shell("dumpsys telecom")
    result = re.search(r"Call TC@%s: {(.*?)}" % num, output, re.DOTALL)
    call_info = {"TC": num}
    if result:
        result = result.group(1)
        for attr in ("startTime", "endTime", "direction", "isInterrupted",
                     "callTechnologies", "callTerminationsReason",
                     "isVideoCall", "callProperties"):
            match = re.search(r"%s: (.*)" % attr, result)
            if match:
                if attr in ("startTime", "endTime"):
                    call_info[attr] = epoch_to_log_line_timestamp(
                        int(match.group(1)))
                else:
                    call_info[attr] = match.group(1)
    ad.log.debug("call_info = %s", call_info)
    return call_info


def dumpsys_last_call_number(ad):
    """Get the number of the last call listed by "dumpsys telecom".

    Args:
        ad: android device object.

    Returns:
        The integer following the last "Call TC@" entry in the dump, or 0
        if the dump contains no such entry.
    """
    output = ad.adb.shell("dumpsys telecom")
    # Raw string: "\d" in a plain string literal is an invalid escape.
    call_nums = re.findall(r"Call TC@(\d+):", output)
    if not call_nums:
        return 0
    return int(call_nums[-1])


def get_device_epoch_time(ad):
    """Return the device clock as integer milliseconds since the epoch.

    The value is read with "date +%s.%N" (seconds.nanoseconds) over adb.

    Args:
        ad: android device object.
    """
    return int(1000 * float(ad.adb.shell("date +%s.%N")))


def get_outgoing_voice_sub_id(ad):
    """Get the outgoing voice subscription id.

    Args:
        ad: android device object.

    Returns:
        ad.outgoing_voice_sub_id when the attribute exists, otherwise the
        default voice subscription id reported by the device.
    """
    if hasattr(ad, "outgoing_voice_sub_id"):
        return ad.outgoing_voice_sub_id
    return ad.droid.subscriptionGetDefaultVoiceSubId()


def get_rx_tx_power_levels(log, ad):
    """Obtain Rx and Tx power levels from the MDS application.

    The method requires the MDS app to be installed in the DUT.

    Args:
        log: logger object
        ad: an android device

    Returns:
        A tuple where the first element is a list with the RSRP value of
        each Rx chain, and the second element is the transmitted power in
        dBm. Values for invalid Rx / Tx chains are set to None.

    Raises:
        RuntimeError: if the command fails, does not report success, or
            its output has no "response=" field.
    """
    cmd = ('am instrument -w -e request "80 00 e8 03 00 08 00 00 00" -e '
           'response wait "com.google.mdstest/com.google.mdstest.instrument.'
           'ModemCommandInstrumentation"')
    try:
        output = ad.adb.shell(cmd)
    except AdbCommandError as e:
        log.error(e)
        output = None

    if not output or 'result=SUCCESS' not in output:
        raise RuntimeError('Could not obtain Tx/Rx power levels from MDS. Is '
                           'the MDS app installed?')

    response = re.search(r"(?<=response=).+", output)

    if not response:
        raise RuntimeError('Invalid response from the MDS app:\n' + output)

    # Obtain a list of bytes in hex format from the response string
    response_hex = response.group(0).split(' ')

    def get_bool(pos):
        """ Obtain a boolean variable from the byte array. """
        return response_hex[pos] == '01'

    def get_int32(pos):
        """ Obtain an int from the byte array. Bytes are printed in
        little endian format."""
        return struct.unpack(
            '<i', bytearray.fromhex(''.join(response_hex[pos:pos + 4])))[0]

    rx_power = []
    RX_CHAINS = 4

    for i in range(RX_CHAINS):
        # Calculate starting position for the Rx chain data structure
        start = 12 + i * 22

        # The first byte in the data structure indicates if the rx chain is
        # valid.
        if get_bool(start):
            rx_power.append(get_int32(start + 2) / 10)
        else:
            rx_power.append(None)

    # Calculate the position for the tx chain data structure
    tx_pos = 12 + RX_CHAINS * 22

    tx_valid = get_bool(tx_pos)
    if tx_valid:
        tx_power = get_int32(tx_pos + 2) / -10
    else:
        tx_power = None

    return rx_power, tx_power


def get_telephony_signal_strength(ad):
    """Get the signal strength dictionary reported by the device.

    Example of a returned value:
    {'evdoEcio': -1, 'asuLevel': 28, 'lteSignalStrength': 14, 'gsmLevel': 0,
     'cdmaAsuLevel': 99, 'evdoDbm': -120, 'gsmDbm': -1, 'cdmaEcio': -160,
     'level': 2, 'lteLevel': 2, 'cdmaDbm': -120, 'dbm': -112, 'cdmaLevel': 0,
     'lteAsuLevel': 28, 'gsmAsuLevel': 99, 'gsmBitErrorRate': 0,
     'lteDbm': -112, 'gsmSignalStrength': 99}

    Args:
        ad: android device object.

    Returns:
        The value returned by the device, or an empty dict if that value is
        falsy or the query raised (the error is logged, not propagated).
    """
    try:
        signal_strength = ad.droid.telephonyGetSignalStrength()
        if not signal_strength:
            signal_strength = {}
    except Exception as e:
        ad.log.error(e)
        signal_strength = {}
    return signal_strength


def initiate_call(log,
                  ad,
                  callee_number,
                  emergency=False,
                  incall_ui_display=INCALL_UI_DISPLAY_FOREGROUND,
                  video=False):
    """Make phone call from caller to callee.

    Args:
        log: log object.
        ad: Caller android device object.
        callee_number: Callee phone number.
        emergency : specify the call is emergency.
            Optional. Default value is False.
        incall_ui_display: show the dialer UI foreground or background
        video: whether to initiate as video call

    Returns:
        result: if phone call is placed successfully.
    """
    ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
    sub_id = get_outgoing_voice_sub_id(ad)
    begin_time = get_device_epoch_time(ad)
    ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
    try:
        # Make a Call
        ad.log.info("Make a phone call to %s", callee_number)
        if emergency:
            ad.droid.telecomCallEmergencyNumber(callee_number)
        else:
            ad.droid.telecomCallNumber(callee_number, video)

        # Verify OFFHOOK state
        if not wait_for_call_offhook_for_subscription(
                log, ad, sub_id, event_tracking_started=True):
            ad.log.info("sub_id %s not in call offhook state", sub_id)
            last_call_drop_reason(ad, begin_time=begin_time)
            return False
        return True
    finally:
        # Runs on every exit path: stop tracking and put the UI where the
        # caller asked for it.
        if getattr(ad, "sdm_log", False):
            ad.adb.shell("i2cset -fy 3 64 6 1 b", ignore_status=True)
            ad.adb.shell("i2cset -fy 3 65 6 1 b", ignore_status=True)
        ad.droid.telephonyStopTrackingCallStateChangeForSubscription(sub_id)
        if incall_ui_display == INCALL_UI_DISPLAY_FOREGROUND:
            ad.droid.telecomShowInCallScreen()
        elif incall_ui_display == INCALL_UI_DISPLAY_BACKGROUND:
            ad.droid.showHomeScreen()


def is_event_match(event, field, value):
    """Return whether 'field' in the event data matches 'value'.

    Args:
        event: event to test. This event needs to have 'field'.
        field: field to match.
        value: value to match.

    Returns:
        True if 'field' in the event data equals 'value'.
        False otherwise.
    """
    return is_event_match_for_list(event, field, [value])


def is_event_match_for_list(event, field, value_list):
    """Return whether 'field' in the event data matches any value in
    'value_list'.

    Args:
        event: event to test. This event needs to have 'field'.
        field: field to match.
        value_list: a list of values to match.

    Returns:
        True if 'field' in the event data equals one of the values in
        'value_list'. False otherwise, including when the event has no
        'data' entry or the data has no 'field' entry.
    """
    try:
        value_in_event = event['data'][field]
    except KeyError:
        return False
    return value_in_event in value_list


def is_phone_in_call(log, ad):
    """Return True if phone in call.

    Falls back to looking for "mCallState=2" in the telephony registry dump
    when the telecom query raises.

    Args:
        log: log object.
        ad: android device.
    """
    try:
        return ad.droid.telecomIsInCall()
    except Exception:
        # Not a bare except: KeyboardInterrupt / SystemExit must propagate.
        return "mCallState=2" in ad.adb.shell(
            "dumpsys telephony.registry | grep mCallState")


def last_call_drop_reason(ad, begin_time=None):
    """Log the call drop reasons found in logcat and the last call dump.

    Args:
        ad: android device object.
        begin_time: passed to ad.search_logcat as the start of the search
            window.

    Returns:
        The text after the last ":" of the last matching logcat message
        that contains "ril reason str", or "" if there is none.
    """
    reasons = ad.search_logcat(
        "qcril_qmi_voice_map_qmi_to_ril_last_call_failure_cause", begin_time)
    reason_string = ""
    if reasons:
        log_msg = "Logcat call drop reasons:"
        for reason in reasons:
            log_msg = "%s\n\t%s" % (log_msg, reason["log_message"])
            if "ril reason str" in reason["log_message"]:
                reason_string = reason["log_message"].split(":")[-1].strip()
        ad.log.info(log_msg)
    reasons = ad.search_logcat("ACTION_FORBIDDEN_NO_SERVICE_AUTHORIZATION",
                               begin_time)
    if reasons:
        ad.log.warning("ACTION_FORBIDDEN_NO_SERVICE_AUTHORIZATION is seen")
    ad.log.info("last call dumpsys: %s",
                sorted(dumpsys_last_call_info(ad).items()))
    return reason_string


def toggle_airplane_mode(log, ad, new_state=None, strict_checking=True):
    """ Toggle the state of airplane mode.

    Uses adb when ad.skip_sl4a is set, otherwise the SL4A based
    implementation.

    Args:
        log: log handler.
        ad: android_device object.
        new_state: Airplane mode state to set to.
            If None, opposite of the current state.
        strict_checking: Whether to turn on strict checking that checks all
            features. Only used by the SL4A based implementation.

    Returns:
        result: True if operation succeed. False if error happens.
    """
    if ad.skip_sl4a:
        return toggle_airplane_mode_by_adb(log, ad, new_state)
    return toggle_airplane_mode_msim(
        log, ad, new_state, strict_checking=strict_checking)


def toggle_airplane_mode_by_adb(log, ad, new_state=None):
    """ Toggle the state of airplane mode through adb shell commands.

    Args:
        log: log handler.
        ad: android_device object.
        new_state: Airplane mode state to set to.
            If None, opposite of the current state.

    Returns:
        result: True if operation succeed. False if error happens.
    """
    cur_state = bool(int(ad.adb.shell("settings get global airplane_mode_on")))
    if new_state == cur_state:
        ad.log.info("Airplane mode already in %s", new_state)
        return True
    elif new_state is None:
        new_state = not cur_state
    ad.log.info("Change airplane mode from %s to %s", cur_state, new_state)
    try:
        ad.adb.shell("settings put global airplane_mode_on %s" % int(new_state))
        ad.adb.shell("am broadcast -a android.intent.action.AIRPLANE_MODE")
    except Exception as e:
        ad.log.error(e)
        return False
    # Read the setting back to confirm that the change took effect.
    changed_state = bool(
        int(ad.adb.shell("settings get global airplane_mode_on")))
    return changed_state == new_state


def toggle_airplane_mode_msim(log, ad, new_state=None, strict_checking=True):
    """ Toggle the state of airplane mode.

    Tracks service state changes on every subscription, toggles airplane
    mode, waits for a matching service state event, and then, when turning
    airplane mode on, waits for bluetooth and wifi to turn off.

    Args:
        log: log handler.
        ad: android_device object.
        new_state: Airplane mode state to set to.
            If None, opposite of the current state.
        strict_checking: Whether to turn on strict checking that checks all
            features. When False, bluetooth / wifi failures are only logged.

    Returns:
        result: True if operation succeed. False if error happens.
    """

    cur_state = ad.droid.connectivityCheckAirplaneMode()
    if cur_state == new_state:
        ad.log.info("Airplane mode already in %s", new_state)
        return True
    elif new_state is None:
        new_state = not cur_state
        ad.log.info("Toggle APM mode, from current state %s to %s", cur_state,
                    new_state)
    active_sub_info = ad.droid.subscriptionGetAllSubInfoList()
    sub_id_list = [info['subscriptionId'] for info in active_sub_info or []]

    ad.ed.clear_all_events()
    time.sleep(0.1)
    service_state_list = []
    if new_state:
        service_state_list.append(SERVICE_STATE_POWER_OFF)
        ad.log.info("Turn on airplane mode")

    else:
        # If either one of these 3 events show up, it should be OK.
        # Normal SIM, phone in service
        service_state_list.append(SERVICE_STATE_IN_SERVICE)
        # NO SIM, or Dead SIM, or no Roaming coverage.
        service_state_list.append(SERVICE_STATE_OUT_OF_SERVICE)
        service_state_list.append(SERVICE_STATE_EMERGENCY_ONLY)
        ad.log.info("Turn off airplane mode")

    for sub_id in sub_id_list:
        ad.droid.telephonyStartTrackingServiceStateChangeForSubscription(
            sub_id)

    # One shared deadline for the event wait and the bluetooth / wifi checks.
    timeout_time = time.time() + MAX_WAIT_TIME_AIRPLANEMODE_EVENT
    ad.droid.connectivityToggleAirplaneMode(new_state)

    try:
        try:
            event = ad.ed.wait_for_event(
                EVENT_SERVICE_STATE_CHANGED,
                is_event_match_for_list,
                timeout=MAX_WAIT_TIME_AIRPLANEMODE_EVENT,
                field=ServiceStateContainer.SERVICE_STATE,
                value_list=service_state_list)
            ad.log.info("Got event %s", event)
        except Empty:
            # A missing event is only a warning; the final state check
            # below decides the result.
            ad.log.warning("Did not get expected service state change to %s",
                           service_state_list)
        finally:
            for sub_id in sub_id_list:
                ad.droid.telephonyStopTrackingServiceStateChangeForSubscription(
                    sub_id)
    except Exception as e:
        ad.log.error(e)

    # APM on (new_state=True) will turn off bluetooth but may not turn it on
    try:
        if new_state and not _wait_for_bluetooth_in_state(
                log, ad, False, timeout_time - time.time()):
            ad.log.error(
                "Failed waiting for bluetooth during airplane mode toggle")
            if strict_checking:
                return False
    except Exception as e:
        ad.log.error("Failed to check bluetooth state due to %s", e)
        if strict_checking:
            raise

    # APM on (new_state=True) will turn off wifi but may not turn it on
    if new_state and not _wait_for_wifi_in_state(log, ad, False,
                                                 timeout_time - time.time()):
        ad.log.error("Failed waiting for wifi during airplane mode toggle on")
        if strict_checking:
            return False

    if ad.droid.connectivityCheckAirplaneMode() != new_state:
        ad.log.error("Set airplane mode to %s failed", new_state)
        return False
    return True


def toggle_cell_data_roaming(ad, state):
    """Enable or disable cell data roaming.

    The roaming mode is read back after the change to confirm it.

    Args:
        ad: Android Device Object.
        state: True or False for enable or disable cell data roaming.

    Returns:
        True if success.
        False if failed.

    Raises:
        KeyError: if state is neither True nor False.
    """
    state_int = {True: DATA_ROAMING_ENABLE, False: DATA_ROAMING_DISABLE}[state]
    if ad.droid.connectivityCheckDataRoamingMode() == state:
        ad.log.info("Data roaming is already in state %s", state)
        return True
    if not ad.droid.connectivitySetDataRoaming(state_int):
        # Was "ad.error.info", unlike every other logging call in this
        # file, which goes through ad.log.
        ad.log.error("Fail to config data roaming into state %s", state)
        return False
    if ad.droid.connectivityCheckDataRoamingMode() == state:
        ad.log.info("Data roaming is configured into state %s", state)
        return True
    ad.log.error("Data roaming is not configured into state %s", state)
    return False


def wait_for_call_offhook_event(
        log,
        ad,
        sub_id,
        event_tracking_started=False,
        timeout=MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT):
    """Wait for a call state change to OFFHOOK event.

    Args:
        log: log object.
        ad: android device object.
        sub_id: subscription ID
        event_tracking_started: True if event tracking was already started
            outside of this function. When False, tracking is started here
            and stopped again before returning.
        timeout: time to wait for event

    Returns:
        True: if call offhook event is received.
        False: if call offhook event is not received.
    """
    if not event_tracking_started:
        ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
        ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
    try:
        ad.ed.wait_for_event(
            EVENT_CALL_STATE_CHANGED,
            is_event_match,
            timeout=timeout,
            field=CallStateContainer.CALL_STATE,
            value=TELEPHONY_STATE_OFFHOOK)
        ad.log.info("Got event %s", TELEPHONY_STATE_OFFHOOK)
    except Empty:
        ad.log.info("No event for call state change to OFFHOOK")
        return False
    finally:
        if not event_tracking_started:
            ad.droid.telephonyStopTrackingCallStateChangeForSubscription(
                sub_id)
    return True


def wait_for_call_offhook_for_subscription(
        log,
        ad,
        sub_id,
        event_tracking_started=False,
        timeout=MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT,
        interval=WAIT_TIME_BETWEEN_STATE_CHECK):
    """Wait for telephony and telecom to both report the OFFHOOK state.

    Args:
        log: log object.
        ad: android device object.
        sub_id: subscription ID
        event_tracking_started: True if event tracking was already started
            outside of this function. When False, tracking is started here
            and stopped again before returning.
        timeout: time to wait for the OFFHOOK state
        interval: checking interval

    Returns:
        True: if both telephony and telecom report OFFHOOK within timeout.
        False: otherwise.
    """
    if not event_tracking_started:
        ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
        ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
    offhook_event_received = False
    end_time = time.time() + timeout
    try:
        while time.time() < end_time:
            if not offhook_event_received:
                # Waiting for the event also paces the loop (up to
                # 'interval' seconds) until the event has been seen.
                if wait_for_call_offhook_event(log, ad, sub_id, True,
                                               interval):
                    offhook_event_received = True
            telephony_state = ad.droid.telephonyGetCallStateForSubscription(
                sub_id)
            telecom_state = ad.droid.telecomGetCallState()
            if telephony_state == TELEPHONY_STATE_OFFHOOK and (
                    telecom_state == TELEPHONY_STATE_OFFHOOK):
                ad.log.info("telephony and telecom are in OFFHOOK state")
                return True
            else:
                ad.log.info(
                    "telephony in %s, telecom in %s, expecting OFFHOOK state",
                    telephony_state, telecom_state)
            if offhook_event_received:
                time.sleep(interval)
    finally:
        if not event_tracking_started:
            ad.droid.telephonyStopTrackingCallStateChangeForSubscription(
                sub_id)
    # Timed out: return False explicitly rather than an implicit None.
    return False


def _wait_for_bluetooth_in_state(log, ad, state, max_wait):
    """Wait until the bluetooth adapter is in the given state.

    Args:
        log: log object.
        ad: android device object.
        state: expected bluetooth state, True for on and False for off.
        max_wait: max time to wait for the state change event. If it is not
            positive, only the current state is checked.

    Returns:
        True if bluetooth is already in the expected state or the matching
        state change event arrives in time. False otherwise.
    """
    # FIXME: These event names should be defined in a common location
    _BLUETOOTH_STATE_ON_EVENT = 'BluetoothStateChangedOn'
    _BLUETOOTH_STATE_OFF_EVENT = 'BluetoothStateChangedOff'
    ad.ed.clear_events(_BLUETOOTH_STATE_ON_EVENT)
    ad.ed.clear_events(_BLUETOOTH_STATE_OFF_EVENT)

    ad.droid.bluetoothStartListeningForAdapterStateChange()
    try:
        bt_state = ad.droid.bluetoothCheckState()
        if bt_state == state:
            return True
        if max_wait <= 0:
            ad.log.error("Time out: bluetooth state still %s, expecting %s",
                         bt_state, state)
            return False

        event = {
            False: _BLUETOOTH_STATE_OFF_EVENT,
            True: _BLUETOOTH_STATE_ON_EVENT
        }[state]
        event = ad.ed.pop_event(event, max_wait)
        ad.log.info("Got event %s", event['name'])
        return True
    except Empty:
        ad.log.error("Time out: bluetooth state still in %s, expecting %s",
                     bt_state, state)
        return False
    finally:
        ad.droid.bluetoothStopListeningForAdapterStateChange()


def wait_for_droid_in_call(log, ad, max_time):
    """Wait for android to be in call state.

    Args:
        log: log object.
        ad: android device.
        max_time: maximal wait time.

    Returns:
        If phone become in call state within max_time, return True.
        Return False if timeout.
    """
    return _wait_for_droid_in_state(log, ad, max_time, is_phone_in_call)


def _wait_for_droid_in_state(log, ad, max_time, state_check_func, *args,
                             **kwargs):
    """Poll state_check_func until it returns a truthy value or time is up.

    The check is called as state_check_func(log, ad, *args, **kwargs) and
    retried every WAIT_TIME_BETWEEN_STATE_CHECK seconds while the remaining
    time is not negative. A negative max_time therefore returns False
    without running the check at all.

    Returns:
        True if the check succeeded, False if time ran out first.
    """
    while max_time >= 0:
        if state_check_func(log, ad, *args, **kwargs):
            return True

        time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK)
        max_time -= WAIT_TIME_BETWEEN_STATE_CHECK

    return False


# TODO: replace this with an event-based function
def _wait_for_wifi_in_state(log, ad, state, max_wait):
    """Poll until the wifi state reported by the device equals 'state'.

    Returns:
        True if the state was reached within max_wait, False otherwise.
    """
    return _wait_for_droid_in_state(
        log, ad, max_wait,
        lambda log, ad, state: ad.droid.wifiCheckState() == state, state)