1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import json, logging, os, pwd, shutil, subprocess, time
7
8import dbus
9
10import common
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.cros import semiauto_framework
14from autotest_lib.client.cros.power import sys_power
15
16_USER_TIMEOUT_TIME = 321  # Seconds a tester has to respond to prompts
17_DEVICE_TIMEOUT_TIME = 321  # Seconds a tester has to pair or connect device
18_ADAPTER_INTERFACE = 'org.bluez.Adapter1' # Name of adapter in DBus interface
19_DEVICE_INTERFACE = 'org.bluez.Device1' # Name of a device in DBus interface
20_TIME_FORMAT = '%d %b %Y %H:%M:%S' # Human-readable time format for logs
21_SECTION_BREAK = '='*75
22
23
24class BluetoothSemiAutoHelper(semiauto_framework.semiauto_test):
25    """Generic Bluetooth SemiAutoTest.
26
27    Contains functions needed to implement an actual Bluetooth SemiAutoTest,
28    such as accessing the state of Bluetooth adapter/devices via dbus,
29    opening dialogs with tester via Telemetry browser, and getting log data.
30    """
31    version = 1
32
33    # Boards without Bluetooth support.
34    _INVALID_BOARDS = ['x86-alex', 'x86-alex_he', 'lumpy']
35
36    def _err(self, message):
37        """Raise error after first collecting more information.
38
39        @param message: error message to raise and add to logs.
40
41        """
42        self.collect_logs('ERROR HAS OCCURED: %s' % message)
43        raise error.TestError(message)
44
45    def supports_bluetooth(self):
46        """Return True if this device has Bluetooth capabilities; else False."""
47        device = utils.get_board()
48        if device in self._INVALID_BOARDS:
49            logging.info('%s does not have Bluetooth.', device)
50            return False
51        return True
52
53    def _get_objects(self):
54        """Return the managed objects for this chromebook."""
55        manager = dbus.Interface(
56                self._bus.get_object('org.bluez', '/'),
57                dbus_interface='org.freedesktop.DBus.ObjectManager')
58        return manager.GetManagedObjects()
59
60    def _get_adapter_info(self):
61        """Return the adapter interface objects, or None if not found."""
62        objects = self._get_objects()
63        for path, interfaces in objects.items():
64            if _ADAPTER_INTERFACE in interfaces:
65                self._adapter_path = path
66                return interfaces[_ADAPTER_INTERFACE]
67        return None
68
69    def _get_device_info(self, addr):
70        """Return the device interface objects, or None if not found."""
71        objects = self._get_objects()
72        for _, interfaces in objects.items():
73            if _DEVICE_INTERFACE in interfaces:
74                if interfaces[_DEVICE_INTERFACE]['Address'] == addr:
75                    return interfaces[_DEVICE_INTERFACE]
76        return None
77
78    def _verify_adapter_power(self, adapter_power_status):
79        """Return True/False if adapter power status matches given value."""
80        info = self._get_adapter_info()
81        if not info:
82            self._err('No adapter found!')
83        return True if info['Powered'] == adapter_power_status else False
84
85    def _verify_device_connection(self, addr, paired_status=True,
86                                  connected_status=True):
87        """Return True/False if device statuses match given values."""
88        def _check_info():
89            info = self._get_device_info(addr)
90            if info:
91                if (info['Paired'] != paired_status or
92                    info['Connected'] != connected_status):
93                    return False
94                return True
95            # Return True if no entry was found for an unpaired device
96            return not paired_status and not connected_status
97
98        results = _check_info()
99
100        # To avoid spotting brief connections, sleep and check again.
101        if results:
102            time.sleep(0.5)
103            results = _check_info()
104        return results
105
106    def set_adapter_power(self, adapter_power_status):
107        """Set adapter power status to match given value via dbus call.
108
109        Block until the power is set.
110
111        @param adapter_power_status: True to turn adapter on; False for off.
112
113        """
114        info = self._get_adapter_info()
115        if not info:
116            self._err('No adapter found!')
117        properties = dbus.Interface(
118                self._bus.get_object('org.bluez', self._adapter_path),
119                dbus_interface='org.freedesktop.DBus.Properties')
120        properties.Set(_ADAPTER_INTERFACE, 'Powered', adapter_power_status)
121
122        self.poll_adapter_power(adapter_power_status)
123
124    def poll_adapter_presence(self):
125        """Raise error if adapter is not found after some time."""
126        complete = lambda: self._get_adapter_info() is not None
127        try:
128            utils.poll_for_condition(
129                    condition=complete, timeout=15, sleep_interval=1)
130        except utils.TimeoutError:
131            self._err('No adapter found after polling!')
132
133    def poll_adapter_power(self, adapter_power_status=True):
134        """Wait until adapter power status matches given value.
135
136        @param adapter_power_status: True for adapter is on; False for off.
137
138        """
139        complete = lambda: self._verify_adapter_power(
140                adapter_power_status=adapter_power_status)
141        adapter_str = 'ON' if adapter_power_status else 'OFF'
142        utils.poll_for_condition(
143                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
144                sleep_interval=1,
145                desc=('Timeout for Bluetooth Adapter to be %s' % adapter_str))
146
147    def _poll_connection(self, addr, paired_status, connected_status):
148        """Wait until device statuses match given values."""
149        paired_str = 'PAIRED' if paired_status else 'NOT PAIRED'
150        conn_str = 'CONNECTED' if connected_status else 'NOT CONNECTED'
151        message = 'Waiting for device %s to be %s and %s' % (addr, paired_str,
152                                                             conn_str)
153        logging.info(message)
154
155        complete = lambda: self._verify_device_connection(
156                addr, paired_status=paired_status,
157                connected_status=connected_status)
158        utils.poll_for_condition(
159                condition=complete, timeout=_DEVICE_TIMEOUT_TIME,
160                sleep_interval=1, desc=('Timeout while %s' % message))
161
162    def poll_connections(self, paired_status=True, connected_status=True):
163        """Wait until all Bluetooth devices have the given statues.
164
165        @param paired_status: True for device paired; False for unpaired.
166        @param connected_status: True for device connected; False for not.
167
168        """
169        for addr in self._addrs:
170            self._poll_connection(addr, paired_status=paired_status,
171                                  connected_status=connected_status)
172
173    def login_and_open_browser(self):
174        """Log in to machine, open browser, and navigate to dialog template.
175
176        Assumes the existence of 'client/cros/audio/music.mp3' file, and will
177        fail if not found.
178
179        """
180        # Open browser and interactive tab
181        self.login_and_open_interactive_tab()
182
183        # Find mounted home directory
184        user_home = None
185        for udir in os.listdir(os.path.join('/', 'home', 'user')):
186            d = os.path.join('/', 'home', 'user', udir)
187            if os.path.ismount(d):
188                user_home = d
189        if user_home is None:
190            raise error.TestError('Could not find mounted home directory')
191
192        # Setup Audio File
193        audio_dir = os.path.join(self.bindir, '..', '..', 'cros', 'audio')
194        loop_file = os.path.join(audio_dir, 'loop.html')
195        music_file = os.path.join(audio_dir, 'music.mp3')
196        dl_dir = os.path.join(user_home, 'Downloads')
197        self._added_loop_file = os.path.join(dl_dir, 'loop.html')
198        self._added_music_file = os.path.join(dl_dir, 'music.mp3')
199        shutil.copyfile(loop_file, self._added_loop_file)
200        shutil.copyfile(music_file, self._added_music_file)
201        uid = pwd.getpwnam('chronos').pw_uid
202        gid = pwd.getpwnam('chronos').pw_gid
203        os.chmod(self._added_loop_file, 0o755)
204        os.chmod(self._added_music_file, 0o755)
205        os.chown(self._added_loop_file, uid, gid)
206        os.chown(self._added_music_file, uid, gid)
207
208        # Open Test Dialog tab, Settings tab, and Audio file
209        self._settings_tab = self._browser.tabs.New()
210        self._settings_tab.Navigate('chrome://settings/search#Bluetooth')
211        music_tab = self._browser.tabs.New()
212        music_tab.Navigate('file:///home/chronos/user/Downloads/loop.html')
213
214    def ask_user(self, message):
215        """Ask the user a yes or no question in an open tab.
216
217        Reset dialog page to be a question (message param) with 'PASS' and
218        'FAIL' buttons.  Wait for answer.  If no, ask for more information.
219
220        @param message: string sent to the user via browswer interaction.
221
222        """
223        logging.info('Asking user "%s"', message)
224        sandbox = 'SANDBOX:<input type="text"/>'
225        html = '<h3>%s</h3>%s' % (message, sandbox)
226        self.set_tab_with_buttons(html, buttons=['PASS', 'FAIL'])
227
228        # Intepret results.
229        result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
230        if result == 1:
231            # Ask for more information on error.
232            html='<h3>Please provide more info:</h3>'
233            self.set_tab_with_textbox(html)
234
235            # Get explanation of error, clear output, and raise error.
236            result = self.wait_for_tab_result(timeout=_USER_TIMEOUT_TIME)
237            self.clear_output()
238            self._err('Testing %s. "%s".' % (self._test_type, result))
239        elif result != 0:
240            raise error.TestError('Bad dialog value: %s' % result)
241        logging.info('Answer was PASS')
242
243        # Clear user screen.
244        self.clear_output()
245
246    def tell_user(self, message):
247        """Tell the user the given message in an open tab.
248
249        @param message: the text string to be displayed.
250
251        """
252        logging.info('Telling user "%s"', message)
253        html = '<h3>%s</h3>' % message
254        self.set_tab(html)
255
256    def check_working(self, message=None):
257        """Steps to check that all devices are functioning.
258
259        Ask user to connect all devices, verify connections, and ask for
260        user input if they are working.
261
262        @param message: string of text the user is asked.  Defaults to asking
263                        the user to connect all devices.
264
265        """
266        if not message:
267            message = ('Please connect all devices.<br>(You may need to '
268                       'click mice, press keyboard keys, or use the '
269                       'Connect button in Settings.)')
270        self.tell_user(message)
271        self.poll_adapter_power(True)
272        self.poll_connections(paired_status=True, connected_status=True)
273        self.ask_user('Are all Bluetooth devices working?<br>'
274                       'Is audio playing only through Bluetooth devices?<br>'
275                       'Do onboard keyboard and trackpad work?')
276
277    def ask_not_working(self):
278        """Ask the user pre-defined message about NOT working."""
279        self.ask_user('No Bluetooth devices work.<br>Audio is NOT playing '
280                      'through onboard speakers or wired headphones.')
281
282    def start_dump(self, message=''):
283        """Run btmon in subprocess.
284
285        Kill previous btmon (if needed) and start new one using current
286        test type as base filename.  Dumps stored in results folder.
287
288        @param message: string of text added to top of log entry.
289
290        """
291        if hasattr(self, '_dump') and self._dump:
292            self._dump.kill()
293        if not hasattr(self, '_test_type'):
294            self._test_type = 'test'
295        logging.info('Starting btmon')
296        filename = '%s_btmon' % self._test_type
297        path = os.path.join(self.resultsdir, filename)
298        with open(path, 'a') as f:
299            f.write('%s\n' % _SECTION_BREAK)
300            f.write('%s: Starting btmon\n' % time.strftime(_TIME_FORMAT))
301            f.write('%s\n' % message)
302            f.flush()
303            btmon_path = '/usr/bin/btmon'
304            try:
305                self._dump = subprocess.Popen([btmon_path], stdout=f,
306                                              stderr=subprocess.PIPE)
307            except Exception as e:
308                raise error.TestError('btmon: %s' % e)
309
310    def collect_logs(self, message=''):
311        """Store results of dbus GetManagedObjects and hciconfig.
312
313        Use current test type as base filename.  Stored in results folder.
314
315        @param message: string of text added to top of log entry.
316
317        """
318        logging.info('Collecting dbus info')
319        if not hasattr(self, '_test_type'):
320            self._test_type = 'test'
321        filename = '%s_dbus' % self._test_type
322        path = os.path.join(self.resultsdir, filename)
323        with open(path, 'a') as f:
324            f.write('%s\n' % _SECTION_BREAK)
325            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
326            f.write(json.dumps(list(self._get_objects().items()), indent=2))
327            f.write('\n')
328
329        logging.info('Collecting hciconfig info')
330        filename = '%s_hciconfig' % self._test_type
331        path = os.path.join(self.resultsdir, filename)
332        with open(path, 'a') as f:
333            f.write('%s\n' % _SECTION_BREAK)
334            f.write('%s: %s\n' % (time.strftime(_TIME_FORMAT), message))
335            f.flush()
336            hciconfig_path = '/usr/bin/hciconfig'
337            try:
338                subprocess.check_call([hciconfig_path, '-a'], stdout=f)
339            except Exception as e:
340                raise error.TestError('hciconfig: %s' % e)
341
342    def os_idle_time_set(self, reset=False):
343        """Function to set short idle time or to reset to normal.
344
345        Not using sys_power so that user can use Bluetooth to wake machine.
346
347        @param reset: true to reset to normal idle time, false for short.
348
349        """
350        powerd_path = '/usr/bin/set_short_powerd_timeouts'
351        flag = '--reset' if reset else ''
352        try:
353            subprocess.check_call([powerd_path, flag])
354        except Exception as e:
355            raise error.TestError('idle cmd: %s' % e)
356
357    def os_suspend(self):
358        """Function to suspend ChromeOS using sys_power."""
359        sys_power.do_suspend(5)
360
361        # Sleep
362        time.sleep(5)
363
364    def initialize(self):
365        self._bus = dbus.SystemBus()
366
367    def warmup(self, addrs='', test_phase='client', close_browser=True):
368        """Warmup setting paramters for semi-automated Bluetooth Test.
369
370        Actual test steps are implemened in run_once() function.
371
372        @param: addrs: list of MAC address of Bluetooth devices under test.
373        @param: test_phase: for use by server side tests to, for example, call
374                            the same test before and after a reboot.
375        @param: close_browser: True if client side test should close browser
376                               at end of test.
377
378        """
379        self.login_and_open_browser()
380
381        self._addrs = addrs
382        self._test_type = 'start'
383        self._test_phase = test_phase
384        self._will_close_browser = close_browser
385
386    def cleanup(self):
387        """Cleanup of various files/processes opened during test.
388
389        Closes running btmon, closes browser (if asked to at start), and
390        deletes files added during test.
391
392        """
393        if hasattr(self, '_dump'):
394            self._dump.kill()
395        if hasattr(self, '_will_close_browser') and self._will_close_browser:
396            self.close_browser()
397        if (hasattr(self, '_added_loop_file')
398                and os.path.exists(self._added_loop_file)):
399            os.remove(self._added_loop_file)
400        if (hasattr(self, '_added_music_file')
401                and os.path.exists(self._added_music_file)):
402            os.remove(self._added_music_file)
403