1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
6import json, logging, os, pwd, shutil, subprocess, time
8import dbus
10import common
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.cros import semiauto_framework
14from autotest_lib.client.cros.power import sys_power
16_USER_TIMEOUT_TIME = 321  # Seconds a tester has to respond to prompts
17_DEVICE_TIMEOUT_TIME = 321  # Seconds a tester has to pair or connect device
18_ADAPTER_INTERFACE = 'org.bluez.Adapter1' # Name of adapter in DBus interface
19_DEVICE_INTERFACE = 'org.bluez.Device1' # Name of a device in DBus interface
20_TIME_FORMAT = '%d %b %Y %H:%M:%S' # Human-readable time format for logs
21_SECTION_BREAK = '='*75
24class BluetoothSemiAutoHelper(semiauto_framework.semiauto_test):
25    """Generic Bluetooth SemiAutoTest.
27    Contains functions needed to implement an actual Bluetooth SemiAutoTest,
28    such as accessing the state of Bluetooth adapter/devices via dbus,
29    opening dialogs with tester via Telemetry browser, and getting log data.
30    """
31    version = 1
33    # Boards without Bluetooth support.
34    _INVALID_BOARDS = ['x86-alex', 'x86-alex_he', 'lumpy']
36    def _err(self, message):
37        """Raise error after first collecting more information.
39        @param message: error message to raise and add to logs.
41        """
42        self.collect_logs('ERROR HAS OCCURED: %s' % message)
43        raise error.TestError(message)
45    def supports_bluetooth(self):
46        """Return True if this device has Bluetooth capabilities; else False."""
47        device = utils.get_board()
48        if device in self._INVALID_BOARDS:
49            logging.info('%s does not have Bluetooth.', device)
50            return False
51        return True
53    def _get_objects(self):
54        """Return the managed objects for this chromebook."""
55        manager = dbus.Interface(
56                self._bus.get_object('org.bluez', '/'),
57                dbus_interface='org.freedesktop.DBus.ObjectManager')
58        return manager.GetManagedObjects()
60    def _get_adapter_info(self):
61        """Return the adapter interface objects, or None if not found."""
62        objects = self._get_objects()
63        for path, interfaces in objects.items():
64            if _ADAPTER_INTERFACE in interfaces:
65                self._adapter_path = path
66                return interfaces[_ADAPTER_INTERFACE]
67        return None
69    def _get_device_info(self, addr):
70        """Return the device interface objects, or None if not found."""
71        objects = self._get_objects()
72        for _, interfaces in objects.items():
73            if _DEVICE_INTERFACE in interfaces:
74                if interfaces[_DEVICE_INTERFACE]['Address'] == addr:
75                    return interfaces[_DEVICE_INTERFACE]
76        return None
78    def _verify_adapter_power(self, adapter_power_status):
79        """Return True/False if adapter power status matches given value."""
80        info = self._get_adapter_info()
81        if not info:
82            self._err('No adapter found!')
83        return True if info['Powered'] == adapter_power_status else False
85    def _verify_device_connection(self, addr, paired_status=True,
86                                  connected_status=True):
87        """Return True/False if device statuses match given values."""
88        def _check_info():
89            info = self._get_device_info(addr)
90            if info:
91                if (info['Paired'] != paired_status or
92                    info['Connected'] != connected_status):
93                    return False
94                return True
95            # Return True if no entry was found for an unpaired device
96            return not paired_status and not connected_status
98        results = _check_info()
100        # To avoid spotting brief connections, sleep and check again.
101        if results:
102            time.sleep(0.5)
103            results = _check_info()
104        return results
106    def set_adapter_power(self, adapter_power_status):
107        """Set adapter power status to match given value via dbus call.
109        Block until the power is set.
111        @param adapter_power_status: True to turn adapter on; False for off.
113        """
114        info = self._get_adapter_info()
115        if not info:
116            self._err('No adapter found!')
117        properties = dbus.Interface(
118                self._bus.get_object('org.bluez', self._adapter_path),
119                dbus_interface='org.freedesktop.DBus.Properties')
120        properties.Set(_ADAPTER_INTERFACE, 'Powered', adapter_power_status)
122        self.poll_adapter_power(adapter_power_status)
124    def poll_adapter_presence(self):
125        """Raise error if adapter is not found after some time."""
126        complete = lambda: self._get_adapter_info() is not None
127        try:
128            utils.poll_for_condition(
129                    condition=complete, timeout=15, sleep_interval=1)
130        except utils.TimeoutError:
131            self._err('No adapter found after polling!')
133    def poll_adapter_power(self, adapter_power_status=True):
134        """Wait until adapter power status matches given value.
136        @param adapter_power_status: True for adapter is on; False for off.
138        """
139        complete = lambda: self._verify_adapter_power(
140                adapter_power_status=adapter_power_status)
141        adapter_str = 'ON' if adapter_power_status else 'OFF'
142        utils.poll_for_condition(
143                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
144                sleep_interval=1,
145                desc=('Timeout for Bluetooth Adapter to be %s' % adapter_str))
147    def _poll_connection(self, addr, paired_status, connected_status):
148        """Wait until device statuses match given values."""
149        paired_str = 'PAIRED' if paired_status else 'NOT PAIRED'
150        conn_str = 'CONNECTED' if connected_status else 'NOT CONNECTED'
151        message = 'Waiting for device %s to be %s and %s' % (addr, paired_str,
152                                                             conn_str)
153        logging.info(message)
155        complete = lambda: self._verify_device_connection(
156                addr, paired_status=paired_status,
157                connected_status=connected_status)
158        utils.poll_for_condition(
159                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
160                sleep_interval=1, desc=('Timeout while %s' % message))
162    def poll_connections(self, paired_status=True, connected_status=True):
163        """Wait until all Bluetooth devices have the given statues.
165        @param paired_status: True for device paired; False for unpaired.
166        @param connected_status: True for device connected; False for not.
168        """
169        for addr in self._addrs:
170            self._poll_connection(addr, paired_status=paired_status,
171                                  connected_status=connected_status)
173    def login_and_open_browser(self):
174        """Log in to machine, open browser, and navigate to dialog template.
176        Assumes the existence of 'client/cros/audio/music.mp3' file, and will
177        fail if not found.
179        """
180        # Open browser and interactive tab
181        self.login_and_open_interactive_tab()
183        # Find mounted home directory
184        user_home = None
185        for udir in os.listdir(os.path.join('/', 'home', 'user')):
186            d = os.path.join('/', 'home', 'user', udir)
187            if os.path.ismount(d):
188                user_home = d
189        if user_home is None:
190            raise error.TestError('Could not find mounted home directory')
192        # Setup Audio File
193        audio_dir = os.path.join(self.bindir, '..', '..', 'cros', 'audio')
194        loop_file = os.path.join(audio_dir, 'loop.html')
195        music_file = os.path.join(audio_dir, 'music.mp3')
196        dl_dir = os.path.join(user_home, 'Downloads')
197        self._added_loop_file = os.path.join(dl_dir, 'loop.html')
198        self._added_music_file = os.path.join(dl_dir, 'music.mp3')
199        shutil.copyfile(loop_file, self._added_loop_file)
200        shutil.copyfile(music_file, self._added_music_file)
201        uid = pwd.getpwnam('chronos').pw_uid
202        gid = pwd.getpwnam('chronos').pw_gid
203        os.chmod(self._added_loop_file, 0o755)
204        os.chmod(self._added_music_file, 0o755)
205        os.chown(self._added_loop_file, uid, gid)
206        os.chown(self._added_music_file, uid, gid)
208        # Open Test Dialog tab, Settings tab, and Audio file
209        self._settings_tab = self._browser.tabs.New()
210        self._settings_tab.Navigate('chrome://settings/search#Bluetooth')
211        music_tab = self._browser.tabs.New()
212        music_tab.Navigate('file:///home/chronos/user/Downloads/loop.html')
214    def ask_user(self, message):
215        """Ask the user a yes or no question in an open tab.
217        Reset dialog page to be a question (message param) with 'PASS' and
218        'FAIL' buttons.  Wait for answer.  If no, ask for more information.
220        @param message: string sent to the user via browswer interaction.
222        """
223        logging.info('Asking user "%s"', message)
224        sandbox = 'SANDBOX:<input type="text"/>'
225        html = '<h3>%s</h3>%s' % (message, sandbox)
226        self.set_tab_with_buttons(html, buttons=['PASS', 'FAIL'])
228        # Intepret results.
229        result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
230        if result == 1:
231            # Ask for more information on error.
232            html='<h3>Please provide more info:</h3>'
233            self.set_tab_with_textbox(html)
235            # Get explanation of error, clear output, and raise error.
236            result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
237            self.clear_output()
238            self._err('Testing %s. "%s".' % (self._test_type, result))
239        elif result != 0:
240            raise error.TestError('Bad dialog value: %s' % result)
241        logging.info('Answer was PASS')
243        # Clear user screen.
244        self.clear_output()
246    def tell_user(self, message):
247        """Tell the user the given message in an open tab.
249        @param message: the text string to be displayed.
251        """
252        logging.info('Telling user "%s"', message)
253        html = '<h3>%s</h3>' % message
254        self.set_tab(html)
256    def check_working(self, message=None):
257        """Steps to check that all devices are functioning.
259        Ask user to connect all devices, verify connections, and ask for
260        user input if they are working.
262        @param message: string of text the user is asked.  Defaults to asking
263                        the user to connect all devices.
265        """
266        if not message:
267            message = ('Please connect all devices.<br>(You may need to '
268                       'click mice, press keyboard keys, or use the '
269                       'Connect button in Settings.)')
270        self.tell_user(message)
271        self.poll_adapter_power(True)
272        self.poll_connections(paired_status=True, connected_status=True)
273        self.ask_user('Are all Bluetooth devices working?<br>'
274                       'Is audio playing only through Bluetooth devices?<br>'
275                       'Do onboard keyboard and trackpad work?')
277    def ask_not_working(self):
278        """Ask the user pre-defined message about NOT working."""
279        self.ask_user('No Bluetooth devices work.<br>Audio is NOT playing '
280                      'through onboard speakers or wired headphones.')
282    def start_dump(self, message=''):
283        """Run btmon in subprocess.
285        Kill previous btmon (if needed) and start new one using current
286        test type as base filename.  Dumps stored in results folder.
288        @param message: string of text added to top of log entry.
290        """
291        if hasattr(self, '_dump') and self._dump:
292            self._dump.kill()
293        if not hasattr(self, '_test_type'):
294            self._test_type = 'test'
295        logging.info('Starting btmon')
296        filename = '%s_btmon' % self._test_type
297        path = os.path.join(self.resultsdir, filename)
298        with open(path, 'a') as f:
299            f.write('%s\n' % _SECTION_BREAK)
300            f.write('%s: Starting btmon\n' % time.strftime(_TIME_FORMAT))
301            f.write('%s\n' % message)
302            f.flush()
303            btmon_path = '/usr/bin/btmon'
304            try:
305                self._dump = subprocess.Popen([btmon_path], stdout=f,
306                                              stderr=subprocess.PIPE)
307            except Exception as e:
308                raise error.TestError('btmon: %s' % e)
310    def collect_logs(self, message=''):
311        """Store results of dbus GetManagedObjects and hciconfig.
313        Use current test type as base filename.  Stored in results folder.
315        @param message: string of text added to top of log entry.
317        """
318        logging.info('Collecting dbus info')
319        if not hasattr(self, '_test_type'):
320            self._test_type = 'test'
321        filename = '%s_dbus' % self._test_type
322        path = os.path.join(self.resultsdir, filename)
323        with open(path, 'a') as f:
324            f.write('%s\n' % _SECTION_BREAK)
325            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
326            f.write(json.dumps(list(self._get_objects().items()), indent=2))
327            f.write('\n')
329        logging.info('Collecting hciconfig info')
330        filename = '%s_hciconfig' % self._test_type
331        path = os.path.join(self.resultsdir, filename)
332        with open(path, 'a') as f:
333            f.write('%s\n' % _SECTION_BREAK)
334            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
335            f.flush()
336            hciconfig_path = '/usr/bin/hciconfig'
337            try:
338                subprocess.check_call([hciconfig_path, '-a'], stdout=f)
339            except Exception as e:
340                raise error.TestError('hciconfig: %s' % e)
342    def os_idle_time_set(self, reset=False):
343        """Function to set short idle time or to reset to normal.
345        Not using sys_power so that user can use Bluetooth to wake machine.
347        @param reset: true to reset to normal idle time, false for short.
349        """
350        powerd_path = '/usr/bin/set_short_powerd_timeouts'
351        flag = '--reset' if reset else ''
352        try:
353            subprocess.check_call([powerd_path, flag])
354        except Exception as e:
355            raise error.TestError('idle cmd: %s' % e)
357    def os_suspend(self):
358        """Function to suspend ChromeOS using sys_power."""
359        sys_power.do_suspend(5)
361        # Sleep
362        time.sleep(5)
364    def initialize(self):
365        self._bus = dbus.SystemBus()
367    def warmup(self, addrs='', test_phase='client', close_browser=True):
368        """Warmup setting paramters for semi-automated Bluetooth Test.
370        Actual test steps are implemened in run_once() function.
372        @param: addrs: list of MAC address of Bluetooth devices under test.
373        @param: test_phase: for use by server side tests to, for example, call
374                            the same test before and after a reboot.
375        @param: close_browser: True if client side test should close browser
376                               at end of test.
378        """
379        self.login_and_open_browser()
381        self._addrs = addrs
382        self._test_type = 'start'
383        self._test_phase = test_phase
384        self._will_close_browser = close_browser
386    def cleanup(self):
387        """Cleanup of various files/processes opened during test.
389        Closes running btmon, closes browser (if asked to at start), and
390        deletes files added during test.
392        """
393        if hasattr(self, '_dump'):
394            self._dump.kill()
395        if hasattr(self, '_will_close_browser') and self._will_close_browser:
396            self.close_browser()
397        if (hasattr(self, '_added_loop_file')
398                and os.path.exists(self._added_loop_file)):
399            os.remove(self._added_loop_file)
400        if (hasattr(self, '_added_music_file')
401                and os.path.exists(self._added_music_file)):
402            os.remove(self._added_music_file)