1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import random
8import time
9
10from autotest_lib.client.bin import test, utils
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.cros.cellular import cell_tools
13from autotest_lib.client.cros.cellular import cellular
14from autotest_lib.client.cros.networking import cellular_proxy
15from autotest_lib.client.cros.networking import shill_proxy
16
17# Number of seconds we wait for the cellular service to perform an action.
18DEVICE_TIMEOUT=45
19SERVICE_TIMEOUT=75
20
21# Number of times and seconds between modem state checks to ensure that the
22# modem is not in a temporary transition state.
23NUM_MODEM_STATE_CHECKS=2
24MODEM_STATE_CHECK_PERIOD_SECONDS=5
25
26# Number of seconds to sleep after a connect request in slow-connect mode.
27SLOW_CONNECT_WAIT_SECONDS=20
28
29
30class TechnologyCommands():
31    """Control the modem mostly using shill Technology interfaces."""
32    def __init__(self, shill, command_delegate):
33        self.shill = shill
34        self.command_delegate = command_delegate
35
36    def Enable(self):
37        self.shill.manager.EnableTechnology(
38                shill_proxy.ShillProxy.TECHNOLOGY_CELLULAR)
39
40    def Disable(self):
41        self.shill.manager.DisableTechnology(
42                shill_proxy.ShillProxy.TECHNOLOGY_CELLULAR)
43
44    def Connect(self, **kwargs):
45        self.command_delegate.Connect(**kwargs)
46
47    def Disconnect(self):
48        return self.command_delegate.Disconnect()
49
50    def __str__(self):
51        return 'Technology Commands'
52
53
54class ModemCommands():
55    """Control the modem using modem manager DBUS interfaces."""
56    def __init__(self, modem, slow_connect):
57        self.modem = modem
58        self.slow_connect = slow_connect
59
60    def Enable(self):
61        self.modem.Enable(True)
62
63    def Disable(self):
64        self.modem.Enable(False)
65
66    def Connect(self, simple_connect_props):
67        logging.debug('Connecting with properties: %r' % simple_connect_props)
68        self.modem.Connect(simple_connect_props)
69        if self.slow_connect:
70            time.sleep(SLOW_CONNECT_WAIT_SECONDS)
71
72    def Disconnect(self):
73        """
74        Disconnect Modem.
75
76        Returns:
77            True - to indicate that shill may autoconnect again.
78        """
79        try:
80            self.modem.Disconnect()
81        except dbus.DBusException as e:
82            if (e.get_dbus_name() !=
83                    'org.chromium.ModemManager.Error.OperationInitiated'):
84                raise e
85        return True
86
87    def __str__(self):
88        return 'Modem Commands'
89
90
91class DeviceCommands():
92    """Control the modem using shill device interfaces."""
93    def __init__(self, shill, device, slow_connect):
94        self.shill = shill
95        self.device = device
96        self.slow_connect = slow_connect
97        self.service = None
98
99    def GetService(self):
100        service = self.shill.find_cellular_service_object()
101        if not service:
102            raise error.TestFail(
103                'Service failed to appear when using device commands.')
104        return service
105
106    def Enable(self):
107        self.device.Enable(timeout=DEVICE_TIMEOUT)
108
109    def Disable(self):
110        self.service = None
111        self.device.Disable(timeout=DEVICE_TIMEOUT)
112
113    def Connect(self, **kwargs):
114        self.GetService().Connect()
115        if self.slow_connect:
116            time.sleep(SLOW_CONNECT_WAIT_SECONDS)
117
118    def Disconnect(self):
119        """
120        Disconnect Modem.
121
122        Returns:
123            False - to indicate that shill may not autoconnect again.
124        """
125        self.GetService().Disconnect()
126        return False
127
128    def __str__(self):
129        return 'Device Commands'
130
131
132class MixedRandomCommands():
133    """Control the modem using a mixture of commands on device, modems, etc."""
134    def __init__(self, commands_list):
135        self.commands_list = commands_list
136
137    def PickRandomCommands(self):
138        return self.commands_list[random.randrange(len(self.commands_list))]
139
140    def Enable(self):
141        cmds = self.PickRandomCommands()
142        logging.info('Enable with %s' % cmds)
143        cmds.Enable()
144
145    def Disable(self):
146        cmds = self.PickRandomCommands()
147        logging.info('Disable with %s' % cmds)
148        cmds.Disable()
149
150    def Connect(self, **kwargs):
151        cmds = self.PickRandomCommands()
152        logging.info('Connect with %s' % cmds)
153        cmds.Connect(**kwargs)
154
155    def Disconnect(self):
156        cmds = self.PickRandomCommands()
157        logging.info('Disconnect with %s' % cmds)
158        return cmds.Disconnect()
159
160    def __str__(self):
161        return 'Mixed Commands'
162
163
164class network_3GModemControl(test.test):
165    version = 1
166
167    def CompareModemPowerState(self, modem, expected_state):
168        """Compare modem manager power state of a modem to an expected state."""
169        return modem.IsEnabled() == expected_state
170
171    def CompareDevicePowerState(self, device, expected_state):
172        """Compare the shill device power state to an expected state."""
173        device_properties = device.GetProperties(utf8_strings=True);
174        state = device_properties['Powered']
175        logging.info('Device Enabled = %s' % state)
176        return state == expected_state
177
178    def CompareServiceState(self, service, expected_states):
179        """Compare the shill service state to a set of expected states."""
180        if not service:
181            logging.info('Service not found.')
182            return False
183
184        service_properties = service.GetProperties(utf8_strings=True);
185        state = service_properties['State']
186        logging.info('Service State = %s' % state)
187        return state in expected_states
188
189    def EnsureNotConnectingOrDisconnecting(self):
190        """
191        Ensure modem is not connecting or disconnecting.
192
193        Raises:
194            error.TestFail if it timed out waiting for the modem to finish
195            connecting or disconnecting.
196        """
197        # Shill retries a failed connect attempt with a different APN so
198        # check a few times to ensure the modem is not in between connect
199        # attempts.
200        for _ in range(NUM_MODEM_STATE_CHECKS):
201            utils.poll_for_condition(
202                lambda: not self.test_env.modem.IsConnectingOrDisconnecting(),
203                error.TestFail('Timed out waiting for modem to finish ' +
204                               'connecting or disconnecting.'),
205                timeout=SERVICE_TIMEOUT)
206            time.sleep(MODEM_STATE_CHECK_PERIOD_SECONDS)
207
208    def EnsureDisabled(self):
209        """
210        Ensure modem disabled, device powered off, and no service.
211
212        Raises:
213            error.TestFail if the states are not consistent.
214        """
215        utils.poll_for_condition(
216            lambda: self.CompareModemPowerState(self.test_env.modem, False),
217            error.TestFail('Modem failed to enter state Disabled.'))
218        utils.poll_for_condition(
219            lambda: self.CompareDevicePowerState(self.device, False),
220            error.TestFail('Device failed to enter state Powered=False.'))
221        utils.poll_for_condition(
222            lambda: not self.test_env.shill.find_cellular_service_object(),
223            error.TestFail('Service should not be available.'),
224            timeout=SERVICE_TIMEOUT)
225
226    def EnsureEnabled(self, check_idle):
227        """
228        Ensure modem enabled, device powered and service exists.
229
230        Args:
231            check_idle: if True, then ensure that the service is idle
232                        (i.e. not connected) otherwise ignore the
233                        service state
234
235        Raises:
236            error.TestFail if the states are not consistent.
237        """
238        utils.poll_for_condition(
239            lambda: self.CompareModemPowerState(self.test_env.modem, True),
240            error.TestFail('Modem failed to enter state Enabled'))
241        utils.poll_for_condition(
242            lambda: self.CompareDevicePowerState(self.device, True),
243            error.TestFail('Device failed to enter state Powered=True.'),
244            timeout=30)
245
246        service = self.test_env.shill.wait_for_cellular_service_object()
247        if check_idle:
248            utils.poll_for_condition(
249                lambda: self.CompareServiceState(service, ['idle']),
250                error.TestFail('Service failed to enter idle state.'),
251                timeout=SERVICE_TIMEOUT)
252
253    def EnsureConnected(self):
254        """
255        Ensure modem connected, device powered on, service connected.
256
257        Raises:
258            error.TestFail if the states are not consistent.
259        """
260        self.EnsureEnabled(check_idle=False)
261        utils.poll_for_condition(
262            lambda: self.CompareServiceState(
263                    self.test_env.shill.find_cellular_service_object(),
264                    ['ready', 'portal', 'online']),
265            error.TestFail('Service failed to connect.'),
266            timeout=SERVICE_TIMEOUT)
267
268
269    def TestCommands(self, commands):
270        """
271        Manipulate the modem using modem, device or technology commands.
272
273        Changes the state of the modem in various ways including
274        disable while connected and then verifies the state of the
275        modem manager and shill.
276
277        Raises:
278            error.TestFail if the states are not consistent.
279
280        """
281        logging.info('Testing using %s' % commands)
282
283        logging.info('Enabling')
284        commands.Enable()
285        self.EnsureEnabled(check_idle=not self.autoconnect)
286
287        technology_family = self.test_env.modem.GetCurrentTechnologyFamily()
288        if technology_family == cellular.TechnologyFamily.CDMA:
289            simple_connect_props = {'number': r'#777'}
290        else:
291            simple_connect_props = {'number': r'#777', 'apn': self.FindAPN()}
292
293        # Icera modems behave weirdly if we cancel the operation while the
294        # modem is connecting. Work around the issue by waiting until the
295        # connect operation completes.
296        # TODO(benchan): Remove this workaround once the issue is addressed
297        # on the modem side.
298        self.EnsureNotConnectingOrDisconnecting()
299
300        logging.info('Disabling')
301        commands.Disable()
302        self.EnsureDisabled()
303
304        logging.info('Enabling again')
305        commands.Enable()
306        self.EnsureEnabled(check_idle=not self.autoconnect)
307
308        if not self.autoconnect:
309            logging.info('Connecting')
310            commands.Connect(simple_connect_props=simple_connect_props)
311        else:
312            logging.info('Expecting AutoConnect to connect')
313        self.EnsureConnected()
314
315        logging.info('Disconnecting')
316        will_autoreconnect = commands.Disconnect()
317
318        if not (self.autoconnect and will_autoreconnect):
319            # Icera modems behave weirdly if we cancel the operation while the
320            # modem is disconnecting. Work around the issue by waiting until
321            # the disconnect operation completes.
322            # TODO(benchan): Remove this workaround once the issue is addressed
323            # on the modem side.
324            self.EnsureNotConnectingOrDisconnecting()
325
326            self.EnsureEnabled(check_idle=True)
327            logging.info('Connecting manually, since AutoConnect was on')
328            commands.Connect(simple_connect_props=simple_connect_props)
329        self.EnsureConnected()
330
331        logging.info('Disabling')
332        commands.Disable()
333        self.EnsureDisabled()
334
335    def FindAPN(self):
336        default = 'None'
337        service = self.test_env.shill.find_cellular_service_object()
338        props = service.GetProperties()
339        last_good_apn = props.get(
340                cellular_proxy.CellularProxy.SERVICE_PROPERTY_LAST_GOOD_APN,
341                None)
342        if not last_good_apn:
343            return default
344        return last_good_apn.get(
345                cellular_proxy.CellularProxy.APN_INFO_PROPERTY_APN, default)
346
347    def run_once(self, test_env, autoconnect, mixed_iterations=2,
348                 slow_connect=False):
349        self.test_env = test_env
350        self.autoconnect = autoconnect
351
352        with test_env:
353            self.device = self.test_env.shill.find_cellular_device_object()
354
355            modem_commands = ModemCommands(self.test_env.modem,
356                                           slow_connect)
357            technology_commands = TechnologyCommands(self.test_env.shill,
358                                                     modem_commands)
359            device_commands = DeviceCommands(self.test_env.shill,
360                                             self.device,
361                                             slow_connect)
362
363            with cell_tools.AutoConnectContext(self.device,
364                                               self.test_env.flim,
365                                               autoconnect):
366                # Start with cellular disabled.
367                self.test_env.shill.manager.DisableTechnology(
368                        shill_proxy.ShillProxy.TECHNOLOGY_CELLULAR)
369                self.EnsureDisabled()
370
371                # Run the device commands test first to make sure we have
372                # a valid APN needed to connect using the modem commands.
373                self.TestCommands(device_commands)
374                self.TestCommands(technology_commands)
375                self.TestCommands(modem_commands)
376
377                # Run several times using commands mixed from each type
378                mixed = MixedRandomCommands([modem_commands,
379                                             technology_commands,
380                                             device_commands])
381                for _ in range(mixed_iterations):
382                    self.TestCommands(mixed)
383