1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import gobject
6import logging
7import time
8
9from autotest_lib.client.bin import test
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.cros.cellular import modem_utils
12from autotest_lib.client.cros.mainloop import ExceptionForward
13from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
14from autotest_lib.client.cros.networking import shill_proxy
15
# Timeout, in seconds, handed to each tester's main loop when the caller of
# cellular_DisableWhileConnecting.run_once does not pass a 'timeout_s' kwarg.
DEFAULT_TEST_TIMEOUT_S = 600
17
18
class DisableTester(GenericTesterMainLoop):
  """Base class containing the main disable-while-connecting test logic.

  This class drives the test but leaves the technology-specific steps to
  subclasses, which must provide the hooks called below:

  * _configure(): prepare whatever objects the test needs.
  * _start_test(): set self.remaining_requirements and start connecting.
  * _enable(value): request that the device be enabled or disabled.
  * _enabled(): report the current enabled state (used for logging only).
  * _start_get_status() and self.status_delay_ms: only needed by subclasses
    that use the 'get_status' requirement.
  """

  @ExceptionForward
  def perform_one_test(self):
    """Called by GenericTesterMainLoop to execute the test.

    Configures the subclass, schedules the disable request and then starts
    the test proper.  The delay before disabling is
    'delay_before_disable_ms' plus 'disable_delay_per_iteration_ms' for each
    iteration already run, both read from self.test_kwargs and defaulting
    to 0.
    """
    self._configure()
    disable_delay_ms = (
        self.test_kwargs.get('delay_before_disable_ms', 0) +
        self.test.iteration *
        self.test_kwargs.get('disable_delay_per_iteration_ms', 0))
    # The timer is armed before the connect is started so that the disable
    # can land while the connect is still in flight.
    gobject.timeout_add(disable_delay_ms, self._start_disable)
    self._start_test()

  @ExceptionForward
  def _connect_success_handler(self, *ignored_args):
    """Marks the 'connect' requirement complete after a successful connect."""
    logging.info('connect succeeded')
    self.requirement_completed('connect')

  @ExceptionForward
  def _connect_error_handler(self, e):
    """Marks the 'connect' requirement complete after a failed connect.

    Args:
      e: the error reported for the connect call; it is only logged.
    """
    # We disabled while connecting; error is OK
    logging.info('connect errored: %s', e)
    self.requirement_completed('connect')

  @ExceptionForward
  def _start_disable(self):
    """Timer callback: records the start time and requests a disable."""
    logging.info('disabling')
    self.disable_start = time.time()
    self._enable(False)

  @ExceptionForward
  def _disable_success_handler(self):
    """Marks the 'disable' requirement complete and logs how long it took.

    Relies on self.disable_start having been set by _start_disable().
    """
    disable_elapsed = time.time() - self.disable_start
    logging.info('disable completed after %.3f s', disable_elapsed)
    self.requirement_completed('disable')

  @ExceptionForward
  def _get_status_success_handler(self, status):
    """Marks 'get_status' complete and schedules the next status poll.

    Args:
      status: the status reply; its contents are not inspected.
    """
    logging.info('Got status')
    # This handler fires repeatedly, so a repeated completion is expected.
    self.requirement_completed('get_status', warn_if_already_completed=False)
    if self.status_delay_ms:
      gobject.timeout_add(self.status_delay_ms, self._start_get_status)

  def after_main_loop(self):
    """Called by GenericTesterMainLoop after the main loop has exited.

    Logs the final enabled state and clears any Gobi fault injection.  The
    clear runs even if querying the enabled state raises, so an injected
    fault is not left behind for later tests.
    """
    try:
      enabled = self._enabled()
      logging.info('Modem enabled: %s', enabled)
    finally:
      # Will return happily if no Gobi present
      modem_utils.ClearGobiModemFaultInjection()
70
71
class ShillDisableTester(DisableTester):
  """Tests that disable-while-connecting works at the shill level.

  Expected control flow:

  * self._configure() is called; it looks up the cellular device and service
    and subscribes self.dispatch_property_changed to the device's
    PropertyChanged signal.

  * Parent class sets a timer that calls self._enable(False) when it expires.
    _enable() registers self._disable_property_changed to be called when the
    device's 'Powered' property changes.

  * _start_test calls _start_connect() which sends a connect request to
    the device.

  * we wait for the modem to power off, at which point
    _disable_property_changed (registered above) will get called

  * _disable_property_changed() completes the 'disable' requirement,
    and we're done.

  """

  def _disable_property_changed(self, property, value, *args, **kwargs):
    """Treats a change of the 'Powered' property as a completed disable.

    Args:
      property: name of the property that changed (unused).
      value: new value of the property (unused).
    """
    self._disable_success_handler()

  def _start_test(self):
    """Sets the requirements for this test and starts the connect."""
    # We would love to add requirements based on connect, but in many
    # scenarios, there is no observable response to a cancelled
    # connect: We issue a connect, it returns instantly to let us know
    # that the connect has started, but then the disable takes effect
    # and the connect fails.  We don't get a state change because no
    # state change has happened: the modem never got to a different
    # state before we cancelled
    self.remaining_requirements = set(['disable'])
    self._start_connect()

  def _configure(self):
    """Finds the cellular device and service and subscribes to changes.

    Raises:
      error.TestError: if the cellular device or service cannot be found.
    """
    self.cellular_device = \
        self.test.test_env.shill.find_cellular_device_object()
    if self.cellular_device is None:
      raise error.TestError("Could not find cellular device")

    self.cellular_service = \
        self.test.test_env.shill.find_cellular_service_object()
    # Mirror the device check above: without a service, _start_connect()
    # would fail later with an unhelpful AttributeError on None.
    if self.cellular_service is None:
      raise error.TestError('Could not find cellular service')

    self.test.test_env.bus.add_signal_receiver(
            self.dispatch_property_changed,
            signal_name='PropertyChanged',
            dbus_interface=self.cellular_device.dbus_interface,
            path=self.cellular_device.object_path)

  @ExceptionForward
  def _expect_einprogress_handler(self, e):
    """Error handler for Enable/Disable; any reported error is ignored.

    Args:
      e: the error reported for the call (unused).
    """
    pass

  def _enable(self, value):
    """Asks shill to enable or disable the cellular device.

    Also registers _disable_property_changed as the action for 'Powered'
    property changes, which is how completion of the request is observed.

    Args:
      value: truthy to enable the device, falsy to disable it.
    """
    self.property_changed_actions['Powered'] = self._disable_property_changed

    if value:
      self.cellular_device.Enable(
          reply_handler=self.ignore_handler,
          error_handler=self._expect_einprogress_handler)
    else:
      self.cellular_device.Disable(
          reply_handler=self.ignore_handler,
          error_handler=self._expect_einprogress_handler)

  @ExceptionForward
  def _start_connect(self):
    """Issues an asynchronous Connect on the cellular service."""
    logging.info('connecting')

    def _log_connect_event(property, value, *ignored_args):
      logging.info('%s property changed: %s', property, value)

    # 'Connected' changes are only logged; they are not a test requirement.
    self.property_changed_actions['Connected'] = _log_connect_event

    # Contrary to documentation, Connect just returns when it has
    # fired off the lower-level dbus messages.  So a success means
    # nothing to us.  But a failure means it didn't even try.
    self.cellular_service.Connect(
        reply_handler=self.ignore_handler,
        error_handler=self.build_error_handler('Connect'))

  def _enabled(self):
    """Returns the cellular device's 'Powered' property as read via shill."""
    return self.test.test_env.shill.get_dbus_property(
            self.cellular_device,
            shill_proxy.ShillProxy.DEVICE_PROPERTY_POWERED)
159
160
class ModemDisableTester(DisableTester):
  """Exercises disable-while-connecting at the modem-manager level.

  Sequence of events:

  * _configure() prepares the modem objects (and, on Gobi, any requested
    fault injection).

  * The parent class arms a timer which calls self._enable(False) once it
    fires.

  * _start_test() kicks off _start_connect(), which sends the connect
    request; on Gobi modems it also arms a timer that polls GetStatus.

  * The test finishes once every requirement (connect, disable and, on
    Gobi, get_status) has completed.

  """

  def _is_gobi(self):
    """Returns True if the modem's path contains 'Gobi'."""
    modem_path = self.test.test_env.modem.path
    return 'Gobi' in modem_path

  def _start_test(self):
    """Declares the requirements for this run and starts the connect."""
    self.remaining_requirements = set(['connect', 'disable'])

    # GetStatus is only guaranteed to always succeed with
    # cromo/gobi-cromo-plugin, so it is only polled on Gobi modems.
    if self._is_gobi():
      self.remaining_requirements.add('get_status')
      poll_delay_ms = self.test_kwargs.get('status_delay_ms', 200)
      self.status_delay_ms = poll_delay_ms
      gobject.timeout_add(self.status_delay_ms, self._start_get_status)

    self._start_connect()

  def _configure(self):
    """Sets up the modem proxy, per-modem options and waits for a service.

    Raises:
      error.TestError: if no cellular service object becomes available.
    """
    env = self.test.test_env
    self.simple_modem = env.modem.SimpleModem()

    logging.info('Modem path: %s', env.modem.path)

    configure_modem = (self._configure_gobi if self._is_gobi()
                       else self._configure_non_gobi)
    configure_modem()

    if not env.shill.wait_for_cellular_service_object():
      raise error.TestError('Modem failed to register with the network after '
                            're-enabling.')

  def _configure_gobi(self):
    """Applies the Gobi fault injections requested through test_kwargs."""
    options = self.test_kwargs
    modem = self.test.test_env.modem.GobiModem()

    if 'async_connect_sleep_ms' in options:
      delay_ms = options.get('async_connect_sleep_ms', 0)
      logging.info('Sleeping %d ms before connect', delay_ms)
      modem.InjectFault('AsyncConnectSleepMs', delay_ms)

    if 'connect_fails_with_error_sending_qmi_request' in options:
      logging.info('Injecting QMI failure')
      modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)

  def _configure_non_gobi(self):
    """Rejects Gobi-only arguments when the modem is not a Gobi.

    Raises:
      error.TestError: if a Gobi-specific argument is present in test_kwargs.
    """
    gobi_only_arguments = (
        ('async_connect_sleep_ms',
         'async_connect_sleep_ms on non-Gobi modem'),
        ('connect_fails_with_error_sending_qmi_request',
         'connect_fails_with_error_sending_qmi_request on non-Gobi modem'),
    )
    for argument, complaint in gobi_only_arguments:
      if argument in self.test_kwargs:
        raise error.TestError(complaint)

  @ExceptionForward
  def _start_connect(self):
    """Sends an asynchronous Connect request to the modem."""
    logging.info('connecting')

    handlers = dict(reply_handler=self._connect_success_handler,
                    error_handler=self._connect_error_handler)
    result = self.simple_modem.Connect({}, **handlers)
    logging.info('connect call made.  retval = %s', result)

  @ExceptionForward
  def _start_get_status(self):
    """Sends an asynchronous GetStatus request to the modem."""
    # Polled repeatedly so that GetStatus is checked throughout the test.
    on_error = self.build_error_handler('GetStatus')
    self.simple_modem.GetStatus(
        reply_handler=self._get_status_success_handler,
        error_handler=on_error)

  def _enabled(self):
    """Returns the modem's 'Enabled' property, or -1 if it is absent."""
    properties = self.test.test_env.modem.GetModemProperties()
    return properties.get('Enabled', -1)

  def _enable(self, value):
    """Asks the modem to enable or disable itself.

    A successful reply is reported through _disable_success_handler.

    Args:
      value: passed straight through to the modem's Enable call.
    """
    on_error = self.build_error_handler('Enable')
    self.test.test_env.modem.Enable(
        value,
        reply_handler=self._disable_success_handler,
        error_handler=on_error)
256
257
class cellular_DisableWhileConnecting(test.test):
  """Check that the modem can handle a disconnect while connecting."""
  version = 1

  def run_once(self, test_env, **kwargs):
    """Runs the shill-level test and then the modem-level test.

    Each test runs inside its own `with test_env:` block, and both share
    one gobject main loop.

    Args:
      test_env: context manager providing the test environment; it is also
          stored on self.test_env for the testers to use.
      **kwargs: forwarded to each tester's run(); 'timeout_s' additionally
          sets the tester timeout, defaulting to DEFAULT_TEST_TIMEOUT_S.
    """
    self.test_env = test_env
    timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
    main_loop = gobject.MainLoop()

    with test_env:
      logging.info('Shill-level test')
      ShillDisableTester(self, main_loop, timeout_s=timeout_s).run(**kwargs)

    with test_env:
      try:
        logging.info('Modem-level test')
        ModemDisableTester(self, main_loop, timeout_s=timeout_s).run(**kwargs)
      finally:
        # Always undo any fault injection left over from the modem test.
        modem_utils.ClearGobiModemFaultInjection()
283