1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import json, logging, os, pwd, shutil, subprocess, time
6
7import dbus
8
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.cros import semiauto_framework
12from autotest_lib.client.cros import sys_power
13
# Seconds a tester has to respond to prompts.
_USER_TIMEOUT_TIME = 321
# Seconds a tester has to pair or connect a device.
_DEVICE_TIMEOUT_TIME = 321
# Name of the adapter interface in DBus.
_ADAPTER_INTERFACE = 'org.bluez.Adapter1'
# Name of the device interface in DBus.
_DEVICE_INTERFACE = 'org.bluez.Device1'
# Human-readable time format for logs.
_TIME_FORMAT = '%d %b %Y %H:%M:%S'
# Line of 75 '=' characters written at the top of each log entry.
_SECTION_BREAK = '=' * 75
20
21
22class BluetoothSemiAutoHelper(semiauto_framework.semiauto_test):
23    """Generic Bluetooth SemiAutoTest.
24
25    Contains functions needed to implement an actual Bluetooth SemiAutoTest,
26    such as accessing the state of Bluetooth adapter/devices via dbus,
27    opening dialogs with tester via Telemetry browser, and getting log data.
28    """
29    version = 1
30
31    # Boards without Bluetooth support.
32    _INVALID_BOARDS = ['x86-alex', 'x86-alex_he', 'lumpy']
33
34    def _err(self, message):
35        """Raise error after first collecting more information.
36
37        @param message: error message to raise and add to logs.
38
39        """
40        self.collect_logs('ERROR HAS OCCURED: %s' % message)
41        raise error.TestError(message)
42
43    def supports_bluetooth(self):
44        """Return True if this device has Bluetooth capabilities; else False."""
45        device = utils.get_board()
46        if device in self._INVALID_BOARDS:
47            logging.info('%s does not have Bluetooth.', device)
48            return False
49        return True
50
51    def _get_objects(self):
52        """Return the managed objects for this chromebook."""
53        manager = dbus.Interface(
54                self._bus.get_object('org.bluez', '/'),
55                dbus_interface='org.freedesktop.DBus.ObjectManager')
56        return manager.GetManagedObjects()
57
58    def _get_adapter_info(self):
59        """Return the adapter interface objects, or None if not found."""
60        objects = self._get_objects()
61        for path, interfaces in objects.items():
62            if _ADAPTER_INTERFACE in interfaces:
63                self._adapter_path = path
64                return interfaces[_ADAPTER_INTERFACE]
65        return None
66
67    def _get_device_info(self, addr):
68        """Return the device interface objects, or None if not found."""
69        objects = self._get_objects()
70        for _, interfaces in objects.items():
71            if _DEVICE_INTERFACE in interfaces:
72                if interfaces[_DEVICE_INTERFACE]['Address'] == addr:
73                    return interfaces[_DEVICE_INTERFACE]
74        return None
75
76    def _verify_adapter_power(self, adapter_power_status):
77        """Return True/False if adapter power status matches given value."""
78        info = self._get_adapter_info()
79        if not info:
80            self._err('No adapter found!')
81        return True if info['Powered'] == adapter_power_status else False
82
83    def _verify_device_connection(self, addr, paired_status=True,
84                                  connected_status=True):
85        """Return True/False if device statuses match given values."""
86        def _check_info():
87            info = self._get_device_info(addr)
88            if info:
89                if (info['Paired'] != paired_status or
90                    info['Connected'] != connected_status):
91                    return False
92                return True
93            # Return True if no entry was found for an unpaired device
94            return not paired_status and not connected_status
95
96        results = _check_info()
97
98        # To avoid spotting brief connections, sleep and check again.
99        if results:
100            time.sleep(0.5)
101            results = _check_info()
102        return results
103
104    def set_adapter_power(self, adapter_power_status):
105        """Set adapter power status to match given value via dbus call.
106
107        Block until the power is set.
108
109        @param adapter_power_status: True to turn adapter on; False for off.
110
111        """
112        info = self._get_adapter_info()
113        if not info:
114            self._err('No adapter found!')
115        properties = dbus.Interface(
116                self._bus.get_object('org.bluez', self._adapter_path),
117                dbus_interface='org.freedesktop.DBus.Properties')
118        properties.Set(_ADAPTER_INTERFACE, 'Powered', adapter_power_status)
119
120        self.poll_adapter_power(adapter_power_status)
121
122    def poll_adapter_presence(self):
123        """Raise error if adapter is not found after some time."""
124        complete = lambda: self._get_adapter_info() is not None
125        try:
126            utils.poll_for_condition(
127                    condition=complete, timeout=15, sleep_interval=1)
128        except utils.TimeoutError:
129            self._err('No adapter found after polling!')
130
131    def poll_adapter_power(self, adapter_power_status=True):
132        """Wait until adapter power status matches given value.
133
134        @param adapter_power_status: True for adapter is on; False for off.
135
136        """
137        complete = lambda: self._verify_adapter_power(
138                adapter_power_status=adapter_power_status)
139        adapter_str = 'ON' if adapter_power_status else 'OFF'
140        utils.poll_for_condition(
141                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
142                sleep_interval=1,
143                desc=('Timeout for Bluetooth Adapter to be %s' % adapter_str))
144
145    def _poll_connection(self, addr, paired_status, connected_status):
146        """Wait until device statuses match given values."""
147        paired_str = 'PAIRED' if paired_status else 'NOT PAIRED'
148        conn_str = 'CONNECTED' if connected_status else 'NOT CONNECTED'
149        message = 'Waiting for device %s to be %s and %s' % (addr, paired_str,
150                                                             conn_str)
151        logging.info(message)
152
153        complete = lambda: self._verify_device_connection(
154                addr, paired_status=paired_status,
155                connected_status=connected_status)
156        utils.poll_for_condition(
157                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
158                sleep_interval=1, desc=('Timeout while %s' % message))
159
160    def poll_connections(self, paired_status=True, connected_status=True):
161        """Wait until all Bluetooth devices have the given statues.
162
163        @param paired_status: True for device paired; False for unpaired.
164        @param connected_status: True for device connected; False for not.
165
166        """
167        for addr in self._addrs:
168            self._poll_connection(addr, paired_status=paired_status,
169                                  connected_status=connected_status)
170
171    def login_and_open_browser(self):
172        """Log in to machine, open browser, and navigate to dialog template.
173
174        Assumes the existence of 'client/cros/audio/music.mp3' file, and will
175        fail if not found.
176
177        """
178        # Open browser and interactive tab
179        self.login_and_open_interactive_tab()
180
181        # Find mounted home directory
182        user_home = None
183        for udir in os.listdir(os.path.join('/', 'home', 'user')):
184            d = os.path.join('/', 'home', 'user', udir)
185            if os.path.ismount(d):
186                user_home = d
187        if user_home is None:
188            raise error.TestError('Could not find mounted home directory')
189
190        # Setup Audio File
191        audio_dir = os.path.join(self.bindir, '..', '..', 'cros', 'audio')
192        loop_file = os.path.join(audio_dir, 'loop.html')
193        music_file = os.path.join(audio_dir, 'music.mp3')
194        dl_dir = os.path.join(user_home, 'Downloads')
195        self._added_loop_file = os.path.join(dl_dir, 'loop.html')
196        self._added_music_file = os.path.join(dl_dir, 'music.mp3')
197        shutil.copyfile(loop_file, self._added_loop_file)
198        shutil.copyfile(music_file, self._added_music_file)
199        uid = pwd.getpwnam('chronos').pw_uid
200        gid = pwd.getpwnam('chronos').pw_gid
201        os.chmod(self._added_loop_file, 0755)
202        os.chmod(self._added_music_file, 0755)
203        os.chown(self._added_loop_file, uid, gid)
204        os.chown(self._added_music_file, uid, gid)
205
206        # Open Test Dialog tab, Settings tab, and Audio file
207        self._settings_tab = self._browser.tabs.New()
208        self._settings_tab.Navigate('chrome://settings/search#Bluetooth')
209        music_tab = self._browser.tabs.New()
210        music_tab.Navigate('file:///home/chronos/user/Downloads/loop.html')
211
212    def ask_user(self, message):
213        """Ask the user a yes or no question in an open tab.
214
215        Reset dialog page to be a question (message param) with 'PASS' and
216        'FAIL' buttons.  Wait for answer.  If no, ask for more information.
217
218        @param message: string sent to the user via browswer interaction.
219
220        """
221        logging.info('Asking user "%s"', message)
222        sandbox = 'SANDBOX:<input type="text"/>'
223        html = '<h3>%s</h3>%s' % (message, sandbox)
224        self.set_tab_with_buttons(html, buttons=['PASS', 'FAIL'])
225
226        # Intepret results.
227        result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
228        if result == 1:
229            # Ask for more information on error.
230            html='<h3>Please provide more info:</h3>'
231            self.set_tab_with_textbox(html)
232
233            # Get explanation of error, clear output, and raise error.
234            result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
235            self.clear_output()
236            self._err('Testing %s. "%s".' % (self._test_type, result))
237        elif result != 0:
238            raise error.TestError('Bad dialog value: %s' % result)
239        logging.info('Answer was PASS')
240
241        # Clear user screen.
242        self.clear_output()
243
244    def tell_user(self, message):
245        """Tell the user the given message in an open tab.
246
247        @param message: the text string to be displayed.
248
249        """
250        logging.info('Telling user "%s"', message)
251        html = '<h3>%s</h3>' % message
252        self.set_tab(html)
253
254    def check_working(self, message=None):
255        """Steps to check that all devices are functioning.
256
257        Ask user to connect all devices, verify connections, and ask for
258        user input if they are working.
259
260        @param message: string of text the user is asked.  Defaults to asking
261                        the user to connect all devices.
262
263        """
264        if not message:
265            message = ('Please connect all devices.<br>(You may need to '
266                       'click mice, press keyboard keys, or use the '
267                       'Connect button in Settings.)')
268        self.tell_user(message)
269        self.poll_adapter_power(True)
270        self.poll_connections(paired_status=True, connected_status=True)
271        self.ask_user('Are all Bluetooth devices working?<br>'
272                       'Is audio playing only through Bluetooth devices?<br>'
273                       'Do onboard keyboard and trackpad work?')
274
275    def ask_not_working(self):
276        """Ask the user pre-defined message about NOT working."""
277        self.ask_user('No Bluetooth devices work.<br>Audio is NOT playing '
278                      'through onboard speakers or wired headphones.')
279
280    def start_dump(self, message=''):
281        """Run btmon in subprocess.
282
283        Kill previous btmon (if needed) and start new one using current
284        test type as base filename.  Dumps stored in results folder.
285
286        @param message: string of text added to top of log entry.
287
288        """
289        if hasattr(self, '_dump') and self._dump:
290            self._dump.kill()
291        if not hasattr(self, '_test_type'):
292            self._test_type = 'test'
293        logging.info('Starting btmon')
294        filename = '%s_btmon' % self._test_type
295        path = os.path.join(self.resultsdir, filename)
296        with open(path, 'a') as f:
297            f.write('%s\n' % _SECTION_BREAK)
298            f.write('%s: Starting btmon\n' % time.strftime(_TIME_FORMAT))
299            f.write('%s\n' % message)
300            f.flush()
301            btmon_path = '/usr/bin/btmon'
302            try:
303                self._dump = subprocess.Popen([btmon_path], stdout=f,
304                                              stderr=subprocess.PIPE)
305            except Exception as e:
306                raise error.TestError('btmon: %s' % e)
307
308    def collect_logs(self, message=''):
309        """Store results of dbus GetManagedObjects and hciconfig.
310
311        Use current test type as base filename.  Stored in results folder.
312
313        @param message: string of text added to top of log entry.
314
315        """
316        logging.info('Collecting dbus info')
317        if not hasattr(self, '_test_type'):
318            self._test_type = 'test'
319        filename = '%s_dbus' % self._test_type
320        path = os.path.join(self.resultsdir, filename)
321        with open(path, 'a') as f:
322            f.write('%s\n' % _SECTION_BREAK)
323            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
324            f.write(json.dumps(self._get_objects().items(), indent=2))
325            f.write('\n')
326
327        logging.info('Collecting hciconfig info')
328        filename = '%s_hciconfig' % self._test_type
329        path = os.path.join(self.resultsdir, filename)
330        with open(path, 'a') as f:
331            f.write('%s\n' % _SECTION_BREAK)
332            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
333            f.flush()
334            hciconfig_path = '/usr/bin/hciconfig'
335            try:
336                subprocess.check_call([hciconfig_path, '-a'], stdout=f)
337            except Exception as e:
338                raise error.TestError('hciconfig: %s' % e)
339
340    def os_idle_time_set(self, reset=False):
341        """Function to set short idle time or to reset to normal.
342
343        Not using sys_power so that user can use Bluetooth to wake machine.
344
345        @param reset: true to reset to normal idle time, false for short.
346
347        """
348        powerd_path = '/usr/bin/set_short_powerd_timeouts'
349        flag = '--reset' if reset else ''
350        try:
351            subprocess.check_call([powerd_path, flag])
352        except Exception as e:
353            raise error.TestError('idle cmd: %s' % e)
354
355    def os_suspend(self):
356        """Function to suspend ChromeOS using sys_power."""
357        sys_power.do_suspend(5)
358
359        # Sleep
360        time.sleep(5)
361
362    def initialize(self):
363        self._bus = dbus.SystemBus()
364
365    def warmup(self, addrs='', test_phase='client', close_browser=True):
366        """Warmup setting paramters for semi-automated Bluetooth Test.
367
368        Actual test steps are implemened in run_once() function.
369
370        @param: addrs: list of MAC address of Bluetooth devices under test.
371        @param: test_phase: for use by server side tests to, for example, call
372                            the same test before and after a reboot.
373        @param: close_browser: True if client side test should close browser
374                               at end of test.
375
376        """
377        self.login_and_open_browser()
378
379        self._addrs = addrs
380        self._test_type = 'start'
381        self._test_phase = test_phase
382        self._will_close_browser = close_browser
383
384    def cleanup(self):
385        """Cleanup of various files/processes opened during test.
386
387        Closes running btmon, closes browser (if asked to at start), and
388        deletes files added during test.
389
390        """
391        if hasattr(self, '_dump'):
392            self._dump.kill()
393        if hasattr(self, '_will_close_browser') and self._will_close_browser:
394            self.close_browser()
395        if (hasattr(self, '_added_loop_file')
396                and os.path.exists(self._added_loop_file)):
397            os.remove(self._added_loop_file)
398        if (hasattr(self, '_added_music_file')
399                and os.path.exists(self._added_music_file)):
400            os.remove(self._added_music_file)
401