1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import time
8
9from autotest_lib.client.bin import utils
10from autotest_lib.client.cros.networking import shill_proxy
11
12
13class CellularProxy(shill_proxy.ShillProxy):
14    """Wrapper around shill dbus interface used by cellular tests."""
15
16    # Properties exposed by shill.
17    DEVICE_PROPERTY_DBUS_OBJECT = 'DBus.Object'
18    DEVICE_PROPERTY_MODEL_ID = 'Cellular.ModelID'
19    DEVICE_PROPERTY_MANUFACTURER = 'Cellular.Manufacturer'
20    DEVICE_PROPERTY_OUT_OF_CREDITS = 'Cellular.OutOfCredits'
21    DEVICE_PROPERTY_SIM_LOCK_STATUS = 'Cellular.SIMLockStatus'
22    DEVICE_PROPERTY_SIM_PRESENT = 'Cellular.SIMPresent'
23    DEVICE_PROPERTY_TECHNOLOGY_FAMILY = 'Cellular.Family'
24    DEVICE_PROPERTY_TECHNOLOGY_FAMILY_CDMA = 'CDMA'
25    DEVICE_PROPERTY_TECHNOLOGY_FAMILY_GSM = 'GSM'
26    SERVICE_PROPERTY_LAST_GOOD_APN = 'Cellular.LastGoodAPN'
27
28    # APN info property names.
29    APN_INFO_PROPERTY_APN = 'apn'
30
31    # Keys into the dictionaries exposed as properties.
32    PROPERTY_KEY_SIM_LOCK_TYPE = 'LockType'
33    PROPERTY_KEY_SIM_LOCK_ENABLED = 'LockEnabled'
34    PROPERTY_KEY_SIM_LOCK_RETRIES_LEFT = 'RetriesLeft'
35
36    # Valid values taken by properties exposed by shill.
37    VALUE_SIM_LOCK_TYPE_PIN = 'sim-pin'
38    VALUE_SIM_LOCK_TYPE_PUK = 'sim-puk'
39
40    # Various timeouts in seconds.
41    SERVICE_CONNECT_TIMEOUT = 60
42    SERVICE_DISCONNECT_TIMEOUT = 60
43    SERVICE_REGISTRATION_TIMEOUT = 60
44    SLEEP_INTERVAL = 0.1
45
46    def set_logging_for_cellular_test(self):
47        """Set the logging in shill for a test of cellular technology.
48
49        Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes
50        to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for
51        |ShillProxy.TECHNOLOGY_CELLULAR|.
52
53        """
54        self.set_logging_for_test(self.TECHNOLOGY_CELLULAR)
55
56
57    def find_cellular_service_object(self):
58        """Returns the first dbus object found that is a cellular service.
59
60        @return DBus object for the first cellular service found. None if no
61                service found.
62
63        """
64        return self.find_object('Service', {'Type': self.TECHNOLOGY_CELLULAR})
65
66
67    def wait_for_cellular_service_object(
68            self, timeout_seconds=SERVICE_REGISTRATION_TIMEOUT):
69        """Waits for the cellular service object to show up.
70
71        @param timeout_seconds: Amount of time to wait for cellular service.
72        @return DBus object for the first cellular service found.
73        @raises ShillProxyError if no cellular service is found within the
74            registration timeout period.
75
76        """
77        return utils.poll_for_condition(
78                lambda: self.find_cellular_service_object(),
79                exception=shill_proxy.ShillProxyTimeoutError(
80                        'Failed to find cellular service object'),
81                timeout=timeout_seconds)
82
83
84    def find_cellular_device_object(self):
85        """Returns the first dbus object found that is a cellular device.
86
87        @return DBus object for the first cellular device found. None if no
88                device found.
89
90        """
91        return self.find_object('Device', {'Type': self.TECHNOLOGY_CELLULAR})
92
93
94    def reset_modem(self, modem, expect_device=True, expect_powered=True,
95                    expect_service=True):
96        """Reset |modem|.
97
98        Do, in sequence,
99        (1) Ensure that the current device object disappears.
100        (2) If |expect_device|, ensure that the device reappears.
101        (3) If |expect_powered|, ensure that the device is powered.
102        (4) If |expect_service|, ensure that the service reappears.
103
104        This function does not check the service state for the device after
105        reset.
106
107        @param modem: DBus object for the modem to reset.
108        @param expect_device: If True, ensure that a DBus object reappears for
109                the same modem after the reset.
110        @param expect_powered: If True, ensure that the modem is powered on
111                after the reset.
112        @param expect_service: If True, ensure that a service managing the
113                reappeared modem also reappears.
114
115        @return (device, service)
116                device: DBus object for the reappeared Device after the reset.
117                service: DBus object for the reappeared Service after the reset.
118                Either of these may be None, if the object is not expected to
119                reappear.
120
121        @raises ShillProxyError if any of the conditions (1)-(4) fail.
122
123        """
124        logging.info('Resetting modem')
125        # Obtain identifying information about the modem.
126        properties = modem.GetProperties(utf8_strings=True)
127        # NOTE: Using the Model ID means that this will break if we have two
128        # identical cellular modems in a DUT. Fortunately, we only support one
129        # modem at a time.
130        model_id = properties.get(self.DEVICE_PROPERTY_MODEL_ID)
131        if not model_id:
132            raise shill_proxy.ShillProxyError(
133                    'Failed to get identifying information for the modem.')
134        old_modem_path = modem.object_path
135        old_modem_mm_object = properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT)
136        if not old_modem_mm_object:
137            raise shill_proxy.ShillProxyError(
138                    'Failed to get the mm object path for the modem.')
139
140        manufacturer = properties.get(self.DEVICE_PROPERTY_MANUFACTURER)
141        if not manufacturer:
142            raise shill_proxy.ShillProxyError(
143                    'Failed to get the manufacturer for the modem.')
144        if "QUALCOMM" in manufacturer:
145            logging.info(
146                    'Qualcomm modem found. Bypassing modem reset (b/168113309)'
147            )
148            new_modem = modem
149        else:
150            modem.Reset()
151
152            # (1) Wait for the old modem to disappear
153            utils.poll_for_condition(lambda: self._is_old_modem_gone(
154                    old_modem_path, old_modem_mm_object),
155                                     exception=shill_proxy.
156                                     ShillProxyTimeoutError(
157                                             'Old modem disappeared'),
158                                     timeout=60)
159
160            # (2) Wait for the device to reappear
161            if not expect_device:
162                return None, None
163            # The timeout here should be sufficient for our slowest modem to
164            # reappear.
165            new_modem = utils.poll_for_condition(
166                    lambda: self._get_reappeared_modem(model_id,
167                                                       old_modem_mm_object),
168                    exception=shill_proxy.ShillProxyTimeoutError(
169                            'The modem reappeared after reset.'),
170                    timeout=60)
171
172            # (3) Check powered state of the device
173            if not expect_powered:
174                return new_modem, None
175            success, _, _ = self.wait_for_property_in(
176                    new_modem,
177                    self.DEVICE_PROPERTY_POWERED, [self.VALUE_POWERED_ON],
178                    timeout_seconds=10)
179            if not success:
180                raise shill_proxy.ShillProxyError(
181                        'After modem reset, new modem failed to enter powered '
182                        'state.')
183
184        # (4) Check that service reappears
185        if not expect_service:
186            return new_modem, None
187        new_service = self.get_service_for_device(new_modem)
188        if not new_service:
189            raise shill_proxy.ShillProxyError(
190                    'Failed to find a shill service managing the reappeared '
191                    'device.')
192        return new_modem, new_service
193
194
195    def disable_modem_for_test_setup(self, timeout_seconds=10):
196        """
197        Disables all cellular modems.
198
199        Use this method only for setting up tests.  Do not use this method to
200        test disable functionality because this method repeatedly attempts to
201        disable the cellular technology until it succeeds (ignoring all DBus
202        errors) since the DisableTechnology() call may fail for various reasons
203        (eg. an enable is in progress).
204
205        @param timeout_seconds: Amount of time to wait until the modem is
206                disabled.
207        @raises ShillProxyError if the modems fail to disable within
208                |timeout_seconds|.
209
210        """
211        def _disable_cellular_technology(self):
212            try:
213                self._manager.DisableTechnology(self.TECHNOLOGY_CELLULAR)
214                return True
215            except dbus.DBusException as e:
216                return False
217
218        utils.poll_for_condition(
219                lambda: _disable_cellular_technology(self),
220                exception=shill_proxy.ShillProxyTimeoutError(
221                        'Failed to disable cellular technology.'),
222                timeout=timeout_seconds)
223        modem = self.find_cellular_device_object()
224        self.wait_for_property_in(modem, self.DEVICE_PROPERTY_POWERED,
225                                  [self.VALUE_POWERED_OFF],
226                                  timeout_seconds=timeout_seconds)
227
228
229    def _is_old_modem_gone(self, modem_path, modem_mm_object):
230        """Tests if the DBus object for modem disappears after Reset.
231
232        @param modem_path: The DBus path for the modem object that must vanish.
233        @param modem_mm_object: The modemmanager object path reported by the
234            old modem. This is unique everytime a new modem is (re)exposed.
235
236        @return True if the object disappeared, false otherwise.
237
238        """
239        device = self.get_dbus_object(self.DBUS_TYPE_DEVICE, modem_path)
240        try:
241            properties = device.GetProperties()
242            # DBus object exists, perhaps a reappeared device?
243            return (properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT) !=
244                    modem_mm_object)
245        except dbus.DBusException as e:
246            if e.get_dbus_name() == self.DBUS_ERROR_UNKNOWN_OBJECT:
247                return True
248            return False
249
250
251    def _get_reappeared_modem(self, model_id, old_modem_mm_object):
252        """Check that a vanished modem reappers.
253
254        @param model_id: The model ID reported by the vanished modem.
255        @param old_modem_mm_object: The previously reported modemmanager object
256                path for this modem.
257
258        @return The reappeared DBus object, if any. None otherwise.
259
260        """
261        # TODO(pprabhu) This will break if we have multiple cellular devices
262        # in the system at the same time.
263        device = self.find_cellular_device_object()
264        if not device:
265            return None
266        properties = device.GetProperties(utf8_strings=True)
267        if (model_id == properties.get(self.DEVICE_PROPERTY_MODEL_ID) and
268            (old_modem_mm_object !=
269             properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT))):
270            return device
271        return None
272