1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import os
8import time
9
10from autotest_lib.client.bin import test
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.cros.cellular import mm1_constants
14from autotest_lib.client.cros.cellular.pseudomodem import pm_constants
15from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context
16from autotest_lib.client.cros.networking import cellular_proxy
17
18# Used for software message propagation latencies.
19SHORT_TIMEOUT_SECONDS = 2
20STATE_MACHINE_SCAN = 'ScanMachine'
21TEST_MODEMS_MODULE_PATH = os.path.join(os.path.dirname(__file__), 'files',
22                                       'modems.py')
23
24class cellular_ScanningProperty(test.test):
25    """
26    Test that the |Scanning| Property of the shill cellular device object is
27    updated correctly in the following two scenarios:
28      (1) When a user requests a cellular network scan using the |RequestScan|
29          method of the shill Manager interface.
30      (2) During the initial modem enable-register-connect sequence.
31
32    """
33    version = 1
34
35    def _find_mm_modem(self):
36        """
37        Find the modemmanager modem object.
38
39        Assumption: There is only one modem in the system.
40
41        @raises: TestError unless exactly one modem is found.
42
43        """
44        object_manager = dbus.Interface(
45                self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
46                                     mm1_constants.MM1),
47                mm1_constants.I_OBJECT_MANAGER)
48        try:
49            modems = object_manager.GetManagedObjects()
50        except dbus.exceptions.DBusException as e:
51            raise error.TestFail('Failed to list the available modems. '
52                                 'DBus error: |%s|', repr(e))
53        if len(modems) != 1:
54            raise error.TestFail('Expected one modem object, found %d' %
55                                 len(modems))
56
57        modem_path = modems.keys()[0]
58        modem_object = self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
59                                            modem_path)
60        # Check that this object is valid
61        try:
62            modem_object.GetAll(mm1_constants.I_MODEM,
63                                dbus_interface=mm1_constants.I_PROPERTIES)
64        except dbus.exceptions.DBusException as e:
65            raise error.TestFail('Failed to obtain dbus object for the modem '
66                                 'DBus error: |%s|', repr(e))
67
68        return dbus.Interface(modem_object, mm1_constants.I_MODEM)
69
70
71    def _check_mm_state(self, modem, states):
72        """
73        Verify that the modemmanager state is |state|.
74
75        @param modem: A DBus object for the modemmanager modem.
76        @param states: The expected state of the modem. This is either a single
77                state, or a list of states.
78        @raises: TestError if the state differs.
79        """
80        if not isinstance(states, list):
81            states = [states]
82        properties = modem.GetAll(mm1_constants.I_MODEM,
83                                  dbus_interface=mm1_constants.I_PROPERTIES)
84        actual_state = properties[mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]
85        if actual_state not in states:
86            state_names = [mm1_constants.ModemStateToString(x) for x in states]
87            raise error.TestFail(
88                    'Expected modemmanager modem state to be one of %s but '
89                    'found %s' %
90                    (state_names,
91                     mm1_constants.ModemStateToString(actual_state)))
92
93
94    def _check_shill_property_update(self, cellular_device, property_name,
95                                     old_state, new_state):
96        """
97        Check the value of property of shill.
98
99        @param cellular_device: The DBus proxy object for the cellular device.
100        @param property_name: Name of the property to check.
101        @param old_state: old value of property.
102        @param new_state: new expected value of property.
103        @raises: TestError if the property fails to enter the given state.
104
105        """
106        # If we don't expect a change in the value, there is a race between this
107        # check and a possible (erronous) update of the value. Allow some time
108        # for the property to be updated before checking.
109        if old_state == new_state:
110            time.sleep(SHORT_TIMEOUT_SECONDS)
111            polling_timeout = 0
112        else:
113            polling_timeout = SHORT_TIMEOUT_SECONDS
114        success, _, _ = self._cellular_proxy.wait_for_property_in(
115                cellular_device,
116                property_name,
117                (new_state,),
118                timeout_seconds=polling_timeout)
119        if not success:
120            raise error.TestFail('Shill failed to set |%s| to %s.' %
121                                 (property_name, str(new_state)))
122
123
124    def _itesting_machine(self, machine_name, timeout=SHORT_TIMEOUT_SECONDS):
125        """
126        Get the testing interface of the given interactive state machine.
127
128        @param machine_name: The name of the interactive state machine.
129        @return dbus.Interface for the testing interface of
130                InteractiveScanningMachine, if found. None otherwise.
131        @raises utils.TimeoutError if a valid dbus object can't be found.
132
133        """
134        def _get_machine():
135            machine = self._bus.get_object(
136                    mm1_constants.I_MODEM_MANAGER,
137                    '/'.join([pm_constants.TESTING_PATH, machine_name]))
138            if machine:
139                i_machine = dbus.Interface(machine, pm_constants.I_TESTING_ISM)
140                # Only way to know if this DBus object is valid is to call a
141                # method on it.
142                try:
143                    i_machine.IsWaiting()  # Ignore result.
144                    return i_machine
145                except dbus.exceptions.DBusException as e:
146                    logging.debug(e)
147                    return None
148
149        utils.poll_for_condition(_get_machine, timeout=timeout)
150        return _get_machine()
151
152
153    def test_user_initiated_cellular_scan(self):
154        """
155        Test that the |RequestScan| DBus method exported by shill Manager
156        interfac correctly updates the cellular object |Scanning| property while
157        the scan is in progress.
158        """
159        with pseudomodem_context.PseudoModemManagerContext(
160                True,
161                {'test-module' : TEST_MODEMS_MODULE_PATH,
162                 'test-modem-class' : 'AsyncScanModem'}):
163            self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy()
164            self._bus = dbus.SystemBus()
165            self._cellular_proxy.set_logging_for_cellular_test()
166
167            logging.info('Sanity check initial values')
168            utils.poll_for_condition(
169                    self._cellular_proxy.find_cellular_device_object,
170                    exception=error.TestFail(
171                            'Bad initial state: Failed to obtain a cellular '
172                            'device in pseudomodem context.'),
173                    timeout=SHORT_TIMEOUT_SECONDS)
174            device = self._cellular_proxy.find_cellular_device_object()
175            try:
176                self._itesting_machine(STATE_MACHINE_SCAN, 0)
177                raise error.TestFail('Bad initial state: scan machine created '
178                                     'by pseudomodem before scan is proposed.')
179            except utils.TimeoutError:
180                pass
181
182            self._check_shill_property_update(
183                    device,
184                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
185                    False,
186                    False)
187
188            logging.info('Test actions and checks')
189            self._cellular_proxy.manager.RequestScan(
190                    self._cellular_proxy.TECHNOLOGY_CELLULAR)
191            try:
192                itesting_scan_machine = self._itesting_machine(
193                        STATE_MACHINE_SCAN)
194            except utils.TimeoutError:
195                raise error.TestFail('Pseudomodem failed to launch %s' %
196                                     STATE_MACHINE_SCAN)
197            utils.poll_for_condition(
198                    itesting_scan_machine.IsWaiting,
199                    exception=error.TestFail('Scan machine failed to enter '
200                                             'scan state'),
201                    timeout=SHORT_TIMEOUT_SECONDS)
202            self._check_shill_property_update(
203                    device,
204                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
205                    False,
206                    True)
207
208            itesting_scan_machine.Advance()
209            utils.poll_for_condition(
210                    lambda: not itesting_scan_machine.IsWaiting(),
211                    exception=error.TestFail('Scan machine failed to exit '
212                                             'scan state'),
213                    timeout=SHORT_TIMEOUT_SECONDS)
214            self._check_shill_property_update(
215                    device,
216                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
217                    True,
218                    False)
219
220
221    def test_activated_service_states(self):
222        """
223        Test that shill |Scanning| property is updated correctly when an
224        activated 3GPP service connects.
225        """
226        with pseudomodem_context.PseudoModemManagerContext(
227                True,
228                {'test-module' : TEST_MODEMS_MODULE_PATH,
229                 'test-state-machine-factory-class' :
230                        'InteractiveStateMachineFactory'}):
231            self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy()
232            self._bus = dbus.SystemBus()
233            self._cellular_proxy.set_logging_for_cellular_test()
234
235            logging.info('Sanity check initial values')
236            enable_machine = self._itesting_machine(
237                    pm_constants.STATE_MACHINE_ENABLE)
238            utils.poll_for_condition(
239                    enable_machine.IsWaiting,
240                    exception=error.TestFail(
241                            'Bad initial state: Pseudomodem did not launch '
242                            'Enable machine'),
243                    timeout=SHORT_TIMEOUT_SECONDS)
244            utils.poll_for_condition(
245                    self._cellular_proxy.find_cellular_device_object,
246                    exception=error.TestFail(
247                            'Bad initial state: Failed to obtain a cellular '
248                            'device in pseudomodem context.'),
249                    timeout=SHORT_TIMEOUT_SECONDS)
250            device = self._cellular_proxy.find_cellular_device_object()
251            mm_modem = self._find_mm_modem()
252
253            logging.info('Test Connect sequence')
254            self._check_mm_state(mm_modem,
255                                 mm1_constants.MM_MODEM_STATE_DISABLED)
256            self._check_shill_property_update(
257                    device,
258                    self._cellular_proxy.DEVICE_PROPERTY_POWERED,
259                    False,
260                    False)
261            self._check_shill_property_update(
262                    device,
263                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
264                    False,
265                    False)
266            logging.info('Expectation met: |Scanning| is False in MM state '
267                         'Disabled')
268            enable_machine.Advance()
269
270            # MM state: Enabling
271            utils.poll_for_condition(
272                    enable_machine.IsWaiting,
273                    exception=error.TestFail('EnableMachine failed to wait in '
274                                             'Enabling state'),
275                    timeout=SHORT_TIMEOUT_SECONDS)
276            self._check_mm_state(mm_modem,
277                                 mm1_constants.MM_MODEM_STATE_ENABLING)
278            self._check_shill_property_update(
279                    device,
280                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
281                    False,
282                    True)
283            logging.info('Expectation met: |Scanning| is True in MM state '
284                         'Enabling')
285            enable_machine.Advance()
286
287            # MM state: Enabled
288            utils.poll_for_condition(
289                    enable_machine.IsWaiting,
290                    exception=error.TestFail('EnableMachine failed to wait in '
291                                             'Enabled state'),
292                    timeout=SHORT_TIMEOUT_SECONDS)
293            # Finish the enable call.
294            enable_machine.Advance()
295
296            self._check_mm_state(mm_modem, mm1_constants.MM_MODEM_STATE_ENABLED)
297            self._check_shill_property_update(
298                    device,
299                    self._cellular_proxy.DEVICE_PROPERTY_POWERED,
300                    False,
301                    True)
302            self._check_shill_property_update(
303                    device,
304                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
305                    True,
306                    True)
307
308            register_machine = self._itesting_machine(
309                    pm_constants.STATE_MACHINE_REGISTER)
310            utils.poll_for_condition(
311                    register_machine.IsWaiting,
312                    exception=error.TestFail('SearchingMachine failed to wait '
313                                             'in Enabled state'),
314                    timeout=SHORT_TIMEOUT_SECONDS)
315            logging.info('Expectation met: |Scanning| is True in MM state '
316                         'Enabled')
317            register_machine.Advance()
318
319            # MM state: Searching
320            utils.poll_for_condition(
321                    register_machine.IsWaiting,
322                    exception=error.TestFail('SearchingMachine failed to wait '
323                                             'in Searching state'),
324                    timeout=SHORT_TIMEOUT_SECONDS)
325            self._check_mm_state(mm_modem,
326                                 mm1_constants.MM_MODEM_STATE_SEARCHING)
327            enable_machine.Advance()
328            self._check_shill_property_update(
329                    device,
330                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
331                    True,
332                    True)
333            logging.info('Expectation met: |Scanning| is True in MM state '
334                         'Searching')
335            register_machine.Advance()
336
337            # MM state: >= Registered
338            utils.poll_for_condition(
339                    self._cellular_proxy.find_cellular_service_object,
340                    error.TestFail('Failed to create Cellular Service for a '
341                                   'registered modem'),
342                    timeout=SHORT_TIMEOUT_SECONDS)
343            self._check_mm_state(mm_modem,
344                                 [mm1_constants.MM_MODEM_STATE_REGISTERED,
345                                  mm1_constants.MM_MODEM_STATE_CONNECTING,
346                                  mm1_constants.MM_MODEM_STATE_CONNECTED])
347            self._check_shill_property_update(
348                    device,
349                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
350                    True,
351                    False)
352            logging.info('Expectation met: |Scanning| is False in MM state '
353                         'Registered')
354
355
356    def run_once(self):
357        """ Autotest entry function """
358        self.test_user_initiated_cellular_scan()
359        self.test_activated_service_states()
360