1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from autotest_lib.client.bin import test, utils
6from autotest_lib.client.common_lib import error
7
8import dbus, dbus.mainloop.glib, gobject
9import glib
10
11from autotest_lib.client.cros import flimflam_test_path
12from autotest_lib.client.cros.cellular import mm
13from autotest_lib.client.cros.mainloop import ExceptionForward
14from autotest_lib.client.cros.mainloop import ExceptionForwardingMainLoop
15
16import flimflam
17
class State:
    """Phases of one enable/connect/dormancy/disconnect/disable cycle.

    DormancyTester keeps one of these values in its `state` attribute to
    record which transition it is currently expecting.
    """
    (ENABLING,
     REGISTERING,
     CONNECTING,
     WAITING,
     DISCONNECTING,
     DISABLING) = range(6)
25
class DormancyTester(ExceptionForwardingMainLoop):
    """State machine that cycles a cellular device through dormancy.

    One cycle is: enable -> wait for a cellular service to appear ->
    connect -> wait for a dormancy report -> disconnect -> disable.
    Transitions are driven by the PropertyChanged() and DormancyStatus()
    callbacks.  Every 'Powered' -> false event counts as one completed
    loop; the main loop is quit once `loops` of them have been seen.

    A transition that arrives while the tester is in a different state
    raises error.TestFail, except for dormancy reports, which are ignored
    unless the tester is waiting for one.
    """

    def __init__(self, loops, flim, device, *args, **kwargs):
        """Set up the tester.

        Args:
            loops: number of disable events to observe before quitting.
            flim: flimflam.FlimFlam-style object used to enable/disable the
                technology and to connect/disconnect the service.
            device: device object whose 'Powered' property is read in idle().
            *args, **kwargs: forwarded to ExceptionForwardingMainLoop,
                together with timeout_s = 20 * loops + 20.
        """
        self.loopsleft = loops
        self.flim = flim
        self.device = device
        # The signal callbacks read self.state and self.service.  Those were
        # previously only assigned from idle() and the transition methods,
        # so a signal handled before idle() ran failed with AttributeError.
        # NOTE(review): idle() is scheduled by the base class, which is not
        # visible here; None makes early signals fall through the existing
        # state checks instead.
        self.state = None
        self.service = None
        super(DormancyTester, self).__init__(
                *args, timeout_s=20 * loops + 20, **kwargs)

    def countdown(self):
        """Record one finished loop and quit the main loop when none remain."""
        self.loopsleft -= 1
        print('Countdown: %d' % (self.loopsleft,))
        if self.loopsleft == 0:
            self.quit()

    @ExceptionForward
    def enable(self):
        """Enter ENABLING and ask flimflam to enable the cellular technology."""
        print('Enabling...')
        self.state = State.ENABLING
        self.flim.EnableTechnology('cellular')

    @ExceptionForward
    def disable(self):
        """Enter DISABLING and ask flimflam to disable the technology."""
        print('Disabling...')
        self.state = State.DISABLING
        self.flim.DisableTechnology('cellular')

    @ExceptionForward
    def connect(self):
        """Enter CONNECTING and connect the service found by FindService()."""
        print('Connecting...')
        self.state = State.CONNECTING
        self.flim.ConnectService(service=self.service, config_timeout=120)

    @ExceptionForward
    def disconnect(self):
        """Enter DISCONNECTING and disconnect the current service."""
        print('Disconnecting...')
        self.state = State.DISCONNECTING
        self.flim.DisconnectService(service=self.service, wait_timeout=60)

    @ExceptionForward
    def PropertyChanged(self, *args, **kwargs):
        """Dispatch a PropertyChanged signal to the matching handler.

        args[0] is the property name; for 'Powered' and 'Connected', args[1]
        is the new value.  Other property names are ignored.
        """
        prop = args[0]
        if prop == 'Powered':
            if args[1]:
                self.HandleEnabled()
            else:
                self.HandleDisabled()
        elif prop == 'Connected':
            if args[1]:
                self.HandleConnected()
            else:
                self.HandleDisconnected()
        elif prop == 'Services':
            self.CheckService()

    @ExceptionForward
    def DormancyStatus(self, *args, **kwargs):
        """Dispatch a DormancyStatus signal; a true args[0] means dormant."""
        if args[0]:
            self.HandleDormant()
        else:
            self.HandleAwake()

    def FindService(self):
        """Look up the cellular service and cache the result in self.service."""
        self.service = self.flim.FindElementByPropertySubstring('Service',
                                                                'Type',
                                                                'cellular')

    def CheckService(self):
        """Refresh self.service; if registering and it exists, move on."""
        self.FindService()
        if self.state == State.REGISTERING and self.service:
            self.HandleRegistered()

    def HandleDisabled(self):
        """Count a finished loop and start the next one with enable().

        Raises:
            error.TestFail: if the tester was not in state DISABLING.
        """
        if self.state != State.DISABLING:
            raise error.TestFail('Disabled while not in state Disabling')
        print('Disabled')
        self.countdown()
        self.enable()

    def HandleEnabled(self):
        """Move to REGISTERING and check whether a service already exists.

        Raises:
            error.TestFail: if the tester was not in state ENABLING.
        """
        if self.state != State.ENABLING:
            raise error.TestFail('Enabled while not in state Enabling')
        print('Enabled')
        self.state = State.REGISTERING
        print('Waiting for registration...')
        self.CheckService()

    def HandleRegistered(self):
        """Start connecting now that a cellular service is available.

        Raises:
            error.TestFail: if the tester was not in state REGISTERING.
        """
        if self.state != State.REGISTERING:
            raise error.TestFail('Registered while not in state Registering')
        print('Registered')
        self.connect()

    def HandleConnected(self):
        """Move to WAITING, i.e. wait for a dormancy report.

        Raises:
            error.TestFail: if the tester was not in state CONNECTING.
        """
        if self.state != State.CONNECTING:
            raise error.TestFail('Connected while not in state Connecting')
        print('Connected')
        self.state = State.WAITING
        print('Waiting for dormancy...')

    def HandleDormant(self):
        """Disconnect on a dormancy report; ignore it unless WAITING."""
        if self.state != State.WAITING:
            print('Dormant while not in state Waiting; ignoring.')
            return
        print('Dormant')
        self.disconnect()

    def HandleAwake(self):
        """Log a not-dormant report; it causes no state change."""
        print('Awake')

    def HandleDisconnected(self):
        """Continue the cycle by disabling the technology.

        Raises:
            error.TestFail: if the tester was not in state DISCONNECTING.
        """
        if self.state != State.DISCONNECTING:
            raise error.TestFail(
                'Disconnected while not in state Disconnecting')
        print('Disconnected')
        self.disable()

    def idle(self):
        """Choose the first transition from the current device/service state.

        Starts with disconnect() if the device is powered and the service is
        connected, with disable() if it is powered only, and with enable()
        if it is neither.

        Raises:
            error.TestFail: if the service is connected although the device
                reports that it is not powered.
        """
        device_props = self.device.GetProperties(utf8_strings = True)

        connected = False
        self.FindService()
        if self.service:
            service_props = self.service.GetProperties(utf8_strings = True)
            # Any of these service states counts as being connected.
            if service_props['State'] in ['online', 'portal', 'ready']:
                connected = True
            print('Service exists, and state is %s.' % (service_props['State'],))
        else:
            print('Service does not exist.')

        powered = bool(device_props['Powered'])
        if powered:
            print('Device is powered.')
        else:
            print('Device is unpowered.')

        if powered:
            if connected:
                print('Starting with Disconnect.')
                self.disconnect()
            else:
                print('Starting with Disable.')
                self.disable()
        elif connected:
            raise error.TestFail('Service online but device unpowered!')
        else:
            print('Starting with Enable.')
            self.enable()
173
174
175
class cellular_GobiDormancyDance(test.test):
    """Autotest that runs a DormancyTester against a Gobi modem.

    run_once() finds the modem, requests dormancy events from it, hooks the
    PropertyChanged and DormancyStatus D-Bus signals up to a DormancyTester
    and runs the tester's main loop.
    """
    version = 1

    def FindModemPath(self):
        """Return the path of the first Gobi modem, or None if none exists."""
        for _obj, path in mm.EnumerateDevices():
            if path.startswith('/org/chromium/ModemManager/Gobi'):
                return path
        return None

    def RequestDormancyEvents(self, modem_path):
        """Call RequestEvents('+dormancy') on the modem's Gobi interface.

        Args:
            modem_path: D-Bus object path of the modem on the
                org.chromium.ModemManager service.
        """
        modem = dbus.Interface(
            self.bus.get_object('org.chromium.ModemManager', modem_path),
            dbus_interface='org.chromium.ModemManager.Modem.Gobi')
        modem.RequestEvents('+dormancy')

    def PropertyChanged(self, *args, **kwargs):
        """Forward a PropertyChanged signal to the tester."""
        self.tester.PropertyChanged(*args, **kwargs)

    def DormancyStatus(self, *args, **kwargs):
        """Forward a DormancyStatus signal to the tester."""
        self.tester.DormancyStatus(*args, **kwargs)

    def run_once(self, name='wwan', loops=20, seed=None):
        """Run the dormancy cycle `loops` times.

        Args:
            name: substring used to find the flimflam device, first by name
                and then by its 'Interface' property.
            loops: number of cycles passed to DormancyTester.
            seed: accepted for interface compatibility; not used here.

        Raises:
            error.TestFail: if no Gobi modem or no matching device is found.
        """
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()

        main_loop = gobject.MainLoop()

        modem_path = self.FindModemPath()
        if not modem_path:
            raise error.TestFail('No Gobi modem found.')
        print('Modem: %s' % (modem_path,))
        self.RequestDormancyEvents(modem_path)

        flim = flimflam.FlimFlam()
        device = flim.FindElementByNameSubstring('Device', name)
        if not device:
            device = flim.FindElementByPropertySubstring('Device',
                                                         'Interface',
                                                         name)
        if not device:
            # Without this check the missing device only shows up later as
            # an AttributeError on None inside DormancyTester.idle().
            raise error.TestFail('No device matching "%s" found.' % (name,))

        # Create the tester before registering the receivers so the
        # forwarding callbacks above never run without self.tester.
        self.tester = DormancyTester(main_loop=main_loop,
                                     loops=loops, flim=flim, device=device)
        self.bus.add_signal_receiver(self.PropertyChanged,
                                     signal_name='PropertyChanged')
        self.bus.add_signal_receiver(self.DormancyStatus,
                                     signal_name='DormancyStatus')
        self.tester.run()
227