1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import time
8
9from autotest_lib.client.cros.networking import shill_proxy
10
11
12class CellularProxy(shill_proxy.ShillProxy):
13    """Wrapper around shill dbus interface used by cellular tests."""
14
15    # Properties exposed by shill.
16    DEVICE_PROPERTY_DBUS_OBJECT = 'DBus.Object'
17    DEVICE_PROPERTY_MODEL_ID = 'Cellular.ModelID'
18    DEVICE_PROPERTY_OUT_OF_CREDITS = 'Cellular.OutOfCredits'
19    DEVICE_PROPERTY_SIM_LOCK_STATUS = 'Cellular.SIMLockStatus'
20    DEVICE_PROPERTY_SIM_PRESENT = 'Cellular.SIMPresent'
21    DEVICE_PROPERTY_TECHNOLOGY_FAMILY = 'Cellular.Family'
22    DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA = 'CDMA'
23    DEVICE_PROPERTY_TECHNOLOGY_FAMILY_GSM = 'GSM'
24    SERVICE_PROPERTY_LAST_GOOD_APN = 'Cellular.LastGoodAPN'
25
26    # APN info property names.
27    APN_INFO_PROPERTY_APN = 'apn'
28
29    # Keys into the dictionaries exposed as properties.
30    PROPERTY_KEY_SIM_LOCK_TYPE = 'LockType'
31    PROPERTY_KEY_SIM_LOCK_ENABLED = 'LockEnabled'
32    PROPERTY_KEY_SIM_LOCK_RETRIES_LEFT = 'RetriesLeft'
33
34    # Valid values taken by properties exposed by shill.
35    VALUE_SIM_LOCK_TYPE_PIN = 'sim-pin'
36    VALUE_SIM_LOCK_TYPE_PUK = 'sim-puk'
37
38    # Various timeouts in seconds.
39    SERVICE_CONNECT_TIMEOUT = 60
40    SERVICE_DISCONNECT_TIMEOUT = 60
41    SERVICE_REGISTRATION_TIMEOUT = 60
42    SLEEP_INTERVAL = 0.1
43
44    def set_logging_for_cellular_test(self):
45        """Set the logging in shill for a test of cellular technology.
46
47        Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes
48        to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for
49        |ShillProxy.TECHNOLOGY_CELLULAR|.
50
51        """
52        self.set_logging_for_test(self.TECHNOLOGY_CELLULAR)
53
54
55    def find_cellular_service_object(self):
56        """Returns the first dbus object found that is a cellular service.
57
58        @return DBus object for the first cellular service found. None if no
59                service found.
60
61        """
62        return self.find_object('Service', {'Type': self.TECHNOLOGY_CELLULAR})
63
64
65    def wait_for_cellular_service_object(
66            self, timeout_seconds=SERVICE_REGISTRATION_TIMEOUT):
67        """Waits for the cellular service object to show up.
68
69        @param timeout_seconds: Amount of time to wait for cellular service.
70        @return DBus object for the first cellular service found.
71        @raises ShillProxyError if no cellular service is found within the
72            registration timeout period.
73
74        """
75        CellularProxy._poll_for_condition(
76                lambda: self.find_cellular_service_object() is not None,
77                'Failed to find cellular service object',
78                timeout=timeout_seconds)
79        return self.find_cellular_service_object()
80
81
82    def find_cellular_device_object(self):
83        """Returns the first dbus object found that is a cellular device.
84
85        @return DBus object for the first cellular device found. None if no
86                device found.
87
88        """
89        return self.find_object('Device', {'Type': self.TECHNOLOGY_CELLULAR})
90
91
92    def reset_modem(self, modem, expect_device=True, expect_powered=True,
93                    expect_service=True):
94        """Reset |modem|.
95
96        Do, in sequence,
97        (1) Ensure that the current device object disappears.
98        (2) If |expect_device|, ensure that the device reappears.
99        (3) If |expect_powered|, ensure that the device is powered.
100        (4) If |expect_service|, ensure that the service reappears.
101
102        This function does not check the service state for the device after
103        reset.
104
105        @param modem: DBus object for the modem to reset.
106        @param expect_device: If True, ensure that a DBus object reappears for
107                the same modem after the reset.
108        @param expect_powered: If True, ensure that the modem is powered on
109                after the reset.
110        @param expect_service: If True, ensure that a service managing the
111                reappeared modem also reappears.
112
113        @return (device, service)
114                device: DBus object for the reappeared Device after the reset.
115                service: DBus object for the reappeared Service after the reset.
116                Either of these may be None, if the object is not expected to
117                reappear.
118
119        @raises ShillProxyError if any of the conditions (1)-(4) fail.
120
121        """
122        logging.info('Resetting modem')
123        # Obtain identifying information about the modem.
124        properties = modem.GetProperties(utf8_strings=True)
125        # NOTE: Using the Model ID means that this will break if we have two
126        # identical cellular modems in a DUT. Fortunately, we only support one
127        # modem at a time.
128        model_id = properties.get(self.DEVICE_PROPERTY_MODEL_ID)
129        if not model_id:
130            raise shill_proxy.ShillProxyError(
131                    'Failed to get identifying information for the modem.')
132        old_modem_path = modem.object_path
133        old_modem_mm_object = properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT)
134        if not old_modem_mm_object:
135            raise shill_proxy.ShillProxyError(
136                    'Failed to get the mm object path for the modem.')
137
138        modem.Reset()
139
140        # (1) Wait for the old modem to disappear
141        CellularProxy._poll_for_condition(
142                lambda: self._is_old_modem_gone(old_modem_path,
143                                                old_modem_mm_object),
144                'Old modem disappeared',
145                timeout=60)
146
147        # (2) Wait for the device to reappear
148        if not expect_device:
149            return None, None
150        # The timeout here should be sufficient for our slowest modem to
151        # reappear.
152        CellularProxy._poll_for_condition(
153                lambda: self._get_reappeared_modem(model_id,
154                                                   old_modem_mm_object),
155                desc='The modem reappeared after reset.',
156                timeout=60)
157        new_modem = self._get_reappeared_modem(model_id, old_modem_mm_object)
158
159        # (3) Check powered state of the device
160        if not expect_powered:
161            return new_modem, None
162        success, _, _ = self.wait_for_property_in(new_modem,
163                                                  self.DEVICE_PROPERTY_POWERED,
164                                                  [self.VALUE_POWERED_ON],
165                                                  timeout_seconds=10)
166        if not success:
167            raise shill_proxy.ShillProxyError(
168                    'After modem reset, new modem failed to enter powered '
169                    'state.')
170
171        # (4) Check that service reappears
172        if not expect_service:
173            return new_modem, None
174        new_service = self.get_service_for_device(new_modem)
175        if not new_service:
176            raise shill_proxy.ShillProxyError(
177                    'Failed to find a shill service managing the reappeared '
178                    'device.')
179        return new_modem, new_service
180
181
182    def disable_modem_for_test_setup(self, timeout_seconds=10):
183        """
184        Disables all cellular modems.
185
186        Use this method only for setting up tests.  Do not use this method to
187        test disable functionality because this method repeatedly attempts to
188        disable the cellular technology until it succeeds (ignoring all DBus
189        errors) since the DisableTechnology() call may fail for various reasons
190        (eg. an enable is in progress).
191
192        @param timeout_seconds: Amount of time to wait until the modem is
193                disabled.
194        @raises ShillProxyError if the modems fail to disable within
195                |timeout_seconds|.
196
197        """
198        def _disable_cellular_technology(self):
199            try:
200                self._manager.DisableTechnology(self.TECHNOLOGY_CELLULAR)
201                return True
202            except dbus.DBusException as e:
203                return False
204
205        CellularProxy._poll_for_condition(
206                lambda: _disable_cellular_technology(self),
207                'Failed to disable cellular technology.',
208                timeout=timeout_seconds)
209        modem = self.find_cellular_device_object()
210        self.wait_for_property_in(modem, self.DEVICE_PROPERTY_POWERED,
211                                  [self.VALUE_POWERED_OFF],
212                                  timeout_seconds=timeout_seconds)
213
214
215    # TODO(pprabhu) Use utils.poll_for_condition instead
216    @staticmethod
217    def _poll_for_condition(condition, desc, timeout=10):
218        """Poll till |condition| is satisfied.
219
220        @param condition: A function taking no arguments. The condition is
221                met when the return value can be cast to the bool True.
222        @param desc: The description given when we timeout waiting for
223                |condition|.
224
225        """
226        start_time = time.time()
227        while True:
228            value = condition()
229            if value:
230                return value
231            if(time.time() + CellularProxy.SLEEP_INTERVAL - start_time >
232               timeout):
233                raise shill_proxy.ShillProxyError(
234                        'Timed out waiting for condition %s.' % desc)
235            time.sleep(CellularProxy.SLEEP_INTERVAL)
236
237
238    def _is_old_modem_gone(self, modem_path, modem_mm_object):
239        """Tests if the DBus object for modem disappears after Reset.
240
241        @param modem_path: The DBus path for the modem object that must vanish.
242        @param modem_mm_object: The modemmanager object path reported by the
243            old modem. This is unique everytime a new modem is (re)exposed.
244
245        @return True if the object disappeared, false otherwise.
246
247        """
248        device = self.get_dbus_object(self.DBUS_TYPE_DEVICE, modem_path)
249        try:
250            properties = device.GetProperties()
251            # DBus object exists, perhaps a reappeared device?
252            return (properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT) !=
253                    modem_mm_object)
254        except dbus.DBusException as e:
255            if e.get_dbus_name() == self.DBUS_ERROR_UNKNOWN_OBJECT:
256                return True
257            return False
258
259
260    def _get_reappeared_modem(self, model_id, old_modem_mm_object):
261        """Check that a vanished modem reappers.
262
263        @param model_id: The model ID reported by the vanished modem.
264        @param old_modem_mm_object: The previously reported modemmanager object
265                path for this modem.
266
267        @return The reappeared DBus object, if any. None otherwise.
268
269        """
270        # TODO(pprabhu) This will break if we have multiple cellular devices
271        # in the system at the same time.
272        device = self.find_cellular_device_object()
273        if not device:
274            return None
275        properties = device.GetProperties(utf8_strings=True)
276        if (model_id == properties.get(self.DEVICE_PROPERTY_MODEL_ID) and
277            (old_modem_mm_object !=
278             properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT))):
279            return device
280        return None
281