1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import contextlib
6import dbus
7import logging
8import sys
9import traceback
10
11import common
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.cros import backchannel
15from autotest_lib.client.cros.cellular import cell_tools
16from autotest_lib.client.cros.cellular import mm
17from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context
18from autotest_lib.client.cros.cellular.wardmodem import wardmodem
19from autotest_lib.client.cros.networking import cellular_proxy
20from autotest_lib.client.cros.networking import shill_proxy
21
22# Import 'flimflam_test_path' first in order to import flimflam.
23# pylint: disable=W0611
24from autotest_lib.client.cros import flimflam_test_path
25import flimflam
26
27class CellularTestEnvironment(object):
28    """Setup and verify cellular test environment.
29
30    This context manager configures the following:
31        - Sets up backchannel.
32        - Shuts down other devices except cellular.
33        - Shill and MM logging is enabled appropriately for cellular.
34        - Initializes members that tests should use to access test environment
35          (eg. |shill|, |flimflam|, |modem_manager|, |modem|).
36
37    Then it verifies the following is valid:
38        - The backchannel is using an Ethernet device.
39        - The SIM is inserted and valid.
40        - There is one and only one modem in the device.
41        - The modem is registered to the network.
42        - There is a cellular service in shill and it's not connected.
43
44    Don't use this base class directly, use the appropriate subclass.
45
46    Setup for over-the-air tests:
47        with CellularOTATestEnvironment() as test_env:
48            # Test body
49
50    Setup for pseudomodem tests:
51        with CellularPseudoMMTestEnvironment(
52                pseudomm_args=({'family': '3GPP'})) as test_env:
53            # Test body
54
55    Setup for wardmodem tests:
56        with CellularWardModemTestEnvironment(
57                wardmodem_modem='e362') as test_env:
58            # Test body
59
60    """
61
62    def __init__(self, use_backchannel=True, shutdown_other_devices=True,
63                 modem_pattern=''):
64        """
65        @param use_backchannel: Set up the backchannel that can be used to
66                communicate with the DUT.
67        @param shutdown_other_devices: If True, shutdown all devices except
68                cellular.
69        @param modem_pattern: Search string used when looking for the modem.
70
71        """
72        # Tests should use this main loop instead of creating their own.
73        self.mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
74        self.bus = dbus.SystemBus(mainloop=self.mainloop)
75
76        self.shill = None
77        self.flim = None  # Only use this for legacy tests.
78        self.modem_manager = None
79        self.modem = None
80        self.modem_path = None
81        self._backchannel = None
82
83        self._modem_pattern = modem_pattern
84
85        self._nested = None
86        self._context_managers = []
87        if use_backchannel:
88            self._backchannel = backchannel.Backchannel()
89            self._context_managers.append(self._backchannel)
90        if shutdown_other_devices:
91            self._context_managers.append(
92                    cell_tools.OtherDeviceShutdownContext('cellular'))
93
94
95    @contextlib.contextmanager
96    def _disable_shill_autoconnect(self):
97        self._enable_shill_cellular_autoconnect(False)
98        yield
99        self._enable_shill_cellular_autoconnect(True)
100
101
102    def __enter__(self):
103        try:
104            # Temporarily disable shill autoconnect to cellular service while
105            # the test environment is setup to prevent a race condition
106            # between disconnecting the modem in _verify_cellular_service()
107            # and shill autoconnect.
108            with self._disable_shill_autoconnect():
109                self._nested = contextlib.nested(*self._context_managers)
110                self._nested.__enter__()
111
112                self._initialize_shill()
113
114                # Perform SIM verification now to ensure that we can enable the
115                # modem in _initialize_modem_components(). ModemManager does not
116                # allow enabling a modem without a SIM.
117                self._verify_sim()
118                self._initialize_modem_components()
119
120                self._setup_logging()
121
122                self._verify_backchannel()
123                self._wait_for_modem_registration()
124                self._verify_cellular_service()
125
126                return self
127        except (error.TestError, dbus.DBusException,
128                shill_proxy.ShillProxyError) as e:
129            except_type, except_value, except_traceback = sys.exc_info()
130            lines = traceback.format_exception(except_type, except_value,
131                                               except_traceback)
132            logging.error('Error during test initialization:\n' +
133                          ''.join(lines))
134            self.__exit__(*sys.exc_info())
135            raise error.TestError('INIT_ERROR: %s' % str(e))
136        except:
137            self.__exit__(*sys.exc_info())
138            raise
139
140
141    def __exit__(self, exception, value, traceback):
142        if self._nested:
143            return self._nested.__exit__(exception, value, traceback)
144        self.shill = None
145        self.flim = None
146        self.modem_manager = None
147        self.modem = None
148        self.modem_path = None
149
150
151    def _get_shill_cellular_device_object(self):
152        modem_device = self.shill.find_cellular_device_object()
153        if not modem_device:
154            raise error.TestError('Cannot find cellular device in shill. '
155                                  'Is the modem plugged in?')
156        return modem_device
157
158
159    def _enable_modem(self):
160        modem_device = self._get_shill_cellular_device_object()
161        try:
162            modem_device.Enable()
163        except dbus.DBusException as e:
164            if (e.get_dbus_name() !=
165                    shill_proxy.ShillProxy.ERROR_IN_PROGRESS):
166                raise
167
168        utils.poll_for_condition(
169            lambda: modem_device.GetProperties()['Powered'],
170            exception=error.TestError(
171                    'Failed to enable modem.'),
172            timeout=shill_proxy.ShillProxy.DEVICE_ENABLE_DISABLE_TIMEOUT)
173
174
175    def _enable_shill_cellular_autoconnect(self, enable):
176        shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
177        shill.manager.SetProperty(
178                shill_proxy.ShillProxy.
179                MANAGER_PROPERTY_NO_AUTOCONNECT_TECHNOLOGIES,
180                '' if enable else 'cellular')
181
182
183    def _is_unsupported_error(self, e):
184        return (e.get_dbus_name() ==
185                shill_proxy.ShillProxy.ERROR_NOT_SUPPORTED or
186                (e.get_dbus_name() ==
187                 shill_proxy.ShillProxy.ERROR_FAILURE and
188                 'operation not supported' in e.get_dbus_message()))
189
190
191    def _reset_modem(self):
192        modem_device = self._get_shill_cellular_device_object()
193        try:
194            # Cromo/MBIM modems do not support being reset.
195            self.shill.reset_modem(modem_device, expect_service=False)
196        except dbus.DBusException as e:
197            if not self._is_unsupported_error(e):
198                raise
199
200
201    def _initialize_shill(self):
202        """Get access to shill."""
203        # CellularProxy.get_proxy() checks to see if shill is running and
204        # responding to DBus requests. It returns None if that's not the case.
205        self.shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
206        if self.shill is None:
207            raise error.TestError('Cannot connect to shill, is shill running?')
208
209        # Keep this around to support older tests that haven't migrated to
210        # cellular_proxy.
211        self.flim = flimflam.FlimFlam()
212
213
214    def _initialize_modem_components(self):
215        """Reset the modem and get access to modem components."""
216        # Enable modem first so shill initializes the modemmanager proxies so
217        # we can call reset on it.
218        self._enable_modem()
219        self._reset_modem()
220
221        # PickOneModem() makes sure there's a modem manager and that there is
222        # one and only one modem.
223        self.modem_manager, self.modem_path = \
224                mm.PickOneModem(self._modem_pattern)
225        self.modem = self.modem_manager.GetModem(self.modem_path)
226        if self.modem is None:
227            raise error.TestError('Cannot get modem object at %s.' %
228                                  self.modem_path)
229
230
231    def _setup_logging(self):
232        self.shill.set_logging_for_cellular_test()
233        self.modem_manager.SetDebugLogging()
234
235
236    def _verify_sim(self):
237        """Verify SIM is valid.
238
239        Make sure a SIM in inserted and that it is not locked.
240
241        @raise error.TestError if SIM does not exist or is locked.
242
243        """
244        modem_device = self._get_shill_cellular_device_object()
245        props = modem_device.GetProperties()
246
247        # No SIM in CDMA modems.
248        family = props[
249                cellular_proxy.CellularProxy.DEVICE_PROPERTY_TECHNOLOGY_FAMILY]
250        if (family ==
251                cellular_proxy.CellularProxy.
252                DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA):
253            return
254
255        # Make sure there is a SIM.
256        if not props[cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_PRESENT]:
257            raise error.TestError('There is no SIM in the modem.')
258
259        # Make sure SIM is not locked.
260        lock_status = props.get(
261                cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_LOCK_STATUS,
262                None)
263        if lock_status is None:
264            raise error.TestError('Failed to read SIM lock status.')
265        locked = lock_status.get(
266                cellular_proxy.CellularProxy.PROPERTY_KEY_SIM_LOCK_ENABLED,
267                None)
268        if locked is None:
269            raise error.TestError('Failed to read SIM LockEnabled status.')
270        elif locked:
271            raise error.TestError(
272                    'SIM is locked, test requires an unlocked SIM.')
273
274
275    def _verify_backchannel(self):
276        """Verify backchannel is on an ethernet device.
277
278        @raise error.TestError if backchannel is not on an ethernet device.
279
280        """
281        if self._backchannel is None:
282            return
283
284        if not self._backchannel.is_using_ethernet():
285            raise error.TestError('An ethernet connection is required between '
286                                  'the test server and the device under test.')
287
288
289    def _wait_for_modem_registration(self):
290        """Wait for the modem to register with the network.
291
292        @raise error.TestError if modem is not registered.
293
294        """
295        utils.poll_for_condition(
296            self.modem.ModemIsRegistered,
297            exception=error.TestError(
298                    'Modem failed to register with the network.'),
299            timeout=cellular_proxy.CellularProxy.SERVICE_REGISTRATION_TIMEOUT)
300
301
302    def _verify_cellular_service(self):
303        """Make sure a cellular service exists.
304
305        The cellular service should not be connected to the network.
306
307        @raise error.TestError if cellular service does not exist or if
308                there are multiple cellular services.
309
310        """
311        service = self.shill.wait_for_cellular_service_object()
312
313        try:
314            service.Disconnect()
315        except dbus.DBusException as e:
316            if (e.get_dbus_name() !=
317                    cellular_proxy.CellularProxy.ERROR_NOT_CONNECTED):
318                raise
319        success, state, _ = self.shill.wait_for_property_in(
320                service,
321                cellular_proxy.CellularProxy.SERVICE_PROPERTY_STATE,
322                ('idle',),
323                cellular_proxy.CellularProxy.SERVICE_DISCONNECT_TIMEOUT)
324        if not success:
325            raise error.TestError(
326                    'Cellular service needs to start in the "idle" state. '
327                    'Current state is "%s". '
328                    'Modem disconnect may have failed.' %
329                    state)
330
331
332class CellularOTATestEnvironment(CellularTestEnvironment):
333    """Setup and verify cellular over-the-air (OTA) test environment. """
334    def __init__(self, **kwargs):
335        super(CellularOTATestEnvironment, self).__init__(**kwargs)
336
337
338class CellularPseudoMMTestEnvironment(CellularTestEnvironment):
339    """Setup and verify cellular pseudomodem test environment. """
340    def __init__(self, pseudomm_args=None, **kwargs):
341        """
342        @param pseudomm_args: Tuple of arguments passed to the pseudomodem, see
343                pseudomodem_context.py for description of each argument in the
344                tuple: (flags_map, block_output, bus)
345
346        """
347        super(CellularPseudoMMTestEnvironment, self).__init__(**kwargs)
348        self._context_managers.append(
349                pseudomodem_context.PseudoModemManagerContext(
350                        True, bus=self.bus, *pseudomm_args))
351
352
353class CellularWardModemTestEnvironment(CellularTestEnvironment):
354    """Setup and verify cellular ward modem test environment. """
355    def __init__(self, wardmodem_modem=None, **kwargs):
356        """
357        @param wardmodem_modem: Customized ward modem to use instead of the
358                default implementation, see wardmodem.py.
359
360        """
361        super(CellularWardModemTestEnvironment, self).__init__(**kwargs)
362        self._context_managers.append(
363                wardmodem.WardModemContext(args=['--modem', wardmodem_modem]))
364