1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7
8from autotest_lib.client.bin import utils
9from autotest_lib.client.common_lib import error
10from autotest_lib.server import test
11
12_MODEM_WAIT_DELAY = 120
13
14NO_MODEM_STATE_AVAILABLE = 'FAILED TO GET MODEM STATE'
15
16# TODO(harpreet / benchan): Modify the modem script to report modem health.
17# crbug.com/352351
18MM_MODEM_STATE_FAILED = '-1'
19MM_MODEM_STATE_UNKNOWN = '0'
20MM_MODEM_STATE_INITIALIZING = '1'
21MM_MODEM_STATE_LOCKED = '2'
22MM_MODEM_STATE_DISABLED = '3'
23MM_MODEM_STATE_DISABLING = '4'
24MM_MODEM_STATE_ENABLING = '5'
25MM_MODEM_STATE_ENABLED = '6'
26MM_MODEM_STATE_SEARCHING = '7'
27MM_MODEM_STATE_REGISTERED = '8'
28MM_MODEM_STATE_DISCONNECTING = '9'
29MM_MODEM_STATE_CONNECTING = '10'
30MM_MODEM_STATE_CONNECTED = '11'
31
32GOBI_MODEM_STATE_UNKNOWN = '0'
33GOBI_MODEM_STATE_DISABLED = '10'
34GOBI_MODEM_STATE_DISABLING = '20'
35GOBI_MODEM_STATE_ENABLING = '30'
36GOBI_MODEM_STATE_ENABLED = '40'
37GOBI_MODEM_STATE_SEARCHING = '50'
38GOBI_MODEM_STATE_REGISTERED = '60'
39GOBI_MODEM_STATE_DISCONNECTING = '70'
40GOBI_MODEM_STATE_CONNECTING = '80'
41GOBI_MODEM_STATE_CONNECTED = '90'
42
43ENABLED_MODEM_STATES = [
44    MM_MODEM_STATE_ENABLED,
45    GOBI_MODEM_STATE_ENABLED
46]
47
48MM_STABLE_MODEM_STATES = [
49    MM_MODEM_STATE_DISABLED,
50    MM_MODEM_STATE_REGISTERED,
51    MM_MODEM_STATE_CONNECTED,
52]
53
54GOBI_STABLE_MODEM_STATES = [
55    GOBI_MODEM_STATE_DISABLED,
56    GOBI_MODEM_STATE_REGISTERED,
57    GOBI_MODEM_STATE_CONNECTED
58]
59
60class cellular_StaleModemReboot(test.test):
61    """
62    Uses servo to cold reboot the device if modem is not available or is not in
63    testable state.
64
65    The test attempts to get modem status by running the 'modem status' command
66    on the DUT. If it is unsuccessful in getting the modem status or the modem
67    is in a bad state, it will try to reboot the DUT.
68
69    """
70
71    version = 1
72
73    def _modem_state_to_string(self, state, is_gobi):
74        """Takes the numerical modem state and returns associated state name.
75
76        @param state: The state of the modem on the device.
77        @param is_gobi: True if the device has a gobi modem.
78        @return MODEM_STATE_STRINGS as the actual name of the state associated
79                with the numeric value.
80
81        """
82        if not state:
83            return NO_MODEM_STATE_AVAILABLE
84
85        if is_gobi:
86            MODEM_STATE_STRINGS = [
87                'UNKNOWN',
88                'DISABLED',
89                'DISABLING',
90                'ENABLING',
91                'ENABLED',
92                'SEARCHING',
93                'REGISTERED',
94                'DISCONNECTING',
95                'CONNECTING',
96                'CONNECTED'
97            ]
98            return MODEM_STATE_STRINGS[int(state[:1])]
99
100        MODEM_STATE_STRINGS = [
101            'FAILED',
102            'UNKNOWN',
103            'INITIALIZING',
104            'LOCKED',
105            'DISABLED',
106            'DISABLING',
107            'ENABLING',
108            'ENABLED',
109            'SEARCHING',
110            'REGISTERED',
111            'DISCONNECTING',
112            'CONNECTING',
113            'CONNECTED'
114        ]
115        return MODEM_STATE_STRINGS[int(state) + 1]
116
117
118    def _format_modem_status(self, modem_status):
119        """Formats the modem status data and inserts it into a dictionary.
120
121        @param modem_status: Command line output of 'modem status'.
122        @return modem status dictionary
123
124        """
125
126        modem_state = ''
127        modem_status_dict = {}
128
129        if not modem_status:
130            return None
131
132        lines = modem_status.splitlines()
133
134        for item in lines:
135            columns = item.split(':')
136            columns = [x.strip() for x in columns]
137            if len(columns) > 1:
138                modem_status_dict[columns[0]] = columns[1]
139            else:
140                modem_status_dict[columns[0]] = ''
141
142        return modem_status_dict
143
144
145    def _get_modem_status(self):
146        """Gets the status of the modem by running 'modem status' command.
147
148        @return modem_status_dict: is the dictionary of all the lines retuned
149                by modem status command.
150
151        """
152        try:
153            modem_status = self._client.run('modem status').stdout.strip()
154            modem_status_dict = self._format_modem_status(modem_status)
155            return modem_status_dict
156        except error.AutoservRunError as e:
157            logging.debug("AutoservRunError is: %s", e)
158            return None
159
160
161    def _get_modem_state(self):
162        modem_status_dict = self._get_modem_status()
163
164        if not modem_status_dict:
165            return None
166
167        return modem_status_dict.get('State')
168
169
170    def _cold_reset_dut(self, boot_id):
171        self._servo.get_power_state_controller().power_off()
172        self._servo.get_power_state_controller().power_on()
173        time.sleep(self._servo.BOOT_DELAY)
174        self._client.wait_for_restart(old_boot_id=boot_id)
175        self._wait_for_stable_modem_state()
176
177
178    def _wait_for_stable_modem_state(self):
179        """
180        Wait for a maximum of _MODEM_WAIT_DELAY seconds for the modem to get
181        into stable state. Also, because we do not want the test to terminate
182        in case there is an exception, we are catching and logging the exception
183        so the test can continue with rebooting the device again as needed.
184
185        """
186        try:
187            utils.poll_for_condition(
188                  lambda: self._get_modem_state() in self.STABLE_MODEM_STATES,
189                  exception=utils.TimeoutError('Modem not in stable state '
190                                               'after %s seconds.' %
191                                               _MODEM_WAIT_DELAY),
192                  timeout=_MODEM_WAIT_DELAY,
193                  sleep_interval=5)
194        except utils.TimeoutError as e:
195            logging.debug("Stable modem state TimeoutError is: %s", e)
196
197
198    def run_once(self, host, tries=2, expect_auto_registration=True):
199        """
200        Runs the test.
201
202        @param host: A host object representing the DUT.
203        @param tries: Maximum number of times test will try to reboot the DUT.
204                Default number of tries is 2, which is set in the control file.
205        @param expect_auto_registration: To be used with an exceptional modem
206                that does not auto-register by passing False from modem specific
207                control file.
208        @raise error.TestFail if modem cannot be brought to a testable stated.
209
210        """
211
212        self._client = host
213        self._servo = host.servo
214
215        if not self._servo:
216            logging.info('Host %s does not have a servo.', host.hostname)
217            return
218
219        self.STABLE_MODEM_STATES = MM_STABLE_MODEM_STATES
220        gobi = False
221        if 'gobi' in self._client.run('modem status').stdout.strip().lower():
222            self.STABLE_MODEM_STATES = GOBI_STABLE_MODEM_STATES
223            gobi = True
224
225        if not expect_auto_registration:
226            self.STABLE_MODEM_STATES.extend(ENABLED_MODEM_STATES)
227
228        original_modem_state = self._get_modem_state()
229
230        logging.info('Modem state before reboot on host %s: %s',
231                     host.hostname,
232                     self._modem_state_to_string(original_modem_state, gobi))
233
234        boot_id = self._client.get_boot_id()
235
236        num_tries = 0
237
238        while True:
239            self._cold_reset_dut(boot_id)
240            new_modem_state = self._get_modem_state()
241            if new_modem_state in self.STABLE_MODEM_STATES:
242                logging.info('Modem is in testable state: %s',
243                             self._modem_state_to_string(new_modem_state, gobi))
244                break
245            if new_modem_state == MM_MODEM_STATE_LOCKED:
246                raise error.TestFail('Modem in locked state.')
247            if num_tries == tries:
248                logging.info('Modem still in bad state after %s reboot tries '
249                             'on host %s. Modem state: %s ',
250                             tries+1, host.hostname,
251                             self._modem_state_to_string(new_modem_state, gobi))
252                raise error.TestFail('Modem is not in testable state')
253            num_tries += 1
254