1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Utilities for cellular tests."""
6import copy, dbus, os, tempfile
7
8# TODO(thieule): Consider renaming mm.py, mm1.py, modem.py, etc to be more
9# descriptive (crosbug.com/37060).
10import common
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.cros.cellular import cellular
14from autotest_lib.client.cros.cellular import cellular_system_error
15from autotest_lib.client.cros.cellular import mm
16from autotest_lib.client.cros.cellular import modem
17
18from autotest_lib.client.cros import flimflam_test_path
19import flimflam
20
21
# Default timeout, in seconds, for ConnectToCellular.
TIMEOUT = 30
# Base timeout, in seconds, for locating the cellular service.
# AutoConnectContext waits up to twice this long when entering.
SERVICE_TIMEOUT = 60
24
25import cellular_logging
26
# Module-wide logger shared by every helper in this file, set up under the
# name 'cell_tools'.
logger = cellular_logging.SetupCellularLogging('cell_tools')
28
29
def ConnectToCellular(flim, timeout=TIMEOUT):
    """Attempts to connect to a cell network using FlimFlam.

    If the cellular service is already in a connected state ('portal' or
    'online'), no connect request is issued.

    Args:
        flim: A flimflam object
        timeout: Timeout (in seconds) applied to each individual step:
            finding the service, each wait for a service state, and the
            association and configuration phases of the connect.

    Returns:
        a tuple of the service and the service state

    Raises:
        cellular_system_error.ConnectionFailure: no cellular service was
            found, or the connect failed for a reason other than
            'Error.AlreadyConnected'.
        cellular_system_error.BadState: the service is still not in a
            connected state after the connect request.
    """
    service = flim.FindCellularService(timeout=timeout)
    if not service:
        raise cellular_system_error.ConnectionFailure(
            'Could not find cell service')

    # Purely diagnostic output: nothing has gone wrong at this point, so
    # this must not be reported at error level.
    properties = service.GetProperties(utf8_strings=True)
    logger.debug('Properties are: %s', properties)

    logger.info('Connecting to cell service: %s', service)

    connected_states = ['portal', 'online']
    states = connected_states + ['idle']
    state = flim.WaitForServiceState(service=service,
                                     expected_states=states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    logger.debug('Cell connection state : %s ', state)
    if state in connected_states:
        logger.debug('Looks good, skip ConnectService')
        return service, state

    logger.debug('Trying to ConnectService')
    success, status = flim.ConnectService(
        service=service,
        assoc_timeout=timeout,
        config_timeout=timeout)

    if not success:
        logger.error('Connect failed: %s', status)
        # TODO(rochberg):  Turn off autoconnect
        # A missing or empty 'reason' is treated as a real failure rather
        # than letting a KeyError hide the connect error.
        reason = status.get('reason') or ''
        if 'Error.AlreadyConnected' not in reason:
            raise cellular_system_error.ConnectionFailure(
                'Could not connect: %s.' % status)

    state = flim.WaitForServiceState(service=service,
                                     expected_states=connected_states,
                                     timeout=timeout,
                                     ignore_failure=True)[0]
    if state not in connected_states:
        raise cellular_system_error.BadState(
            'Still in state %s, expecting one of: %s ' %
            (state, str(connected_states)))

    return service, state
88
89
def FindLastGoodAPN(service, default=None):
    """Returns the 'apn' entry of a service's 'Cellular.LastGoodAPN' property.

    Args:
        service: A service object providing GetProperties(), or a false
            value when no service is available.
        default: Value returned when the APN cannot be determined.

    Returns:
        The 'apn' value of the 'Cellular.LastGoodAPN' property.  |default|
        is returned when |service| is false, when the property is absent,
        or when the property has no 'apn' entry.
    """
    apn_property = 'Cellular.LastGoodAPN'
    if service:
        service_props = service.GetProperties()
        if apn_property in service_props:
            return service_props[apn_property].get('apn', default)
    return default
98
99
def DisconnectFromCellularService(bs, flim, service):
    """Disconnects a cellular service and checks the basestation's view.

    Args:
        bs:  A basestation object.  Pass None to skip basestation-side checks
        flim:  A flimflam object
        service:  A cellular service object

    Raises:
        cellular_system_error.BadState: supplied to the poll as the
            exception to raise if the basestation does not report the
            modem as disconnected within 20 seconds.
    """
    # Waits for flimflam state to go to idle
    flim.DisconnectService(service)

    if not bs:
        # No basestation supplied: nothing further to verify.
        return

    air_state = bs.GetAirStateVerifier()
    data_status = cellular.UeGenericDataStatus

    # This is racy: The modem is free to report itself as
    # disconnected before it actually finishes tearing down its RF
    # connection.
    air_state.AssertDataStatusIn([
        data_status.DISCONNECTING,
        data_status.REGISTERED,
        data_status.NONE])

    still_connected = cellular_system_error.BadState(
        'modem not disconnected from base station')
    # Poll until the basestation no longer reports DISCONNECTING.
    utils.poll_for_condition(
        lambda: air_state.IsDataStatusIn([
            data_status.REGISTERED,
            data_status.NONE]),
        timeout=20,
        exception=still_connected)
131
132
def _EnumerateModems(manager):
    """Get a set of modem paths.

    The path is taken from index 1 of each entry returned by
    mm.EnumerateDevices(manager).
    """
    return set(device[1] for device in mm.EnumerateDevices(manager))
136
137
def _SawNewModem(manager, preexisting_modems, old_modem):
    """Returns True once the old modem is gone and the modem set changed.

    Args:
        manager: modem manager handed to _EnumerateModems
        preexisting_modems: set of modem paths to compare against
        old_modem: path of the modem that is expected to disappear

    Returns:
        False while |old_modem| is still enumerated; otherwise whether the
        current set of modems differs from |preexisting_modems|.
    """
    modems_now = _EnumerateModems(manager)
    # NB: This fails if an unrelated modem disappears.  Not fixing
    # until we support > 1 modem
    return old_modem not in modems_now and preexisting_modems != modems_now
145
146
def _WaitForModemToReturn(manager, preexisting_modems_original, modem_path):
    """Waits for a modem to come back under a new path and returns that path.

    Args:
        manager: modem manager handed to _EnumerateModems
        preexisting_modems_original: set of modem paths enumerated before
            the change was triggered; it is copied, not modified
        modem_path: path the modem had before the change; must be a member
            of |preexisting_modems_original|

    Returns:
        The path of the single modem that is enumerated now but is not in
        the preexisting set (with |modem_path| removed).

    Raises:
        cellular_system_error.BadState: supplied to the poll as the
            exception for a modem that does not come back within 50
            seconds; also raised directly when the number of new modems
            is not exactly one.
        KeyError: |modem_path| is not in |preexisting_modems_original|.
    """
    # Work on a copy so the caller's set is left untouched.
    preexisting_modems = copy.copy(preexisting_modems_original)
    preexisting_modems.remove(modem_path)

    utils.poll_for_condition(
        lambda: _SawNewModem(manager, preexisting_modems, modem_path),
        timeout=50,
        exception=cellular_system_error.BadState(
            'Modem did not come back after settings change'))

    current_modems = _EnumerateModems(manager)

    new_modems = list(current_modems - preexisting_modems)
    if len(new_modems) != 1:
        # Report the sets before and after the change; the list of new
        # modems can be derived from them, the pre-change set cannot.
        raise cellular_system_error.BadState(
            'Unexpected modem list change: %s vs %s' %
            (preexisting_modems, current_modems))

    logger.info('New modem: %s', new_modems[0])
    return new_modems[0]
167
168
def SetFirmwareForTechnologyFamily(manager, modem_path, family):
    """Set the modem to firmware.  Return potentially-new modem path.

    Args:
        manager: modem manager that owns the modem
        modem_path: current path of the modem
        family: a cellular.TechnologyFamily value to switch to

    Returns:
        The modem path to use afterwards.  For the LTE family nothing is
        changed and |modem_path| is returned as is; otherwise the path
        reported by _WaitForModemToReturn after the carrier change.

    Raises:
        cellular_system_error.BadScpiCommand: GobiModem() returned a false
            value for this modem.
        KeyError: |family| has no entry in the family-to-carrier table.
    """
    # todo(byronk): put this in a modem object?
    if family == cellular.TechnologyFamily.LTE:
        # nothing to set up on a Pixel. todo(byronk) how about others?
        # Hand the unchanged path back so callers that reassign their
        # modem path from the return value do not end up with None.
        return modem_path
    logger.debug('SetFirmwareForTechnologyFamily : manager : %s ', manager)
    logger.debug('SetFirmwareForTechnologyFamily : modem_path : %s ',
                 modem_path)
    logger.debug('SetFirmwareForTechnologyFamily : family : %s ', family)
    preexisting_modems = _EnumerateModems(manager)
    # We do not currently support any multi-family modems besides Gobi
    gobi = manager.GetModem(modem_path).GobiModem()
    if not gobi:
        # Both values must be passed to % as one tuple; applying % to
        # modem_path alone raises TypeError instead of this error.
        raise cellular_system_error.BadScpiCommand(
            'Modem %s does not support %s, cannot change technologies' %
            (modem_path, family))

    logger.info('Changing firmware to technology family %s', family)

    family_to_carrier = {
        cellular.TechnologyFamily.UMTS: 'Generic UMTS',
        cellular.TechnologyFamily.CDMA: 'Verizon Wireless',
    }

    gobi.SetCarrier(family_to_carrier[family])
    return _WaitForModemToReturn(manager, preexisting_modems, modem_path)
194
195
# A test PRL that has an ID of 3333 and sets the device to acquire the
# default config of an 8960 with system_id 331.  Base64 encoding
# generated with "base64 < prl".
#
# NOTE: the literal is decoded with str.decode('base64_codec'), which is
# only available on Python 2 strings.

TEST_PRL_3333 = (
    'ADENBQMAAMAAAYADAgmABgIKDQsEAYAKDUBAAQKWAAICQGAJApYAAgIw8BAAAQDhWA=='.
    decode('base64_codec'))
203
204
# A modem with this MDN will always report itself as activated.
# The same value is also used as the expected and programmed 'min' when
# checking and configuring CDMA modems in this file.
TESTING_MDN = dbus.String('1115551212', variant_level=1)
207
208
def _IsCdmaModemConfiguredCorrectly(manager, modem_path):
    """Returns true iff the CDMA modem at modem_path is configured correctly.

    The modem's simple status must contain the testing MDN as both 'mdn'
    and 'min', and a 'prl_version' of 3333.  Every mismatching or missing
    setting is logged; all settings are checked even after a mismatch.
    """
    # We don't test for systemID because the PRL should take care of
    # that.
    modem_status = manager.GetModem(modem_path).SimpleModem().GetStatus()

    expected_settings = {'mdn': TESTING_MDN,
                         'min': TESTING_MDN,
                         'prl_version': 3333}

    all_match = True
    for key, expected in expected_settings.items():
        mismatch = key not in modem_status or expected != modem_status[key]
        if not mismatch:
            continue
        logger.error('_CheckCdmaModemStatus:  %s: expected %s, got %s',
                     key, expected, modem_status.get(key))
        all_match = False
    return all_match
227
228
def PrepareCdmaModem(manager, modem_path):
    """Configure a CDMA device (including PRL, MIN, and MDN).

    Args:
        manager: modem manager that owns the modem
        modem_path: current path of the CDMA modem

    Returns:
        |modem_path| if the modem was already configured correctly,
        otherwise the path the modem has after manual activation.

    Raises:
        cellular_system_error.BadState: the modem is still not configured
            correctly after the activation.
    """
    if _IsCdmaModemConfiguredCorrectly(manager, modem_path):
        return modem_path

    logger.info('Updating modem settings')
    modems_before = _EnumerateModems(manager)
    cdma_modem = manager.GetModem(modem_path).CdmaModem()

    # The test PRL is handed over by file name.  The wait for the modem
    # happens inside the with block, so the temporary file still exists
    # while waiting.
    with tempfile.NamedTemporaryFile() as prl_file:
        os.chmod(prl_file.name, 0o744)
        prl_file.write(TEST_PRL_3333)
        prl_file.flush()
        logger.info('Calling ActivateManual to change PRL')

        activation_settings = {
            'mdn': TESTING_MDN,
            'min': TESTING_MDN,
            'prlfile': dbus.String(prl_file.name, variant_level=1),
            # Default 8960 SID
            'system_id': dbus.UInt16(331, variant_level=1),
            'spc': dbus.String('000000'),
        }
        cdma_modem.ActivateManual(activation_settings)
        updated_path = _WaitForModemToReturn(
            manager, modems_before, modem_path)

    if _IsCdmaModemConfiguredCorrectly(manager, updated_path):
        return updated_path
    raise cellular_system_error.BadState('Modem configuration failed')
257
258
def PrepareModemForTechnology(modem_path, target_technology):
    """Prepare modem for the technology: Sets things like firmware, PRL.

    Args:
        modem_path: passed to mm.PickOneModem to select the modem
        target_technology: a key of cellular.TechnologyToFamily

    Returns:
        The path of the modem after preparation.  It can differ from the
        path picked initially when a firmware or CDMA settings change
        makes the modem come back under a new path.
    """
    manager, modem_path = mm.PickOneModem(modem_path)

    logger.info('Found modem %s', modem_path)

    # todo(byronk) : This returns TechnologyFamily:UMTS on a Pixel. ????
    current_family = manager.GetModem(modem_path).GetCurrentTechnologyFamily()
    target_family = cellular.TechnologyToFamily[target_technology]

    if current_family != target_family:
        logger.debug('Modem Current Family: %s ', current_family)
        logger.debug('Modem Target Family : %s ', target_family)
        new_path = SetFirmwareForTechnologyFamily(
            manager, modem_path, target_family)
        # Guard against a None return (no firmware change was made) so
        # the known modem path is not lost for the steps below.
        if new_path:
            modem_path = new_path

    if target_family == cellular.TechnologyFamily.CDMA:
        modem_path = PrepareCdmaModem(manager, modem_path)
        # Force the modem to report that is has been activated since we
        # use a custom PRL and have already manually activated it.
        manager.GetModem(modem_path).GobiModem().ForceModemActivatedStatus()

    # When testing EVDO, we need to force the modem to register with EVDO
    # directly (bypassing CDMA 1x RTT) else the modem will not register
    # properly because it looks for CDMA 1x RTT first but can't find it
    # because the call box can only emulate one technology at a time (EVDO).
    try:
        if target_technology == cellular.Technology.EVDO_1X:
            network_preference = modem.Modem.NETWORK_PREFERENCE_EVDO_1X
        else:
            network_preference = modem.Modem.NETWORK_PREFERENCE_AUTOMATIC
        gobi = manager.GetModem(modem_path).GobiModem()
        gobi.SetNetworkPreference(network_preference)
    except AttributeError:
        # Not a Gobi modem
        pass

    return modem_path
299
300
def FactoryResetModem(modem_pattern, spc='000000'):
    """Factory resets modem, returns DBus pathname of modem after reset.

    Args:
        modem_pattern: passed to mm.PickOneModem to select the modem
        spc: code handed to the modem's FactoryReset call
    """
    manager, modem_path = mm.PickOneModem(modem_pattern)
    # Snapshot the modem list before the reset so the returning modem can
    # be told apart afterwards.
    modems_before = _EnumerateModems(manager)
    manager.GetModem(modem_path).Modem().FactoryReset(spc)
    return _WaitForModemToReturn(manager, modems_before, modem_path)
308
309
class OtherDeviceShutdownContext(object):
    """Context manager that shuts down other devices.

    On entry every device except those of the given type is shut down
    through a flimflam DeviceManager; on exit the devices are restored.

    Usage:
        with cell_tools.OtherDeviceShutdownContext('cellular'):
            block

    TODO(rochberg):  Replace flimflam.DeviceManager with this
    """

    def __init__(self, device_type):
        """Remembers the device type that must stay up."""
        # Created lazily in __enter__; None until then.
        self.device_manager = None
        self.device_type = device_type

    def __enter__(self):
        """Shuts down all devices except those of self.device_type."""
        flim = flimflam.FlimFlam()
        self.device_manager = flimflam.DeviceManager(flim)
        self.device_manager.ShutdownAllExcept(self.device_type)
        return self

    def __exit__(self, exception, value, traceback):
        """Restores the devices; never suppresses an exception."""
        manager = self.device_manager
        if manager:
            manager.RestoreDevices()
        return False
333
334
class AutoConnectContext(object):
    """Context manager which sets autoconnect to either true or false.

       Enable or Disable autoconnect for the cellular service.
       Restore it when done.

       Usage:
           with cell_tools.AutoConnectContext(device, flim, autoconnect):
               block
    """

    def __init__(self, device, flim, autoconnect):
        """Stores the settings; nothing is changed until __enter__.

        Args:
            device: flimflam device that is powered on before the service
                is looked up
            flim: flimflam object used to find the cellular service
            autoconnect: AutoConnect value wanted inside the block
        """
        self.device = device
        self.flim = flim
        self.autoconnect = autoconnect
        # Set by __enter__; True only when the previous AutoConnect value
        # differed from the requested one, so __exit__ knows to restore it.
        self.autoconnect_changed = False

    def PowerOnDevice(self, device):
        """Power on a flimflam device, ignoring in progress errors."""
        # Read the property once so the logged value is the one acted on.
        powered = device.GetProperties()['Powered']
        logger.info('powered = %s', powered)
        if powered:
            return
        try:
            device.Enable()
        except dbus.exceptions.DBusException as e:
            if e._dbus_error_name != 'org.chromium.flimflam.Error.InProgress':
                # Bare raise keeps the original traceback.
                raise

    def __enter__(self):
        """Power up device, get the service and set autoconnect.

        Raises:
            error.TestFail: no cellular service was found, or AutoConnect
                does not have the requested value after setting it.
        """
        changed = False
        self.PowerOnDevice(self.device)

        # Use SERVICE_TIMEOUT*2 here because it may take SERVICE_TIMEOUT
        # seconds for the modem to disconnect when the base emulator is taken
        # offline for reconfiguration and then another SERVICE_TIMEOUT
        # seconds for the modem to reconnect after the base emulator is
        # brought back online.
        #
        # TODO(jglasgow): generalize to use services associated with device
        service = self.flim.FindCellularService(timeout=SERVICE_TIMEOUT * 2)
        if not service:
            raise error.TestFail('No cellular service available.')

        # Always set the AutoConnect property even if the requested value
        # is the same so that shill will retain the AutoConnect property, else
        # shill may override it.
        props = service.GetProperties()
        autoconnect = props['AutoConnect']
        logger.info('AutoConnect = %s', autoconnect)
        logger.info('Setting AutoConnect = %s.', self.autoconnect)
        service.SetProperty('AutoConnect', dbus.Boolean(self.autoconnect))

        if autoconnect != self.autoconnect:
            # Re-read the properties to see the value after the change.
            props = service.GetProperties()
            autoconnect = props['AutoConnect']
            changed = True

        # Make sure the cellular service gets persisted by taking it out of
        # the ephemeral profile.
        if not props['Profile']:
            manager_props = self.flim.manager.GetProperties()
            active_profile = manager_props['ActiveProfile']
            logger.info("Setting cellular service profile to %s",
                        active_profile)
            service.SetProperty('Profile', active_profile)

        if autoconnect != self.autoconnect:
            raise error.TestFail('AutoConnect is %s, but we want it to be %s' %
                                 (autoconnect, self.autoconnect))

        self.autoconnect_changed = changed

        return self

    def __exit__(self, exception, value, traceback):
        """Restore autoconnect state if we changed it.

        Always returns False, so an exception from the with block is never
        suppressed.
        """
        if not self.autoconnect_changed:
            return False

        try:
            self.PowerOnDevice(self.device)
        except Exception as e:
            if exception:
                # The block's own exception takes precedence: log this
                # one and let the original propagate.
                logger.error(
                    'Exiting AutoConnectContext with one exception, but '
                    'PowerOnDevice raised another')
                logger.error('Swallowing PowerOnDevice exception %s', e)
                return False
            # Bare raise keeps the original traceback.
            raise

        # TODO(jglasgow): generalize to use services associated with
        # device, and restore state only on changed services
        service = self.flim.FindCellularService()
        if not service:
            logger.error('Cannot find cellular service.  '
                         'Autoconnect state not restored.')
            return False
        # The value was changed on entry, so the previous value is the
        # negation of the requested one.
        service.SetProperty('AutoConnect', dbus.Boolean(not self.autoconnect))

        return False
438