1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import os
8import time
9
10from autotest_lib.client.bin import test
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.cros.cellular import mm1_constants
14from autotest_lib.client.cros.cellular import test_environment
15from autotest_lib.client.cros.networking import pm_proxy
16
# Timeouts, in seconds, used when sleeping or polling for modem and service
# state changes.
SHORT_TIMEOUT = 10
LONG_TIMEOUT = 20

# Interface name under which the test modem exposes the 'ResetCalled'
# property.
I_ACTIVATION_TEST = 'Interface.LTEActivationTest'

# Module holding the custom modem classes; handed to the pseudo modem manager
# through its 'test-module' argument.
TEST_MODEMS_MODULE_PATH = os.path.join(
        os.path.dirname(__file__), 'files', 'modems.py')
23
class ActivationTest(object):
    """
    Base class holding the flow that is shared by the individual activation
    tests.

    Subclasses supply TestModemClass() and RunTest(); Run() executes the test
    body followed by the common clean-up.

    """
    def __init__(self, test):
        self.test = test


    def Cleanup(self):
        """
        Leaves the modem looking activated so that the test end condition is
        satisfied.

        """
        pseudomm = self.test.pseudomm

        # A non-zero MDN makes shill drop the ICCID from
        # activating_iccid_store.profile, which keeps individual test runs
        # from interfering with each other.
        activated_modem = pseudomm.wait_for_modem(timeout_seconds=LONG_TIMEOUT)
        activated_modem.iface_properties.Set(
                mm1_constants.I_MODEM, 'OwnNumbers', ['1111111111'])

        # With the subscription state unknown, the MDN value is what gets
        # used to remove the ICCID entry.
        pseudomm.iface_testing.SetSubscriptionState(
                mm1_constants.MM_MODEM_3GPP_SUBSCRIPTION_STATE_UNKNOWN,
                mm1_constants.MM_MODEM_3GPP_SUBSCRIPTION_STATE_UNKNOWN)
        time.sleep(5)
        self.test.CheckServiceActivationState('activated')


    def Run(self):
        """
        Runs the test body and then the common clean-up.

        """
        self.RunTest()
        self.Cleanup()


    def TestModemClass(self):
        """ Returns the name of the custom modem to use for this test. """
        raise NotImplementedError()


    def RunTest(self):
        """
        Body of the test; every subclass has to provide its own.

        """
        raise NotImplementedError()
77
78
class ActivationResetTest(ActivationTest):
    """
    Verifies that the modem gets reset after online payment.

    """
    def TestModemClass(self):
        return 'TestModem'


    def RunTest(self):
        harness = self.test

        # Initially the service has to be 'not-activated' and no reset may
        # have been recorded.
        harness.CheckServiceActivationState('not-activated')
        harness.CheckResetCalled(False)

        # Completing the activation moves the service to 'activating' and
        # should reset the modem right away. The intermediate 'activating'
        # state is deliberately not checked: doing so makes the test too
        # fragile.
        harness.FindCellularService().CompleteCellularActivation()
        time.sleep(SHORT_TIMEOUT)
        harness.CheckResetCalled(True)
101
102
class ActivationCompleteTest(ActivationTest):
    """
    Verifies that, for a post-payment registration, the service eventually
    becomes 'activated' and the modem finally registers to a network after a
    reset.

    """
    def TestModemClass(self):
        return 'ResetRequiredForActivationModem'


    def RunTest(self):
        harness = self.test

        # Initially the service has to be 'not-activated' and no reset may
        # have been recorded.
        harness.CheckServiceActivationState('not-activated')
        harness.CheckResetCalled(False)

        # Completing the activation moves the service to 'activating' and
        # should reset the modem right away. The intermediate 'activating'
        # state is deliberately not checked: doing so makes the test too
        # fragile.
        harness.FindCellularService().CompleteCellularActivation()
        time.sleep(SHORT_TIMEOUT)
        harness.CheckResetCalled(True)

        # Once registered again, the service should be marked 'activated'.
        harness.CheckServiceActivationState('activated')
130
131
class ActivationDueToMdnTest(ActivationTest):
    """
    Verifies that a valid MDN causes the service to be marked 'activated'
    while the modem is in the unknown subscription state.

    """
    def TestModemClass(self):
        return 'TestModem'


    def RunTest(self):
        harness = self.test

        # Initially the service has to be 'not-activated'.
        harness.CheckServiceActivationState('not-activated')

        # Give the modem an MDN; this should get the service marked as
        # activated.
        mdn_modem = harness.pseudomm.get_modem()
        mdn_modem.iface_properties.Set(
                mm1_constants.I_MODEM, 'OwnNumbers', ['1111111111'])

        # With the subscription state unknown, the MDN value is what decides
        # the service activation status.
        harness.pseudomm.iface_testing.SetSubscriptionState(
                mm1_constants.MM_MODEM_3GPP_SUBSCRIPTION_STATE_UNKNOWN,
                mm1_constants.MM_MODEM_3GPP_SUBSCRIPTION_STATE_UNKNOWN)
        time.sleep(SHORT_TIMEOUT)
        harness.CheckServiceActivationState('activated')
158
159
class network_LTEActivate(test.test):
    """
    After an online payment to activate a network, shill keeps track of service
    activation by monitoring changes to network registration and MDN updates
    combined with a modem reset. The test checks that the
    Cellular.ActivationState property of the service has the correct value
    associated with it by simulating possible scenarios using the pseudo modem
    manager.

    """
    version = 1

    def GetModemState(self):
        """
        Returns the current ModemManager modem state.

        @return The 'State' property of the current modem, or None when the
                pseudo modem manager currently reports no modem.

        """
        modem = self.pseudomm.get_modem()
        # Same guard as the other modem accessors in this class: without it a
        # missing modem would abort state polling with an AttributeError
        # instead of letting the poll retry.
        if modem is None:
            return None
        props = modem.properties(mm1_constants.I_MODEM)
        return props['State']


    def SetResetCalled(self, value):
        """
        Sets the value of the "ResetCalled" property of the current
        modem. Does nothing when there is no current modem.

        @param value: Value to set in the property.

        """
        modem = self.pseudomm.get_modem()
        if modem is None:
            return
        modem.iface_properties.Set(
                I_ACTIVATION_TEST,
                'ResetCalled',
                dbus.types.Boolean(value))


    def GetResetCalled(self, modem):
        """
        Returns the current value of the "ResetCalled" property of the current
        modem.

        @param modem: Modem proxy to send the query to.

        """
        return modem.properties(I_ACTIVATION_TEST)['ResetCalled']


    def _CheckResetCalledHelper(self, expected_value):
        """
        Single polling step for CheckResetCalled.

        @param expected_value: The expected value of ResetCalled.
        @return True if the current modem reports |expected_value|; False if
                it reports another value, if there is no modem, or if the
                query fails with an unknown-method / unknown-object D-Bus
                error.
        @raises dbus.exceptions.DBusException for any other D-Bus error.

        """
        modem = self.pseudomm.get_modem()
        if modem is None:
            return False
        try:
            return self.GetResetCalled(modem) == expected_value
        except dbus.exceptions.DBusException as e:
            tolerated = (mm1_constants.DBUS_UNKNOWN_METHOD,
                         mm1_constants.DBUS_UNKNOWN_OBJECT)
            if e.get_dbus_name() in tolerated:
                return False
            # Bare raise so the original traceback is preserved.
            raise


    def CheckResetCalled(self, expected_value):
        """
        Checks that the ResetCalled property on the modem matches the expected
        value within LONG_TIMEOUT.

        @param expected_value: The expected value of ResetCalled.
        @raises error.TestFail if the value does not match in time.

        """
        utils.poll_for_condition(
            lambda: self._CheckResetCalledHelper(expected_value),
            exception=error.TestFail("\"ResetCalled\" did not match: " +
                                     str(expected_value)),
            timeout=LONG_TIMEOUT)


    def EnsureModemStateReached(self, expected_state, timeout):
        """
        Asserts that the underlying modem state becomes |expected_state| within
        |timeout|.

        @param expected_state: The expected modem state.
        @param timeout: Timeout in which the condition should be met.
        @raises error.TestFail if the state is not reached in time.

        """
        utils.poll_for_condition(
                lambda: self.GetModemState() == expected_state,
                exception=error.TestFail(
                        'Modem failed to reach state ' +
                        mm1_constants.ModemStateToString(expected_state)),
                timeout=timeout)


    def CheckServiceActivationState(self, expected_state):
        """
        Asserts that the service activation state matches |expected_state|
        within SHORT_TIMEOUT.

        @param expected_state: The expected service activation state.
        @raises error.TestError if the state does not match in time.

        """
        logging.info('Checking for service activation state: %s',
                     expected_state)
        service = self.FindCellularService()
        # The third element of the result is not needed here.
        success, state, _ = self.test_env.shill.wait_for_property_in(
            service,
            'Cellular.ActivationState',
            [expected_state],
            SHORT_TIMEOUT)
        if not success and state != expected_state:
            raise error.TestError(
                'Service activation state should be \'%s\', but it is \'%s\'.'
                % (expected_state, state))


    def FindCellularService(self, check_not_none=True):
        """
        Returns the current cellular service.

        @param check_not_none: If True, waits up to LONG_TIMEOUT for a service
                to appear and raises an error if none was found.
        @raises error.TestError if |check_not_none| is True and no service
                could be found.

        """
        if check_not_none:
            utils.poll_for_condition(
                    lambda: (self.test_env.shill.find_cellular_service_object()
                             is not None),
                    exception=error.TestError(
                            'Could not find cellular service within timeout.'),
                    timeout=LONG_TIMEOUT)

        service = self.test_env.shill.find_cellular_service_object()

        # Check once more, to make sure it's valid.
        if check_not_none and not service:
            raise error.TestError('Could not find cellular service.')
        return service


    def FindCellularDevice(self):
        """
        Returns the current cellular device.

        @raises error.TestError if no cellular device was found.

        """
        device = self.test_env.shill.find_cellular_device_object()
        if not device:
            raise error.TestError('Could not find cellular device.')
        return device


    def ResetCellularDevice(self):
        """
        Resets all modems, guaranteeing that the operation succeeds and doesn't
        fail due to race conditions in pseudomodem start-up and test execution.

        """
        # Wait for the enabled state both before and after the reset.
        self.EnsureModemStateReached(
                mm1_constants.MM_MODEM_STATE_ENABLED, SHORT_TIMEOUT)
        self.test_env.shill.reset_modem(self.FindCellularDevice())
        self.EnsureModemStateReached(
                mm1_constants.MM_MODEM_STATE_ENABLED, SHORT_TIMEOUT)


    def run_once(self):
        """
        Runs every activation scenario in turn, each inside its own pseudo
        modem test environment configured with the modem class that the
        scenario asks for.

        """
        scenarios = [
            ActivationResetTest(self),
            ActivationCompleteTest(self),
            ActivationDueToMdnTest(self),
        ]

        # The loop variable is deliberately not called 'test', which would
        # shadow the imported autotest 'test' module.
        for scenario in scenarios:
            modem_class = scenario.TestModemClass()
            self.test_env = test_environment.CellularPseudoMMTestEnvironment(
                    pseudomm_args=({'family': '3GPP',
                                    'test-module': TEST_MODEMS_MODULE_PATH,
                                    'test-modem-class': modem_class,
                                    'test-sim-class': 'TestSIM'},))
            with self.test_env:
                self.pseudomm = pm_proxy.PseudoMMProxy.get_proxy()
                # Set the reset flag to False explicitly before each test
                # sequence starts to ignore the reset as a part of the test init
                self.SetResetCalled(False)
                scenario.Run()
339