1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import gobject
6import logging
7import time
8
9from autotest_lib.client.bin import test
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.cros.cellular import modem_utils
12from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
13from autotest_lib.client.cros.mainloop import ExceptionForward
14
15DEFAULT_TEST_TIMEOUT_S = 600
16
17
18class DisableTester(GenericTesterMainLoop):
19  """Base class containing main test logic."""
20  def __init__(self, *args, **kwargs):
21    super(DisableTester, self).__init__(*args, **kwargs)
22
23  @ExceptionForward
24  def perform_one_test(self):
25    """Called by GenericMainTesterMainLoop to execute the test."""
26    self._configure()
27    disable_delay_ms = (
28        self.test_kwargs.get('delay_before_disable_ms', 0) +
29        self.test.iteration *
30        self.test_kwargs.get('disable_delay_per_iteration_ms', 0))
31    gobject.timeout_add(disable_delay_ms, self._start_disable)
32    self._start_test()
33
34  @ExceptionForward
35  def _connect_success_handler(self, *ignored_args):
36    logging.info('connect succeeded')
37    self.requirement_completed('connect')
38
39  @ExceptionForward
40  def _connect_error_handler(self, e):
41    # We disabled while connecting; error is OK
42    logging.info('connect errored: %s', e)
43    self.requirement_completed('connect')
44
45  @ExceptionForward
46  def _start_disable(self):
47    logging.info('disabling')
48    self.disable_start = time.time()
49    self._enable(False)
50
51  @ExceptionForward
52  def _disable_success_handler(self):
53    disable_elapsed = time.time() - self.disable_start
54    self.requirement_completed('disable')
55
56  @ExceptionForward
57  def _get_status_success_handler(self, status):
58    logging.info('Got status')
59    self.requirement_completed('get_status', warn_if_already_completed=False)
60    if self.status_delay_ms:
61      gobject.timeout_add(self.status_delay_ms, self._start_get_status)
62
63  def after_main_loop(self):
64    """Called by GenericTesterMainLoop after the main loop has exited."""
65    enabled = self._enabled()
66    logging.info('Modem enabled: %s', enabled)
67    # Will return happily if no Gobi present
68    modem_utils.ClearGobiModemFaultInjection()
69
70
71class ShillDisableTester(DisableTester):
72  """Tests that disable-while-connecting works at the shill level.
73  Expected control flow:
74
75  * self._configure() called; registers self._disable_property_changed
76    to be called when device is en/disabled
77
78  * Parent class sets a timer that calls self._enable(False) when it expires.
79
80  * _start_test calls _start_connect() which sends a connect request to
81    the device.
82
83  * we wait for the modem to power off, at which point
84    _disable_property_changed (registered above) will get called
85
86  * _disable_property_changed() completes the 'disable' requirement,
87    and we're done.
88
89  """
90  def __init__(self, *args, **kwargs):
91    super(ShillDisableTester, self).__init__(*args, **kwargs)
92
93  def _disable_property_changed(self, property, value, *args, **kwargs):
94    self._disable_success_handler()
95
96  def _start_test(self):
97    # We would love to add requirements based on connect, but in many
98    # scenarios, there is no observable response to a cancelled
99    # connect: We issue a connect, it returns instantly to let us know
100    # that the connect has started, but then the disable takes effect
101    # and the connect fails.  We don't get a state change because no
102    # state change has happened: the modem never got to a different
103    # state before we cancelled
104    self.remaining_requirements = set(['disable'])
105    self._start_connect()
106
107  def _configure(self):
108    self.cellular_device = \
109        self.test.test_env.shill.find_cellular_device_object()
110    if self.cellular_device is None:
111      raise error.TestError("Could not find cellular device")
112
113    self.cellular_service = \
114        self.test.test_env.shill.find_cellular_service_object()
115
116    self.test.test_env.bus.add_signal_receiver(
117            self.dispatch_property_changed,
118            signal_name='PropertyChanged',
119            dbus_interface=self.cellular_device.dbus_interface,
120            path=self.cellular_device.object_path)
121
122  @ExceptionForward
123  def _expect_einprogress_handler(self, e):
124    pass
125
126  def _enable(self, value):
127    self.property_changed_actions['Powered'] = self._disable_property_changed
128
129    if value:
130      self.cellular_device.Enable(
131          reply_handler=self.ignore_handler,
132          error_handler=self._expect_einprogress_handler)
133    else:
134      self.cellular_device.Disable(
135          reply_handler=self.ignore_handler,
136          error_handler=self._expect_einprogress_handler)
137
138  @ExceptionForward
139  def _start_connect(self):
140    logging.info('connecting')
141
142    def _log_connect_event(property, value, *ignored_args):
143      logging.info('%s property changed: %s', property, value)
144
145    self.property_changed_actions['Connected'] = _log_connect_event
146
147    # Contrary to documentation, Connect just returns when it has
148    # fired off the lower-level dbus messages.  So a success means
149    # nothing to us.  But a failure means it didn't even try.
150    self.cellular_service.Connect(
151        reply_handler=self.ignore_handler,
152        error_handler=self.build_error_handler('Connect'))
153
154  def _enabled(self):
155    return self.cellular_device.GetProperties()['Powered']
156
157
158class ModemDisableTester(DisableTester):
159  """Tests that disable-while-connecting works at the modem-manager level.
160
161  Expected control flow:
162
163  * _configure() is called.
164
165  * Parent class sets a timer that calls self._enable(False) when it
166    expires.
167
168  * _start_test calls _start_connect() which sends a connect request to
169    the device, also sets a timer that calls GetStatus on the modem.
170
171  * wait for all three (connect, disable, get_status) to complete.
172
173  """
174  def __init__(self, *args, **kwargs):
175    super(ModemDisableTester, self).__init__(*args, **kwargs)
176
177  def _is_gobi(self):
178    return 'Gobi' in self.test.test_env.modem.path
179
180  def _start_test(self):
181    self.remaining_requirements = set(['connect', 'disable'])
182
183    # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus
184    # will always succeed, so we only check it if the modem is a Gobi.
185    if self._is_gobi():
186      self.remaining_requirements.add('get_status')
187      self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200)
188      gobject.timeout_add(self.status_delay_ms, self._start_get_status)
189
190    self._start_connect()
191
192  def _configure(self):
193    self.simple_modem = self.test.test_env.modem.SimpleModem()
194
195    logging.info('Modem path: %s', self.test.test_env.modem.path)
196
197    if self._is_gobi():
198      self._configure_gobi()
199    else:
200      self._configure_non_gobi()
201
202    service = self.test.test_env.shill.wait_for_cellular_service_object()
203    if not service:
204      raise error.TestError('Modem failed to register with the network after '
205                            're-enabling.')
206
207  def _configure_gobi(self):
208    gobi_modem = self.test.test_env.modem.GobiModem()
209
210    if 'async_connect_sleep_ms' in self.test_kwargs:
211      sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0)
212      logging.info('Sleeping %d ms before connect', sleep_ms)
213      gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)
214
215    if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
216      logging.info('Injecting QMI failure')
217      gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)
218
219  def _configure_non_gobi(self):
220    # Check to make sure no Gobi-specific arguments were specified.
221    if 'async_connect_sleep_ms' in self.test_kwargs:
222      raise error.TestError('async_connect_sleep_ms on non-Gobi modem')
223    if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
224      raise error.TestError(
225          'connect_fails_with_error_sending_qmi_request on non-Gobi modem')
226
227  @ExceptionForward
228  def _start_connect(self):
229    logging.info('connecting')
230
231    retval = self.simple_modem.Connect(
232        {},
233        reply_handler=self._connect_success_handler,
234        error_handler=self._connect_error_handler)
235    logging.info('connect call made.  retval = %s', retval)
236
237
238  @ExceptionForward
239  def _start_get_status(self):
240    # Keep on calling get_status to make sure it works at all times
241    self.simple_modem.GetStatus(
242        reply_handler=self._get_status_success_handler,
243        error_handler=self.build_error_handler('GetStatus'))
244
245  def _enabled(self):
246    return self.test.test_env.modem.GetModemProperties().get('Enabled', -1)
247
248  def _enable(self, value):
249    self.test.test_env.modem.Enable(
250        value,
251        reply_handler=self._disable_success_handler,
252        error_handler=self.build_error_handler('Enable'))
253
254
255class network_3GDisableWhileConnecting(test.test):
256  """Check that the modem can handle a disconnect while connecting."""
257  version = 1
258
259  def run_once(self, test_env, **kwargs):
260    self.test_env = test_env
261    timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
262    gobject_main_loop = gobject.MainLoop()
263
264    with test_env:
265      logging.info('Shill-level test')
266      shill_level_test = ShillDisableTester(self,
267                                            gobject_main_loop,
268                                            timeout_s=timeout_s)
269      shill_level_test.run(**kwargs)
270
271    with test_env:
272      try:
273        logging.info('Modem-level test')
274        modem_level_test = ModemDisableTester(self,
275                                              gobject_main_loop,
276                                              timeout_s=timeout_s)
277        modem_level_test.run(**kwargs)
278      finally:
279        modem_utils.ClearGobiModemFaultInjection()
280