1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7
8from autotest_lib.client.bin import test
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.cros.cellular import mm1_constants
11from autotest_lib.client.cros.cellular import test_environment
12from autotest_lib.client.cros.cellular.pseudomodem import modem_3gpp
13from autotest_lib.client.cros.cellular.pseudomodem import modem_cdma
14from autotest_lib.client.cros.cellular.pseudomodem import pm_errors
15from autotest_lib.client.cros.cellular.pseudomodem import utils as pm_utils
16
17# Use our own connect/disconnect timeout for this test because we are using a
18# a pseudomodem which should run faster than a real modem.
19CONNECT_DISCONNECT_TIMEOUT = 10
20
21
22def _GetModemSuperClass(family):
23    """
24    Obtains the correct Modem base class to use for the given family.
25
26    @param family: The modem family. Should be one of |3GPP|/|CDMA|.
27    @returns: The relevant Modem base class.
28    @raises error.TestError, if |family| is not one of '3GPP' or 'CDMA'.
29
30    """
31    if family == '3GPP':
32        return modem_3gpp.Modem3gpp
33    elif family == 'CDMA':
34        return modem_cdma.ModemCdma
35    else:
36        raise error.TestError('Invalid pseudomodem family: %s', family)
37
38
39def GetModemDisconnectWhileStateIsDisconnecting(family):
40    """
41    Returns a modem that fails on disconnect request.
42
43    @param family: The family of the modem returned.
44    @returns: A modem of the given family that fails disconnect.
45
46    """
47    modem_class = _GetModemSuperClass(family)
48    class _TestModem(modem_class):
49        """ Actual modem implementation. """
50        @pm_utils.log_dbus_method(return_cb_arg='return_cb',
51                                  raise_cb_arg='raise_cb')
52        def Disconnect(
53            self, bearer_path, return_cb, raise_cb, *return_cb_args):
54            """
55            Test implementation of
56            org.freedesktop.ModemManager1.Modem.Simple.Disconnect. Sets the
57            modem state to DISCONNECTING and then fails, fooling shill into
58            thinking that the disconnect failed while disconnecting.
59
60            Refer to modem_simple.ModemSimple.Connect for documentation.
61
62            """
63            logging.info('Simulating failed Disconnect')
64            self.ChangeState(mm1_constants.MM_MODEM_STATE_DISCONNECTING,
65                             mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN)
66            time.sleep(5)
67            raise pm_errors.MMCoreError(pm_errors.MMCoreError.FAILED)
68
69    return _TestModem()
70
71
72def GetModemDisconnectWhileDisconnectInProgress(family):
73    """
74    Returns a modem implementation that fails disconnect except the first one.
75
76    @param family: The family of the returned modem.
77    @returns: A modem of the given family that fails all but the first
78            disconnect attempts.
79
80    """
81    modem_class = _GetModemSuperClass(family)
82    class _TestModem(modem_class):
83        """ The actual modem implementation. """
84        def __init__(self):
85            modem_class.__init__(self)
86            self.disconnect_count = 0
87
88        @pm_utils.log_dbus_method(return_cb_arg='return_cb',
89                                  raise_cb_arg='raise_cb')
90        def Disconnect(
91            self, bearer_path, return_cb, raise_cb, *return_cb_args):
92            """
93            Test implementation of
94            org.freedesktop.ModemManager1.Modem.Simple.Disconnect. Keeps
95            count of successive disconnect operations and fails during all
96            but the first one.
97
98            Refer to modem_simple.ModemSimple.Connect for documentation.
99
100            """
101            # On the first call, set the state to DISCONNECTING.
102            self.disconnect_count += 1
103            if self.disconnect_count == 1:
104                self.ChangeState(
105                        mm1_constants.MM_MODEM_STATE_DISCONNECTING,
106                        mm1_constants.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN)
107                time.sleep(5)
108            else:
109                raise pm_errors.MMCoreError(pm_errors.MMCoreError.FAILED)
110
111    return _TestModem()
112
113
114def GetModemDisconnectFailOther(family):
115    """
116    Returns a modem that fails a disconnect attempt with a generic error.
117
118    @param family: The family of the modem returned.
119    @returns: A modem of the give family that fails disconnect.
120
121    """
122    modem_class = _GetModemSuperClass(family)
123    class _TestModem(modem_class):
124        """ The actual modem implementation. """
125        @pm_utils.log_dbus_method(return_cb_arg='return_cb',
126                                  raise_cb_arg='raise_cb')
127        def Disconnect(
128            self, bearer_path, return_cb, raise_cb, *return_cb_args):
129            """
130            Test implementation of
131            org.freedesktop.ModemManager1.Modem.Simple.Disconnect.
132            Fails with an error.
133
134            Refer to modem_simple.ModemSimple.Connect for documentation.
135
136            """
137            raise pm_errors.MMCoreError(pm_errors.MMCoreError.FAILED)
138
139    return _TestModem()
140
141
142class DisconnectFailTest(object):
143    """
144    DisconnectFailTest implements common functionality in all test cases.
145
146    """
147    def __init__(self, test, pseudomodem_family):
148        self.test = test
149        self._pseudomodem_family = pseudomodem_family
150
151
152    def IsServiceConnected(self):
153        """
154        @return True, if service is connected.
155
156        """
157        service = self.test_env.shill.find_cellular_service_object()
158        properties = service.GetProperties(utf8_strings=True)
159        state = properties.get('State', None)
160        return state in ['portal', 'online']
161
162
163    def IsServiceDisconnected(self):
164        """
165        @return True, if service is disconnected.
166
167        """
168        service = self.test_env.shill.find_cellular_service_object()
169        properties = service.GetProperties(utf8_strings=True)
170        state = properties.get('State', None)
171        return state == 'idle'
172
173
174    def Run(self):
175        """
176        Runs the test.
177
178        @raises test.TestFail, if |test_modem| hasn't been initialized.
179
180        """
181        self.test_env = test_environment.CellularPseudoMMTestEnvironment(
182                pseudomm_args=(
183                        {'test-module' : __file__,
184                         'test-modem-class' : self._GetTestModemFunctorName(),
185                         'test-modem-arg' : [self._pseudomodem_family]},))
186        with self.test_env:
187            self._RunTest()
188
189
190    def _GetTestModemFunctorName(self):
191        """ Returns the modem to be used by the pseudomodem for this test. """
192        raise NotImplementedError()
193
194
195    def _RunTest(self):
196        raise NotImplementedError()
197
198
199class DisconnectWhileStateIsDisconnectingTest(DisconnectFailTest):
200    """
201    Simulates a disconnect failure while the modem is still disconnecting.
202    Fails if the service doesn't remain connected.
203
204    """
205    def _GetTestModemFunctorName(self):
206        return 'GetModemDisconnectWhileStateIsDisconnecting'
207
208
209    def _RunTest(self):
210        # Connect to the service.
211        service = self.test_env.shill.find_cellular_service_object()
212        self.test_env.shill.connect_service_synchronous(
213                service, CONNECT_DISCONNECT_TIMEOUT)
214
215        # Disconnect attempt should fail.
216        self.test_env.shill.disconnect_service_synchronous(
217                service, CONNECT_DISCONNECT_TIMEOUT)
218
219        # Service should remain connected.
220        if not self.IsServiceConnected():
221            raise error.TestError('Service should remain connected after '
222                                  'disconnect failure.')
223
224
225class DisconnectWhileDisconnectInProgressTest(DisconnectFailTest):
226    """
227    Simulates a disconnect failure on successive disconnects. Fails if the
228    service doesn't remain connected.
229
230    """
231    def _GetTestModemFunctorName(self):
232        return 'GetModemDisconnectWhileDisconnectInProgress'
233
234
235    def _RunTest(self):
236        # Connect to the service.
237        service = self.test_env.shill.find_cellular_service_object()
238        self.test_env.shill.connect_service_synchronous(
239                service, CONNECT_DISCONNECT_TIMEOUT)
240
241        # Issue first disconnect. Service should remain connected.
242        self.test_env.shill.disconnect_service_synchronous(
243                service, CONNECT_DISCONNECT_TIMEOUT)
244        if not self.IsServiceConnected():
245            raise error.TestError('Service should remain connected after '
246                                  'first disconnect.')
247
248        # Modem state should be disconnecting.
249        props = self.test_env.modem.GetAll(mm1_constants.I_MODEM)
250        if not props['State'] == mm1_constants.MM_MODEM_STATE_DISCONNECTING:
251            raise error.TestError('Modem should be in the DISCONNECTING state.')
252
253        # Issue second disconnect. Service should remain connected.
254        self.test_env.shill.disconnect_service_synchronous(
255                service, CONNECT_DISCONNECT_TIMEOUT)
256        if not self.IsServiceConnected():
257            raise error.TestError('Service should remain connected after '
258                                  'disconnect failure.')
259
260
261class DisconnectFailOtherTest(DisconnectFailTest):
262    """
263    Simulates a disconnect failure. Fails if the service doesn't disconnect.
264
265    """
266    def _GetTestModemFunctorName(self):
267        return 'GetModemDisconnectFailOther'
268
269
270    def _RunTest(self):
271        # Connect to the service.
272        service = self.test_env.shill.find_cellular_service_object()
273        self.test_env.shill.connect_service_synchronous(
274                service, CONNECT_DISCONNECT_TIMEOUT)
275
276        # Disconnect attempt should fail.
277        self.test_env.shill.disconnect_service_synchronous(
278                service, CONNECT_DISCONNECT_TIMEOUT)
279
280        # Service should be cleaned up as if disconnect succeeded.
281        if not self.IsServiceDisconnected():
282            raise error.TestError('Service should be disconnected.')
283
284
285class cellular_DisconnectFailure(test.test):
286    """
287    The test uses the pseudo modem manager to simulate two failure scenarios of
288    a Disconnect call: failure while the modem state is DISCONNECTING and
289    failure while it is CONNECTED. The expected behavior of shill is to do
290    nothing if the modem state is DISCONNECTING and to clean up the service
291    otherwise.
292
293    """
294    version = 1
295
296    def run_once(self, pseudomodem_family='3GPP'):
297        tests = [
298                DisconnectWhileStateIsDisconnectingTest(self,
299                                                        pseudomodem_family),
300                DisconnectWhileDisconnectInProgressTest(self,
301                                                        pseudomodem_family),
302                DisconnectFailOtherTest(self, pseudomodem_family),
303        ]
304
305        for test in tests:
306            test.Run()
307