1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from autotest_lib.client.bin import test, utils
6from autotest_lib.client.common_lib import error
7
8import dbus, dbus.mainloop.glib, gobject
9
class network_SwitchCarrier(test.test):
    """Switch the modem between carriers and verify each switch.

    The test asks the modem (through the Gobi D-Bus interface) to switch to
    each carrier in turn.  The next step is only taken from the DeviceAdded
    signal handler, which checks that GetStatus reports the carrier that was
    requested before requesting the next one.  When the list is exhausted
    the original carrier is restored.
    """
    version = 1

    def _finish(self):
        """Mark the run as over and stop the main loop.

        quit() has no effect on a main loop that is not running yet, so the
        'done' flag lets run_once() skip loop.run() when the test finished
        (or failed) before the loop was started.
        """
        self.done = True
        self.loop.quit()

    def fail(self, msg):
        """Record a test failure with message |msg| and end the run."""
        print('Failed: %s' % msg)
        self.failed = error.TestFail(msg)
        self._finish()

    def device_added(self, dev, *args, **kwargs):
        """Verify the current carrier, then start the next switch.

        Registered as the DeviceAdded signal handler and also called once
        directly by run_once() to kick the test off.

        @param dev: D-Bus object path of the modem that appeared.
        """
        print('Device added: %s' % dev)
        self.modem = self.bus.get_object(self.CMM, dev)
        carrier = self.get_carrier()
        if not carrier:
            print('No carrier.')
            return
        if carrier != self.to_carrier:
            self.fail('Wrong carrier: %s != %s' % (carrier, self.to_carrier))
            # The test is over; do not request another switch.
            return
        while self.carriers:
            candidate = self.carriers[0]
            self.carriers = self.carriers[1:]
            self.to_carrier = candidate
            try:
                self.set_carrier(candidate)
            except dbus.exceptions.DBusException as e:
                if e.get_dbus_message() != "Unknown carrier name":
                    raise
                print('Ignoring invalid carrier %s' % candidate)
                # No switch was started, so the modem is still on |carrier|.
                self.to_carrier = carrier
                continue
            # Switch requested; wait for the next DeviceAdded signal.
            return
        # No carrier left to try (or all remaining ones were rejected).
        self._finish()  # success!

    def device_removed(self, *args, **kwargs):
        """DeviceRemoved signal handler; only logs the event."""
        print('Device removed.')

    def waitfor(self, signame, fn):
        """Call |fn| whenever ModemManager emits the signal |signame|.

        @param signame: signal name on the ModemManager interface.
        @param fn: callable invoked with the signal arguments.
        @return the match object returned by add_signal_receiver, so the
                caller can unregister the handler.
        """
        print('Waiting for %s' % signame)
        return self.bus.add_signal_receiver(fn, signal_name=signame,
                                            dbus_interface=self.IMM)

    def timeout(self):
        """Timeout callback: fail the test."""
        # Returning False removes this source from the main context, so
        # forget its id to avoid removing it a second time in run_once().
        self.timeout_id = None
        self.fail('Timeout')
        return False

    def get_carrier(self):
        """Return the carrier reported by the modem's GetStatus call.

        @return the 'carrier' entry of the status reply, or None (after
                recording a failure) if the reply is empty or lacks it.
        """
        status = self.modem.GetStatus(dbus_interface=self.IMODEM_SIMPLE)
        if not status or 'carrier' not in status:
            self.fail('Bogus GetStatus reply: %s' % status)
            return None
        return status['carrier']

    def set_carrier(self, c):
        """Ask the modem to switch to carrier |c|."""
        print('Switch: ? -> %s' % c)
        self.modem.SetCarrier(c, dbus_interface=self.IMODEM_GOBI)

    def find_modem(self):
        """Set self.modem to the first enumerated modem, or None."""
        modems = self.mm.EnumerateDevices(dbus_interface=self.IMM)
        if modems:
            self.modem = self.bus.get_object(self.CMM, modems[0])
        else:
            self.modem = None

    def run_once(self, start_carrier='Verizon Wireless',
                 carriers=None,
                 timeout_secs=90):
        """Run the carrier switching test.

        @param start_carrier: currently unused; kept for compatibility with
                existing callers.
        @param carriers: carrier names to switch to, in order.  Defaults to
                Vodafone, Sprint, Verizon Wireless.
        @param timeout_secs: overall time limit for the whole sequence.
        @raises error.TestFail: if no modem is present, a status reply is
                bogus, a switch lands on the wrong carrier, or the time
                limit expires.
        """
        carriers = carriers or ['Vodafone', 'Sprint', 'Verizon Wireless']
        self.CMM = 'org.chromium.ModemManager'
        self.IMM = 'org.freedesktop.ModemManager'
        self.IMODEM_SIMPLE = self.IMM + '.Modem.Simple'
        self.IMODEM_GOBI = 'org.chromium.ModemManager.Modem.Gobi'
        self.failed = None
        self.done = False
        self.timeout_id = None
        # Work on a copy so the caller's sequence is never aliased.
        self.carriers = list(carriers)
        self.to_carrier = None
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.loop = gobject.MainLoop()
        self.bus = dbus.SystemBus()
        self.mm = self.bus.get_object(self.CMM, '/org/chromium/ModemManager')
        self.find_modem()
        if not self.modem:
            raise error.TestFail('No modem found')
        carrier = self.get_carrier()
        if not carrier:
            raise self.failed

        self.timeout_id = gobject.timeout_add(int(timeout_secs * 1000),
                                              self.timeout)
        matches = [self.waitfor('DeviceRemoved', self.device_removed),
                   self.waitfor('DeviceAdded', self.device_added)]
        try:
            self.to_carrier = carrier
            self.device_added(self.modem.__dbus_object_path__) # start test
            # The initial step may already have ended the test; a quit()
            # issued before run() would be lost and run() would block.
            if not self.done:
                self.loop.run()
        finally:
            # Detach everything registered above so nothing stays attached
            # to the default main context or the shared bus connection.
            for match in matches:
                match.remove()
            if self.timeout_id is not None:
                gobject.source_remove(self.timeout_id)
                self.timeout_id = None

        # Put the modem back on the carrier it started with.
        self.find_modem()
        if self.modem and self.to_carrier != carrier:
            self.set_carrier(carrier)
        if self.failed:
            raise self.failed
101