1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17"""Generic telephony utility functions. Cloned from test_utils.tel."""
18
19import re
20import struct
21import time
22from queue import Empty
23
24from acts.logger import epoch_to_log_line_timestamp
25from acts.controllers.adb_lib.error import AdbCommandError
26
# Where the in-call UI should be left after initiate_call(): FOREGROUND shows
# the in-call screen, BACKGROUND shows the home screen, and any other value
# (e.g. DEFAULT) leaves the UI untouched.
INCALL_UI_DISPLAY_FOREGROUND = "foreground"
INCALL_UI_DISPLAY_BACKGROUND = "background"
INCALL_UI_DISPLAY_DEFAULT = "default"

# Max time, in seconds, to wait for the call state to become OFFHOOK.
# Used as the default timeout of the wait_for_call_offhook_* helpers.
MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT = 30

# Max time, in seconds, to wait after toggling airplane mode for the
# expected service state event to arrive.
MAX_WAIT_TIME_AIRPLANEMODE_EVENT = 90

# Wait time, in seconds, between two consecutive state checks
WAIT_TIME_BETWEEN_STATE_CHECK = 5

# Constants for Data Roaming State (values handed to
# connectivitySetDataRoaming by toggle_cell_data_roaming)
DATA_ROAMING_ENABLE = 1
DATA_ROAMING_DISABLE = 0

# Constants for Telephony Manager Call State
TELEPHONY_STATE_RINGING = "RINGING"
TELEPHONY_STATE_IDLE = "IDLE"
TELEPHONY_STATE_OFFHOOK = "OFFHOOK"
TELEPHONY_STATE_UNKNOWN = "UNKNOWN"

# Constants for Service State (matched against the "serviceState" field of
# ServiceStateChanged events)
SERVICE_STATE_EMERGENCY_ONLY = "EMERGENCY_ONLY"
SERVICE_STATE_IN_SERVICE = "IN_SERVICE"
SERVICE_STATE_OUT_OF_SERVICE = "OUT_OF_SERVICE"
SERVICE_STATE_POWER_OFF = "POWER_OFF"
SERVICE_STATE_UNKNOWN = "UNKNOWN"

# Constants for Network Mode
NETWORK_MODE_GSM_ONLY = "NETWORK_MODE_GSM_ONLY"
NETWORK_MODE_WCDMA_ONLY = "NETWORK_MODE_WCDMA_ONLY"
NETWORK_MODE_LTE_ONLY = "NETWORK_MODE_LTE_ONLY"

# Names of the events waited for / cleared through the event dispatcher
EVENT_CALL_STATE_CHANGED = "CallStateChanged"
EVENT_SERVICE_STATE_CHANGED = "ServiceStateChanged"
67
68
class CallStateContainer:
    """Field names read from the ``data`` dict of a CallStateChanged event.

    Only CALL_STATE is used in this file (as the field matched by
    wait_for_call_offhook_event). The other names are presumably sibling
    keys of the same event payload -- confirm against the event producer.
    """
    INCOMING_NUMBER = "incomingNumber"
    SUBSCRIPTION_ID = "subscriptionId"
    CALL_STATE = "callState"
73
74
class ServiceStateContainer:
    """Field names read from the ``data`` dict of a ServiceStateChanged event.

    Only SERVICE_STATE is used in this file (as the field matched by
    toggle_airplane_mode_msim). The other names are presumably sibling keys
    of the same event payload -- confirm against the event producer.
    """
    VOICE_REG_STATE = "voiceRegState"
    VOICE_NETWORK_TYPE = "voiceNetworkType"
    DATA_REG_STATE = "dataRegState"
    DATA_NETWORK_TYPE = "dataNetworkType"
    OPERATOR_NAME = "operatorName"
    OPERATOR_ID = "operatorId"
    IS_MANUAL_NW_SELECTION = "isManualNwSelection"
    ROAMING = "roaming"
    IS_EMERGENCY_ONLY = "isEmergencyOnly"
    NETWORK_ID = "networkId"
    SYSTEM_ID = "systemId"
    SUBSCRIPTION_ID = "subscriptionId"
    SERVICE_STATE = "serviceState"
89
90
def dumpsys_last_call_info(ad):
    """Collect details of the most recent call from ``dumpsys telecom``.

    Args:
        ad: android device object; ``ad.adb.shell`` and ``ad.log`` are used.

    Returns:
        A dict that always holds "TC" (the last call number) plus every
        attribute found in that call's dumpsys section. "startTime" and
        "endTime" are converted with epoch_to_log_line_timestamp; all other
        attributes are kept as the raw matched text.
    """
    timestamp_attrs = ("startTime", "endTime")
    wanted_attrs = timestamp_attrs + (
        "direction", "isInterrupted", "callTechnologies",
        "callTerminationsReason", "isVideoCall", "callProperties")

    num = dumpsys_last_call_number(ad)
    output = ad.adb.shell("dumpsys telecom")
    call_info = {"TC": num}

    # Isolate the "{...}" section belonging to the last call, if present.
    section = re.search(r"Call TC@%s: {(.*?)}" % num, output, re.DOTALL)
    if section:
        section_text = section.group(1)
        for attr in wanted_attrs:
            found = re.search(r"%s: (.*)" % attr, section_text)
            if not found:
                continue
            raw_value = found.group(1)
            if attr in timestamp_attrs:
                call_info[attr] = epoch_to_log_line_timestamp(int(raw_value))
            else:
                call_info[attr] = raw_value

    ad.log.debug("call_info = %s", call_info)
    return call_info
111
112
def dumpsys_last_call_number(ad):
    """Return the number of the last call listed by ``dumpsys telecom``.

    Args:
        ad: android device object; only ``ad.adb.shell`` is used.

    Returns:
        The integer N of the last "Call TC@N:" entry in the dumpsys output,
        or 0 when the output contains no such entry.
    """
    output = ad.adb.shell("dumpsys telecom")
    # Raw string: "\d" in a plain string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    call_nums = re.findall(r"Call TC@(\d+):", output)
    return int(call_nums[-1]) if call_nums else 0
120
121
def get_device_epoch_time(ad):
    """Return the device clock as integer milliseconds since the epoch.

    Runs ``date +%s.%N`` on the device, parses the output as fractional
    seconds and truncates the result to whole milliseconds.
    """
    device_seconds = float(ad.adb.shell("date +%s.%N"))
    return int(1000 * device_seconds)
124
125
def get_outgoing_voice_sub_id(ad):
    """Return the subscription id used for outgoing voice calls.

    An ``outgoing_voice_sub_id`` attribute set on the device object takes
    precedence (whatever its value); otherwise the default voice
    subscription id is queried from the device.
    """
    if not hasattr(ad, "outgoing_voice_sub_id"):
        return ad.droid.subscriptionGetDefaultVoiceSubId()
    return ad.outgoing_voice_sub_id
133
134
def get_rx_tx_power_levels(log, ad):
    """ Obtains Rx and Tx power levels from the MDS application.

    The method requires the MDS app to be installed in the DUT.

    Args:
        log: logger object
        ad: an android device

    Return:
        A tuple where the first element is a list with the RSRP value of
        each Rx chain, and the second element is the transmitted power in
        dBm. Values for invalid Rx / Tx chains are set to None.

    Raises:
        RuntimeError: if the MDS command fails, reports no success, or
            returns a response that is missing, truncated or not valid hex.
    """
    cmd = ('am instrument -w -e request "80 00 e8 03 00 08 00 00 00" -e '
           'response wait "com.google.mdstest/com.google.mdstest.instrument.'
           'ModemCommandInstrumentation"')
    try:
        output = ad.adb.shell(cmd)
    except AdbCommandError as e:
        log.error(e)
        output = None

    if not output or 'result=SUCCESS' not in output:
        raise RuntimeError('Could not obtain Tx/Rx power levels from MDS. Is '
                           'the MDS app installed?')

    response = re.search(r"(?<=response=).+", output)

    if not response:
        raise RuntimeError('Invalid response from the MDS app:\n' + output)

    # Obtain a list of bytes in hex format from the response string. Split
    # on any whitespace so stray spaces or a trailing carriage return cannot
    # shift the byte offsets used below.
    response_hex = response.group(0).split()

    def get_bool(pos):
        """ Obtain a boolean variable from the byte array. """
        if pos >= len(response_hex):
            # Report short responses the same way as other bad responses
            # instead of leaking an IndexError.
            raise RuntimeError('Invalid response from the MDS app:\n' +
                               output)
        return response_hex[pos] == '01'

    def get_int32(pos):
        """ Obtain an int from the byte array. Bytes are printed in
        little endian format."""
        try:
            return struct.unpack(
                '<i',
                bytearray.fromhex(''.join(response_hex[pos:pos + 4])))[0]
        except (ValueError, struct.error) as err:
            # Fewer than four bytes left, or a token that is not hex.
            raise RuntimeError('Invalid response from the MDS app:\n' +
                               output) from err

    rx_power = []
    RX_CHAINS = 4

    for i in range(RX_CHAINS):
        # Calculate starting position for the Rx chain data structure
        start = 12 + i * 22

        # The first byte in the data structure indicates if the rx chain is
        # valid.
        if get_bool(start):
            rx_power.append(get_int32(start + 2) / 10)
        else:
            rx_power.append(None)

    # Calculate the position for the tx chain data structure
    tx_pos = 12 + RX_CHAINS * 22

    tx_valid = get_bool(tx_pos)
    if tx_valid:
        # NOTE(review): the raw value is divided by -10, i.e. its sign is
        # flipped; presumably MDS reports negated tenths of dBm -- confirm.
        tx_power = get_int32(tx_pos + 2) / -10
    else:
        tx_power = None

    return rx_power, tx_power
204
205
def get_telephony_signal_strength(ad):
    """Return the signal strength reported by the device, or {} on failure.

    Any exception raised by the query is logged through ``ad.log.error`` and
    an empty dict is returned; a falsy result is also replaced by {}.

    Example of a returned value:
        {'evdoEcio': -1, 'asuLevel': 28, 'lteSignalStrength': 14,
         'gsmLevel': 0, 'cdmaAsuLevel': 99, 'evdoDbm': -120, 'gsmDbm': -1,
         'cdmaEcio': -160, 'level': 2, 'lteLevel': 2, 'cdmaDbm': -120,
         'dbm': -112, 'cdmaLevel': 0, 'lteAsuLevel': 28, 'gsmAsuLevel': 99,
         'gsmBitErrorRate': 0, 'lteDbm': -112, 'gsmSignalStrength': 99}
    """
    try:
        reported = ad.droid.telephonyGetSignalStrength()
    except Exception as e:
        ad.log.error(e)
        return {}
    return reported if reported else {}
220
221
def initiate_call(log,
                  ad,
                  callee_number,
                  emergency=False,
                  incall_ui_display=INCALL_UI_DISPLAY_FOREGROUND,
                  video=False):
    """Place a call from ``ad`` and wait for it to reach OFFHOOK.

    Args:
        log: log object.
        ad: caller android device object.
        callee_number: phone number to dial.
        emergency: dial through the emergency-number API when True.
            Optional. Default value is False.
        incall_ui_display: INCALL_UI_DISPLAY_FOREGROUND shows the in-call
            screen afterwards, INCALL_UI_DISPLAY_BACKGROUND shows the home
            screen; any other value leaves the UI untouched.
        video: whether to initiate as video call (ignored for emergency
            calls).

    Returns:
        True if the call reached the OFFHOOK state, False otherwise.
    """
    ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
    sub_id = get_outgoing_voice_sub_id(ad)
    begin_time = get_device_epoch_time(ad)
    ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
    try:
        ad.log.info("Make a phone call to %s", callee_number)
        if emergency:
            ad.droid.telecomCallEmergencyNumber(callee_number)
        else:
            ad.droid.telecomCallNumber(callee_number, video)

        # Success path first: both stacks reported OFFHOOK in time.
        if wait_for_call_offhook_for_subscription(
                log, ad, sub_id, event_tracking_started=True):
            return True

        ad.log.info("sub_id %s not in call offhook state", sub_id)
        last_call_drop_reason(ad, begin_time=begin_time)
        return False
    finally:
        # Cleanup runs whether the call succeeded, failed or raised.
        if getattr(ad, "sdm_log", None):
            for i2c_cmd in ("i2cset -fy 3 64 6 1 b",
                            "i2cset -fy 3 65 6 1 b"):
                ad.adb.shell(i2c_cmd, ignore_status=True)
        ad.droid.telephonyStopTrackingCallStateChangeForSubscription(sub_id)
        if incall_ui_display == INCALL_UI_DISPLAY_FOREGROUND:
            ad.droid.telecomShowInCallScreen()
        elif incall_ui_display == INCALL_UI_DISPLAY_BACKGROUND:
            ad.droid.showHomeScreen()
271
272
def is_event_match(event, field, value):
    """Tell whether ``event['data'][field]`` equals ``value``.

    Args:
        event: event to test; expected to carry a 'data' mapping.
        field: name of the field inside the event data.
        value: the single value to compare against.

    Returns:
        True if the field is present and equal to ``value``.
        False otherwise.
    """
    # Single-value case of the list matcher.
    return is_event_match_for_list(event, field, value_list=[value])
286
287
def is_event_match_for_list(event, field, value_list):
    """Tell whether ``event['data'][field]`` equals any entry of a list.

    Args:
        event: event to test; expected to carry a 'data' mapping.
        field: name of the field inside the event data.
        value_list: candidate values to compare against.

    Returns:
        True if the field is present and equal to at least one candidate.
        False if a key is missing or no candidate matches.
    """
    try:
        actual = event['data'][field]
    except KeyError:
        return False
    return any(actual == candidate for candidate in value_list)
309
310
def is_phone_in_call(log, ad):
    """Return True if phone in call.

    The in-call state is queried through ``ad.droid`` first. If that query
    raises, the state is instead derived from the telephony registry dump
    read over adb ("mCallState=2" is taken as in-call).

    Args:
        log: log object (unused).
        ad:  android device.

    Returns:
        The result of the droid query, or a bool from the adb fallback.
    """
    try:
        return ad.droid.telecomIsInCall()
    except Exception:
        # Best-effort fallback. Catch Exception rather than using a bare
        # except so KeyboardInterrupt / SystemExit still propagate.
        return "mCallState=2" in ad.adb.shell(
            "dumpsys telephony.registry | grep mCallState")
323
324
def last_call_drop_reason(ad, begin_time=None):
    """Log what logcat and dumpsys say about the last call failure.

    Args:
        ad: android device object.
        begin_time: passed through to ``ad.search_logcat`` as its second
            argument.

    Returns:
        The text after the last ':' of the last logcat failure-cause entry
        containing "ril reason str", or "" if there is none.
    """
    reason_string = ""
    failure_entries = ad.search_logcat(
        "qcril_qmi_voice_map_qmi_to_ril_last_call_failure_cause", begin_time)
    if failure_entries:
        report_lines = ["Logcat call drop reasons:"]
        for entry in failure_entries:
            message = entry["log_message"]
            report_lines.append(str(message))
            if "ril reason str" in message:
                # Later entries override earlier ones.
                reason_string = message.split(":")[-1].strip()
        ad.log.info("\n\t".join(report_lines))

    if ad.search_logcat("ACTION_FORBIDDEN_NO_SERVICE_AUTHORIZATION",
                        begin_time):
        ad.log.warning("ACTION_FORBIDDEN_NO_SERVICE_AUTHORIZATION is seen")

    last_call = dumpsys_last_call_info(ad)
    ad.log.info("last call dumpsys: %s", sorted(last_call.items()))
    return reason_string
343
344
def toggle_airplane_mode(log, ad, new_state=None, strict_checking=True):
    """Set or toggle airplane mode, choosing the backend by ``ad.skip_sl4a``.

    Args:
        log: log handler.
        ad: android_device object.
        new_state: Airplane mode state to set to.
            If None, opposite of the current state.
        strict_checking: forwarded to the SL4A-based implementation only;
            the adb-based implementation does not take it.

    Returns:
        result: True if operation succeed. False if error happens.
    """
    if not ad.skip_sl4a:
        return toggle_airplane_mode_msim(
            log, ad, new_state, strict_checking=strict_checking)
    # SL4A is skipped for this device: drive the setting over adb instead.
    return toggle_airplane_mode_by_adb(log, ad, new_state)
363
364
def toggle_airplane_mode_by_adb(log, ad, new_state=None):
    """Set or toggle airplane mode through adb shell commands.

    Args:
        log: log handler (unused).
        ad: android_device object.
        new_state: Airplane mode state to set to.
            If None, opposite of the current state.

    Returns:
        True if the mode already was, or afterwards reads back as, the
        requested state. False if a command fails or the read-back differs.
    """

    def _read_airplane_mode():
        # The setting is reported as "0" or "1".
        return bool(int(ad.adb.shell("settings get global airplane_mode_on")))

    cur_state = _read_airplane_mode()
    if new_state == cur_state:
        ad.log.info("Airplane mode already in %s", new_state)
        return True
    if new_state is None:
        new_state = not cur_state

    ad.log.info("Change airplane mode from %s to %s", cur_state, new_state)
    try:
        ad.adb.shell("settings put global airplane_mode_on %s" % int(new_state))
        ad.adb.shell("am broadcast -a android.intent.action.AIRPLANE_MODE")
    except Exception as e:
        ad.log.error(e)
        return False

    return _read_airplane_mode() == new_state
392
393
def toggle_airplane_mode_msim(log, ad, new_state=None, strict_checking=True):
    """Set or toggle airplane mode through SL4A and verify the result.

    Service-state tracking is started for every subscription returned by the
    device, airplane mode is toggled, and one matching service state event
    is awaited (a missing event only produces a warning). When turning
    airplane mode on, bluetooth and wifi are additionally expected to turn
    off within what is left of MAX_WAIT_TIME_AIRPLANEMODE_EVENT.

    Args:
        log: log handler.
        ad: android_device object.
        new_state: Airplane mode state to set to.
            If None, opposite of the current state.
        strict_checking: when True, a failed bluetooth or wifi check makes
            the call return False and an exception raised by the bluetooth
            check is re-raised; when False those problems are only logged.

    Returns:
        result: True if operation succeed. False if error happens.
    """
    current_state = ad.droid.connectivityCheckAirplaneMode()
    if current_state == new_state:
        ad.log.info("Airplane mode already in %s", new_state)
        return True
    if new_state is None:
        new_state = not current_state
        ad.log.info("Toggle APM mode, from current tate %s to %s",
                    current_state, new_state)

    active_sub_info = ad.droid.subscriptionGetAllSubInfoList()
    if active_sub_info:
        sub_id_list = [info['subscriptionId'] for info in active_sub_info]
    else:
        sub_id_list = []

    ad.ed.clear_all_events()
    time.sleep(0.1)
    if new_state:
        service_state_list = [SERVICE_STATE_POWER_OFF]
        ad.log.info("Turn on airplane mode")
    else:
        # Any one of these three states is acceptable after leaving APM:
        # in service for a normal SIM; out of service or emergency only for
        # no SIM, a dead SIM, or no roaming coverage.
        service_state_list = [
            SERVICE_STATE_IN_SERVICE,
            SERVICE_STATE_OUT_OF_SERVICE,
            SERVICE_STATE_EMERGENCY_ONLY,
        ]
        ad.log.info("Turn off airplane mode")

    for sub_id in sub_id_list:
        ad.droid.telephonyStartTrackingServiceStateChangeForSubscription(
            sub_id)

    # One shared deadline for the event wait and the radio checks below.
    timeout_time = time.time() + MAX_WAIT_TIME_AIRPLANEMODE_EVENT
    ad.droid.connectivityToggleAirplaneMode(new_state)

    try:
        # The nested try keeps failures of the stop-tracking cleanup inside
        # the outer handler, so they are logged instead of propagating.
        try:
            event = ad.ed.wait_for_event(
                EVENT_SERVICE_STATE_CHANGED,
                is_event_match_for_list,
                timeout=MAX_WAIT_TIME_AIRPLANEMODE_EVENT,
                field=ServiceStateContainer.SERVICE_STATE,
                value_list=service_state_list)
            ad.log.info("Got event %s", event)
        except Empty:
            ad.log.warning("Did not get expected service state change to %s",
                           service_state_list)
        finally:
            for sub_id in sub_id_list:
                ad.droid.telephonyStopTrackingServiceStateChangeForSubscription(
                    sub_id)
    except Exception as e:
        ad.log.error(e)

    # APM on (new_state=True) will turn off bluetooth but may not turn it on
    try:
        if new_state:
            bluetooth_off = _wait_for_bluetooth_in_state(
                log, ad, False, timeout_time - time.time())
            if not bluetooth_off:
                ad.log.error(
                    "Failed waiting for bluetooth during airplane mode toggle")
                if strict_checking:
                    return False
    except Exception as e:
        ad.log.error("Failed to check bluetooth state due to %s", e)
        if strict_checking:
            raise

    # APM on (new_state=True) will turn off wifi but may not turn it on
    if new_state:
        wifi_off = _wait_for_wifi_in_state(log, ad, False,
                                           timeout_time - time.time())
        if not wifi_off:
            ad.log.error(
                "Failed waiting for wifi during airplane mode toggle on")
            if strict_checking:
                return False

    if ad.droid.connectivityCheckAirplaneMode() != new_state:
        ad.log.error("Set airplane mode to %s failed", new_state)
        return False
    return True
486
487
def toggle_cell_data_roaming(ad, state):
    """Enable or disable cell data roaming and verify the setting.

    The current roaming mode is read first; if it already equals ``state``
    nothing is changed. Otherwise the new mode is set and read back once
    (no waiting or retrying is done).

    Args:
        ad: Android Device Object.
        state: True or False for enable or disable cell data roaming.

    Returns:
        True if success.
        False if failed.

    Raises:
        KeyError: if ``state`` is neither True nor False.
    """
    state_int = {True: DATA_ROAMING_ENABLE, False: DATA_ROAMING_DISABLE}[state]
    if ad.droid.connectivityCheckDataRoamingMode() == state:
        ad.log.info("Data roaming is already in state %s", state)
        return True
    if not ad.droid.connectivitySetDataRoaming(state_int):
        # Report through the device logger; ``ad.error`` does not exist and
        # used to turn this failure path into an AttributeError.
        ad.log.error("Fail to config data roaming into state %s", state)
        return False
    if ad.droid.connectivityCheckDataRoamingMode() == state:
        ad.log.info("Data roaming is configured into state %s", state)
        return True
    ad.log.error("Data roaming is not configured into state %s", state)
    return False
516
517
def wait_for_call_offhook_event(
        log,
        ad,
        sub_id,
        event_tracking_started=False,
        timeout=MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT):
    """Wait for a call state change event to OFFHOOK on a subscription.

    Args:
        log: log object.
        ad: android device object.
        sub_id: subscription ID whose call state is tracked.
        event_tracking_started: True if the caller already started call
            state tracking; tracking is then neither started nor stopped
            here and pending events are not cleared.
        timeout: time to wait for event

    Returns:
        True: if call offhook event is received.
        False: if call offhook event is not received.
    """
    manage_tracking = not event_tracking_started
    if manage_tracking:
        ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
        ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
    try:
        ad.ed.wait_for_event(
            EVENT_CALL_STATE_CHANGED,
            is_event_match,
            timeout=timeout,
            field=CallStateContainer.CALL_STATE,
            value=TELEPHONY_STATE_OFFHOOK)
    except Empty:
        ad.log.info("No event for call state change to OFFHOOK")
        return False
    else:
        ad.log.info("Got event %s", TELEPHONY_STATE_OFFHOOK)
        return True
    finally:
        if manage_tracking:
            ad.droid.telephonyStopTrackingCallStateChangeForSubscription(
                sub_id)
555
556
def wait_for_call_offhook_for_subscription(
        log,
        ad,
        sub_id,
        event_tracking_started=False,
        timeout=MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT,
        interval=WAIT_TIME_BETWEEN_STATE_CHECK):
    """Wait until telephony and telecom both report OFFHOOK for a call.

    Until the OFFHOOK event has been seen, each round waits up to
    ``interval`` seconds for it; afterwards each round sleeps ``interval``
    seconds. After every round both call states are polled.

    Args:
        log: log object.
        ad: android device object.
        sub_id: subscription ID
        event_tracking_started: True if the caller already started call
            state tracking; tracking is then neither started nor stopped
            here and pending events are not cleared.
        timeout: overall time to wait for the OFFHOOK state
        interval: checking interval

    Returns:
        True: if both telephony and telecom reach OFFHOOK within timeout.
        False: if the timeout expires first.
    """
    if not event_tracking_started:
        ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
        ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
    offhook_event_received = False
    end_time = time.time() + timeout
    try:
        while time.time() < end_time:
            if not offhook_event_received:
                # Tracking is already active here, hence the literal True.
                if wait_for_call_offhook_event(log, ad, sub_id, True,
                                               interval):
                    offhook_event_received = True
            telephony_state = ad.droid.telephonyGetCallStateForSubscription(
                sub_id)
            telecom_state = ad.droid.telecomGetCallState()
            if telephony_state == TELEPHONY_STATE_OFFHOOK and (
                    telecom_state == TELEPHONY_STATE_OFFHOOK):
                ad.log.info("telephony and telecom are in OFFHOOK state")
                return True
            else:
                ad.log.info(
                    "telephony in %s, telecom in %s, expecting OFFHOOK state",
                    telephony_state, telecom_state)
            if offhook_event_received:
                # The event wait no longer paces the loop, so sleep instead.
                time.sleep(interval)
    finally:
        if not event_tracking_started:
            ad.droid.telephonyStopTrackingCallStateChangeForSubscription(
                sub_id)
    # Timed out: return an explicit False instead of falling through to None.
    return False
605
606
def _wait_for_bluetooth_in_state(log, ad, state, max_wait):
    """Wait for the bluetooth adapter to report the given on/off state.

    Args:
        log: log object (unused).
        ad: android device object.
        state: expected adapter state, True for on and False for off.
        max_wait: how long to wait for the state change event; a value
            <= 0 means only the current state is checked.

    Returns:
        True if the adapter already is in ``state`` or the matching state
        change event arrives in time, False otherwise.
    """
    # FIXME: These event names should be defined in a common location
    on_event = 'BluetoothStateChangedOn'
    off_event = 'BluetoothStateChangedOff'
    for stale_event in (on_event, off_event):
        ad.ed.clear_events(stale_event)

    ad.droid.bluetoothStartListeningForAdapterStateChange()
    try:
        bt_state = ad.droid.bluetoothCheckState()
        if bt_state == state:
            return True
        if max_wait <= 0:
            ad.log.error("Time out: bluetooth state still %s, expecting %s",
                         bt_state, state)
            return False

        expected_event = {False: off_event, True: on_event}[state]
        received = ad.ed.pop_event(expected_event, max_wait)
        ad.log.info("Got event %s", received['name'])
        return True
    except Empty:
        ad.log.error("Time out: bluetooth state still in %s, expecting %s",
                     bt_state, state)
        return False
    finally:
        # Always stop listening, whichever way the wait ended.
        ad.droid.bluetoothStopListeningForAdapterStateChange()
637
638
def wait_for_droid_in_call(log, ad, max_time):
    """Poll the device until it reports being in a call.

    Args:
        log: log object.
        ad:  android device.
        max_time: maximal wait time.

    Returns:
        True if the phone is found in call within max_time.
        False if the wait times out.
    """
    return _wait_for_droid_in_state(
        log, ad, max_time, state_check_func=is_phone_in_call)
652
653
def _wait_for_droid_in_state(log, ad, max_time, state_check_func, *args,
                             **kwargs):
    """Poll ``state_check_func`` until it is truthy or time runs out.

    The check is called as ``state_check_func(log, ad, *args, **kwargs)``.
    After every failed check the function sleeps for
    WAIT_TIME_BETWEEN_STATE_CHECK and deducts that amount from the budget.

    Returns:
        True as soon as a check succeeds; False once the remaining budget
        is negative (a negative ``max_time`` means no check is made).
    """
    remaining = max_time
    while remaining >= 0:
        state_reached = state_check_func(log, ad, *args, **kwargs)
        if state_reached:
            return True

        time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK)
        remaining -= WAIT_TIME_BETWEEN_STATE_CHECK

    return False
664
665
666# TODO: replace this with an event-based function
def _wait_for_wifi_in_state(log, ad, state, max_wait):
    """Poll until ``ad.droid.wifiCheckState()`` equals ``state``.

    Returns:
        True if the wifi state matches within ``max_wait``, False otherwise.
    """

    def _wifi_state_matches(log, ad, state):
        # Signature follows what _wait_for_droid_in_state passes along.
        return ad.droid.wifiCheckState() == state

    return _wait_for_droid_in_state(log, ad, max_wait, _wifi_state_matches,
                                    state)
672