1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import contextlib
6import dbus
7import logging
8import sys
9import traceback
10
11import common
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error
14from autotest_lib.client.cros import backchannel
15from autotest_lib.client.cros.cellular import mm
16from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context
17from autotest_lib.client.cros.networking import cellular_proxy
18from autotest_lib.client.cros.networking import shill_context
19from autotest_lib.client.cros.networking import shill_proxy
20
21
22class CellularTestEnvironment(object):
23    """Setup and verify cellular test environment.
24
25    This context manager configures the following:
26        - Sets up backchannel.
27        - Shuts down other devices except cellular.
28        - Shill and MM logging is enabled appropriately for cellular.
29        - Initializes members that tests should use to access test environment
30          (eg. |shill|, |modem_manager|, |modem|).
31
32    Then it verifies the following is valid:
33        - The backchannel is using an Ethernet device.
34        - The SIM is inserted and valid.
35        - There is one and only one modem in the device.
36        - The modem is registered to the network.
37        - There is a cellular service in shill and it's not connected.
38
39    Don't use this base class directly, use the appropriate subclass.
40
41    Setup for over-the-air tests:
42        with CellularOTATestEnvironment() as test_env:
43            # Test body
44
45    Setup for pseudomodem tests:
46        with CellularPseudoMMTestEnvironment(
47                pseudomm_args=({'family': '3GPP'})) as test_env:
48            # Test body
49
50    """
51
52    def __init__(self, use_backchannel=True, shutdown_other_devices=True,
53                 modem_pattern='', skip_modem_reset=False):
54        """
55        @param use_backchannel: Set up the backchannel that can be used to
56                communicate with the DUT.
57        @param shutdown_other_devices: If True, shutdown all devices except
58                cellular.
59        @param modem_pattern: Search string used when looking for the modem.
60
61        """
62        # Tests should use this main loop instead of creating their own.
63        self.mainloop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
64        self.bus = dbus.SystemBus(mainloop=self.mainloop)
65
66        self.shill = None
67        self.modem_manager = None
68        self.modem = None
69        self.modem_path = None
70        self._backchannel = None
71
72        self._modem_pattern = modem_pattern
73        self._skip_modem_reset = skip_modem_reset
74
75        self._nested = None
76        self._context_managers = []
77        if use_backchannel:
78            self._backchannel = backchannel.Backchannel()
79            self._context_managers.append(self._backchannel)
80        if shutdown_other_devices:
81            self._context_managers.append(
82                    shill_context.AllowedTechnologiesContext(
83                            [shill_proxy.ShillProxy.TECHNOLOGY_CELLULAR]))
84
85
86    @contextlib.contextmanager
87    def _disable_shill_autoconnect(self):
88        self._enable_shill_cellular_autoconnect(False)
89        yield
90        self._enable_shill_cellular_autoconnect(True)
91
92
93    def __enter__(self):
94        try:
95            # Temporarily disable shill autoconnect to cellular service while
96            # the test environment is setup to prevent a race condition
97            # between disconnecting the modem in _verify_cellular_service()
98            # and shill autoconnect.
99            with self._disable_shill_autoconnect():
100                self._nested = contextlib.nested(*self._context_managers)
101                self._nested.__enter__()
102
103                self._initialize_shill()
104
105                # Perform SIM verification now to ensure that we can enable the
106                # modem in _initialize_modem_components(). ModemManager does not
107                # allow enabling a modem without a SIM.
108                self._verify_sim()
109                self._initialize_modem_components()
110
111                self._setup_logging()
112
113                self._verify_backchannel()
114                self._wait_for_modem_registration()
115                self._verify_cellular_service()
116
117                return self
118        except (error.TestError, dbus.DBusException,
119                shill_proxy.ShillProxyError) as e:
120            except_type, except_value, except_traceback = sys.exc_info()
121            lines = traceback.format_exception(except_type, except_value,
122                                               except_traceback)
123            logging.error('Error during test initialization:\n' +
124                          ''.join(lines))
125            self.__exit__(*sys.exc_info())
126            raise error.TestError('INIT_ERROR: %s' % str(e))
127        except:
128            self.__exit__(*sys.exc_info())
129            raise
130
131
132    def __exit__(self, exception, value, traceback):
133        if self._nested:
134            return self._nested.__exit__(exception, value, traceback)
135        self.shill = None
136        self.modem_manager = None
137        self.modem = None
138        self.modem_path = None
139
140
141    def _get_shill_cellular_device_object(self):
142        return utils.poll_for_condition(
143            lambda: self.shill.find_cellular_device_object(),
144            exception=error.TestError('Cannot find cellular device in shill. '
145                                      'Is the modem plugged in?'),
146            timeout=shill_proxy.ShillProxy.DEVICE_ENUMERATION_TIMEOUT)
147
148
149    def _enable_modem(self):
150        modem_device = self._get_shill_cellular_device_object()
151        try:
152            modem_device.Enable()
153        except dbus.DBusException as e:
154            if (e.get_dbus_name() !=
155                    shill_proxy.ShillProxy.ERROR_IN_PROGRESS):
156                raise
157
158        utils.poll_for_condition(
159            lambda: modem_device.GetProperties()['Powered'],
160            exception=error.TestError(
161                    'Failed to enable modem.'),
162            timeout=shill_proxy.ShillProxy.DEVICE_ENABLE_DISABLE_TIMEOUT)
163
164
165    def _enable_shill_cellular_autoconnect(self, enable):
166        shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
167        shill.manager.SetProperty(
168                shill_proxy.ShillProxy.
169                MANAGER_PROPERTY_NO_AUTOCONNECT_TECHNOLOGIES,
170                '' if enable else 'cellular')
171
172
173    def _is_unsupported_error(self, e):
174        return (e.get_dbus_name() ==
175                shill_proxy.ShillProxy.ERROR_NOT_SUPPORTED or
176                (e.get_dbus_name() ==
177                 shill_proxy.ShillProxy.ERROR_FAILURE and
178                 'operation not supported' in e.get_dbus_message()))
179
180
181    def _reset_modem(self):
182        modem_device = self._get_shill_cellular_device_object()
183        try:
184            # Cromo/MBIM modems do not support being reset.
185            self.shill.reset_modem(modem_device, expect_service=False)
186        except dbus.DBusException as e:
187            if not self._is_unsupported_error(e):
188                raise
189
190
191    def _initialize_shill(self):
192        """Get access to shill."""
193        # CellularProxy.get_proxy() checks to see if shill is running and
194        # responding to DBus requests. It returns None if that's not the case.
195        self.shill = cellular_proxy.CellularProxy.get_proxy(self.bus)
196        if self.shill is None:
197            raise error.TestError('Cannot connect to shill, is shill running?')
198
199
200    def _initialize_modem_components(self):
201        """Reset the modem and get access to modem components."""
202        # Enable modem first so shill initializes the modemmanager proxies so
203        # we can call reset on it.
204        self._enable_modem()
205        if not self._skip_modem_reset:
206            self._reset_modem()
207
208        # PickOneModem() makes sure there's a modem manager and that there is
209        # one and only one modem.
210        self.modem_manager, self.modem_path = \
211                mm.PickOneModem(self._modem_pattern)
212        self.modem = self.modem_manager.GetModem(self.modem_path)
213        if self.modem is None:
214            raise error.TestError('Cannot get modem object at %s.' %
215                                  self.modem_path)
216
217
218    def _setup_logging(self):
219        self.shill.set_logging_for_cellular_test()
220        self.modem_manager.SetDebugLogging()
221
222
223    def _verify_sim(self):
224        """Verify SIM is valid.
225
226        Make sure a SIM in inserted and that it is not locked.
227
228        @raise error.TestError if SIM does not exist or is locked.
229
230        """
231        modem_device = self._get_shill_cellular_device_object()
232        props = modem_device.GetProperties()
233
234        # No SIM in CDMA modems.
235        family = props[
236                cellular_proxy.CellularProxy.DEVICE_PROPERTY_TECHNOLOGY_FAMILY]
237        if (family ==
238                cellular_proxy.CellularProxy.
239                DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA):
240            return
241
242        # Make sure there is a SIM.
243        if not props[cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_PRESENT]:
244            raise error.TestError('There is no SIM in the modem.')
245
246        # Make sure SIM is not locked.
247        lock_status = props.get(
248                cellular_proxy.CellularProxy.DEVICE_PROPERTY_SIM_LOCK_STATUS,
249                None)
250        if lock_status is None:
251            raise error.TestError('Failed to read SIM lock status.')
252        locked = lock_status.get(
253                cellular_proxy.CellularProxy.PROPERTY_KEY_SIM_LOCK_ENABLED,
254                None)
255        if locked is None:
256            raise error.TestError('Failed to read SIM LockEnabled status.')
257        elif locked:
258            raise error.TestError(
259                    'SIM is locked, test requires an unlocked SIM.')
260
261
262    def _verify_backchannel(self):
263        """Verify backchannel is on an ethernet device.
264
265        @raise error.TestError if backchannel is not on an ethernet device.
266
267        """
268        if self._backchannel is None:
269            return
270
271        if not self._backchannel.is_using_ethernet():
272            raise error.TestError('An ethernet connection is required between '
273                                  'the test server and the device under test.')
274
275
276    def _wait_for_modem_registration(self):
277        """Wait for the modem to register with the network.
278
279        @raise error.TestError if modem is not registered.
280
281        """
282        utils.poll_for_condition(
283            self.modem.ModemIsRegistered,
284            exception=error.TestError(
285                    'Modem failed to register with the network.'),
286            timeout=cellular_proxy.CellularProxy.SERVICE_REGISTRATION_TIMEOUT)
287
288
289    def _verify_cellular_service(self):
290        """Make sure a cellular service exists.
291
292        The cellular service should not be connected to the network.
293
294        @raise error.TestError if cellular service does not exist or if
295                there are multiple cellular services.
296
297        """
298        service = self.shill.wait_for_cellular_service_object()
299
300        try:
301            service.Disconnect()
302        except dbus.DBusException as e:
303            if (e.get_dbus_name() !=
304                    cellular_proxy.CellularProxy.ERROR_NOT_CONNECTED):
305                raise
306        success, state, _ = self.shill.wait_for_property_in(
307                service,
308                cellular_proxy.CellularProxy.SERVICE_PROPERTY_STATE,
309                ('idle',),
310                cellular_proxy.CellularProxy.SERVICE_DISCONNECT_TIMEOUT)
311        if not success:
312            raise error.TestError(
313                    'Cellular service needs to start in the "idle" state. '
314                    'Current state is "%s". '
315                    'Modem disconnect may have failed.' %
316                    state)
317
318
319class CellularOTATestEnvironment(CellularTestEnvironment):
320    """Setup and verify cellular over-the-air (OTA) test environment. """
321    def __init__(self, **kwargs):
322        super(CellularOTATestEnvironment, self).__init__(**kwargs)
323
324
325class CellularPseudoMMTestEnvironment(CellularTestEnvironment):
326    """Setup and verify cellular pseudomodem test environment. """
327    def __init__(self, pseudomm_args=None, **kwargs):
328        """
329        @param pseudomm_args: Tuple of arguments passed to the pseudomodem, see
330                pseudomodem_context.py for description of each argument in the
331                tuple: (flags_map, block_output, bus)
332
333        """
334        kwargs["skip_modem_reset"] = True
335        super(CellularPseudoMMTestEnvironment, self).__init__(**kwargs)
336        self._context_managers.append(
337                pseudomodem_context.PseudoModemManagerContext(
338                        True, bus=self.bus, *pseudomm_args))
339