1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Server side bluetooth adapter subtests."""
6
7import functools
8import logging
9import re
10import time
11
12from autotest_lib.client.bin import utils
13from autotest_lib.client.bin.input import input_event_recorder as recorder
14from autotest_lib.client.common_lib import error
15from autotest_lib.server import test
16from autotest_lib.client.bin.input.linux_input import (
17        BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL)
18
# Alias of input_event_recorder.Event for brevity.
Event = recorder.Event


# Delay binding the methods since host is only available at run time.
# Each value takes the host and returns the chameleon method that creates
# the emulated device of that type; the caller then invokes that method.
# NOTE(review): 'get_bluetooh_hid_mouse' (sic) is the attribute name looked up
# on host.chameleon; presumably it matches the spelling of the chameleon
# API -- verify before renaming.
SUPPORTED_DEVICE_TYPES = {
        'MOUSE': lambda host: host.chameleon.get_bluetooh_hid_mouse}
25
26
def _run_method(method, method_name, *args, **kwargs):
    """Run a target method and capture exceptions if any.

    This is just a wrapper of the target method so that we do not need to
    write the exception capturing structure repeatedly. The method could
    be either a device method or a facade method.

    Only Exception subclasses are captured and logged. KeyboardInterrupt
    and SystemExit are deliberately allowed to propagate so that a running
    test can still be aborted.

    @param method: the method to run
    @param method_name: the name of the method, used in the error log
    @param args: positional arguments passed through to the method
    @param kwargs: keyword arguments passed through to the method

    @returns: the return value of target method() if successful.
              False otherwise.

    """
    try:
        return method(*args, **kwargs)
    except Exception as e:
        logging.error('%s: %s', method_name, e)
        return False
49
50
def get_bluetooth_emulated_device(host, device_type):
    """Get the bluetooth emulated device object.

    The device is created through the chameleon board of the host and
    initialized. Its important properties are then queried and stored as
    attributes of the returned device object: name, address, pin,
    class_of_service, class_of_device, device_type, authenticaiton_mode
    (sic) and port.

    @param host: the DUT, usually a chromebook
    @param device_type : the bluetooth HID device type, e.g., 'MOUSE'

    @returns: the bluetooth device object

    @raises: error.TestError if the device type is not supported.

    """

    def _retry_device_method(method_name):
        """retry the emulated device's method.

        The method is invoked as device.xxxx() e.g., device.GetChipName().

        Note that the method name string is provided to get the device's actual
        method object at run time through getattr(). The rebinding is required
        because a new device may have been created previously or during the
        execution of fix_serial_device().

        Given a device's method, it is not feasible to get the method name
        through __name__ attribute. This limitation is due to the fact that
        the device is a dotted object of an XML RPC server proxy.
        As an example, with the method name 'GetChipName', we could derive the
        corresponding method device.GetChipName. On the contrary, given
        device.GetChipName, it is not feasible to get the method name by
        device.GetChipName.__name__

        Also note that if the device method fails at the first time, we would
        try to fix the problem by re-creating the serial device and see if the
        problem is fixed. If not, we will reboot the chameleon board and see
        if the problem is fixed. If yes, execute the target method the second
        time.

        @param method_name: the string of the method name.

        @returns: the result returned by the device's method.
                  False if the serial device could not be fixed, or if the
                  second attempt raised an exception.

        """
        result = _run_method(getattr(device, method_name), method_name)
        if _is_successful(result):
            return result

        logging.error('%s failed the 1st time. Try to fix the serial device.',
                      method_name)

        # Try to fix the serial device if possible.
        if not fix_serial_device(host, device):
            return False

        logging.info('%s: retry the 2nd time.', method_name)
        return _run_method(getattr(device, method_name), method_name)


    if device_type not in SUPPORTED_DEVICE_TYPES:
        # Format the message here: exception constructors do not apply
        # logging-style lazy formatting to extra arguments.
        raise error.TestError('The device type is not supported: %s' %
                              device_type)

    # Get the bluetooth device object and query some important properties.
    device = SUPPORTED_DEVICE_TYPES[device_type](host)()

    _retry_device_method('Init')
    logging.info('device type: %s', device_type)

    device.name = _retry_device_method('GetChipName')
    logging.info('device name: %s', device.name)

    device.address = _retry_device_method('GetLocalBluetoothAddress')
    logging.info('address: %s', device.address)

    device.pin = _retry_device_method('GetPinCode')
    logging.info('pin: %s', device.pin)

    device.class_of_service = _retry_device_method('GetClassOfService')
    logging.info('class of service: 0x%04X', device.class_of_service)

    device.class_of_device = _retry_device_method('GetClassOfDevice')
    logging.info('class of device: 0x%04X', device.class_of_device)

    device.device_type = _retry_device_method('GetHIDDeviceType')
    logging.info('device type: %s', device.device_type)

    # NOTE: the attribute name is misspelled ('authenticaiton'). It is kept
    # as is because code outside this function may read it.
    device.authenticaiton_mode = _retry_device_method('GetAuthenticationMode')
    logging.info('authentication mode: %s', device.authenticaiton_mode)

    device.port = _retry_device_method('GetPort')
    logging.info('serial port: %s\n', device.port)

    return device
140
141
def recreate_serial_device(device):
    """Create and connect to a new serial device.

    The old serial device is closed first, and a new one is then created
    through the same device object.

    Only Exception subclasses are captured. KeyboardInterrupt and
    SystemExit propagate so that a running test can still be aborted.

    @param device: the bluetooth HID device

    @returns: True if the serial device is re-created successfully.
              False if device is None, or if closing the old serial
              device or creating the new one raises an exception.

    """
    logging.info('Remove the old serial device and create a new one.')
    if device is None:
        # Without a device object there is nothing to close or re-create.
        logging.error('failed to invoke CreateSerialDevice: no device.')
        return False

    try:
        device.Close()
    except Exception as e:
        logging.error('failed to close the serial device: %s', e)
        return False

    try:
        device.CreateSerialDevice()
    except Exception as e:
        logging.error('failed to invoke CreateSerialDevice: %s', e)
        return False
    return True
163
164
def _reboot_chameleon(host, device):
    """Reboot the chameleon board and check that the device works again.

    The bluetooth peripheral device is closed before the chameleon board
    is rebooted. After a fixed wait, the device is probed step by step.

    @param host: the DUT, usually a chromebook
    @param device: the bluetooth HID device

    @returns: True if, after the reboot, the device could initialize, its
              serial connection check passes, and it could enter command
              mode. False as soon as one of these steps fails.

    """
    # Every chameleon reboot would take a bit more than this many seconds.
    reboot_wait_secs = 40

    device.Close()
    logging.info('rebooting chameleon...')
    host.chameleon.reboot()

    # Wait for the reboot before beginning to probe the chameleon board.
    time.sleep(reboot_wait_secs)

    logging.info('Checking device status...')
    # Each probe is paired with the message logged when it fails. The probes
    # are lambdas so that each one runs only if all previous ones passed.
    probes = (
        (lambda: _run_method(device.Init, 'Init'),
         'device.Init: failed after reboot'),
        (lambda: device.CheckSerialConnection(),
         'device.CheckSerialConnection: failed after reboot'),
        (lambda: _run_method(device.EnterCommandMode, 'EnterCommandMode'),
         'device.EnterCommandMode: failed after reboot'),
    )
    for probe, failure_message in probes:
        if not probe():
            logging.info(failure_message)
            return False

    logging.info('The device is created successfully after reboot.')
    return True
191
192
def _is_successful(result):
    """Is the method result successful?

    @param result: a method result

    @returns: True if bool(result) is True or result is the integer 0.
              Some method result, e.g., class_of_service, may be 0
              which is considered a valid result.
              False, None, 0.0 and empty containers are not successful.

    """
    # bool is a subclass of int and False == 0, so booleans are decided
    # first; otherwise a False result would be mistaken for a valid 0.
    if isinstance(result, bool):
        return result
    # Compare by value rather than with "is": identity of int objects is
    # an implementation detail of the interpreter.
    return bool(result) or (isinstance(result, int) and result == 0)
204
205
def fix_serial_device(host, device):
    """Try to bring a broken serial device back to a working state.

    Two remedies are attempted in order:
    (1) re-creating the serial device, when the serial connection check
        still passes, and
    (2) rebooting the chameleon board, when the device still cannot enter
        command mode.

    @param host: the DUT, usually a chromebook
    @param device: the bluetooth HID device

    @returns: True if the serial device is fixed. False otherwise.

    """
    connection_exists = device.CheckSerialConnection()
    if connection_exists:
        # The USB serial connection still exists, so re-connection suffices
        # to solve the problem. The problem is usually caused by serial port
        # change, e.g., from /dev/ttyUSB0 to /dev/ttyUSB1.
        logging.info('retry: creating a new serial device...')
        recreated = recreate_serial_device(device)
        if not recreated:
            return False

    # If the device can enter command mode now, it is fixed. Otherwise fall
    # back to rebooting the chameleon board, which also creates a new
    # bluetooth device, and report whether the reboot fixed the problem.
    entered = _run_method(device.EnterCommandMode, 'EnterCommandMode')
    if _is_successful(entered):
        return True
    return _reboot_chameleon(host, device)
235
236
def retry(test_method, instance, *args, **kwargs):
    """Run the facade test_method(), and run it once more if it fails.

    A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
    e.g., BluetoothAdapterTests.test_bluetoothd_running().

    If the instance has a 'device_type' attribute, the attached serial
    device is fixed before the second attempt. The second attempt is
    skipped if the device cannot be fixed.

    @param test_method: the test method to retry
    @param instance: the object passed as the first argument of test_method
    @param args: positional arguments passed through to test_method
    @param kwargs: keyword arguments passed through to test_method

    @returns: True if the return value of test_method() is successful.
              False otherwise.

    """
    name = test_method.__name__

    def _attempt():
        """Run the test method once and tell whether it succeeded."""
        outcome = _run_method(test_method, name, instance, *args, **kwargs)
        return _is_successful(outcome)

    if _attempt():
        return True

    logging.error('%s failed at the 1st time.', name)
    logging.info('%s: retry the 2nd time.', name)
    time.sleep(1)

    # Only a test that uses an attached serial device needs the device to
    # be fixed first. Any other test is simply re-run.
    if hasattr(instance, 'device_type'):
        host = instance.host
        device = instance.devices[instance.device_type]
        if not fix_serial_device(host, device):
            return False
        logging.info('%s: retry the 2nd time.', name)

    return _attempt()
272
273
def _test_retry_and_log(test_method_or_retry_flag):
    """A decorator that logs test results, collects error messages, and retries
       on request.

    @param test_method_or_retry_flag: either the test_method or a retry_flag.
        The supported usages are:
        1. @_test_retry_and_log
            The argument is the test_method itself. The test_method is
            retried on failure.
        2. @_test_retry_and_log(True)
            The argument is a retry flag. The test_method is retried on
            failure.
        3. @_test_retry_and_log(False)
            The argument is a retry flag. The test_method is run only once.

    @returns: a wrapper of the test_method with test log. The retry mechanism
        would depend on the retry flag.

    """
    # True when the decorator is applied without arguments, i.e., when the
    # argument is the test method itself rather than a retry flag.
    applied_without_args = callable(test_method_or_retry_flag)

    def decorator(test_method):
        """Wrap the test method with result logging and optional retry.

        @param test_method: the test method being decorated.

        @returns the wrapper of the test method.

        """
        @functools.wraps(test_method)
        def wrapper(instance, *args, **kwargs):
            """Run the decorated method and log whether it passed.

            @param instance: an BluetoothAdapterTests instance

            @returns the result of the test method

            """
            # Clear the results left behind by a previous test method.
            instance.results = None

            should_retry = applied_without_args or test_method_or_retry_flag
            if not should_retry:
                outcome = test_method(instance, *args, **kwargs)
            else:
                outcome = retry(test_method, instance, *args, **kwargs)

            if not outcome:
                message = '[--- failed: %s (%s)]' % (test_method.__name__,
                                                     str(instance.results))
                logging.error(message)
                instance.fails.append(message)
            else:
                logging.info('[*** passed: %s]', test_method.__name__)
            return outcome
        return wrapper

    if not applied_without_args:
        # The decorator function comes with an argument like
        # @_test_retry_and_log(False)
        return decorator

    # The decorator function comes with no argument like
    # @_test_retry_and_log
    return decorator(test_method_or_retry_flag)
336
337
338def test_case_log(method):
339    """A decorator for test case methods.
340
341    The main purpose of this decorator is to display the test case name
342    in the test log which looks like
343
344        <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
345
346    @param method: the test case method to decorate.
347
348    @returns: a wrapper function of the decorated method.
349
350    """
351    @functools.wraps(method)
352    def wrapper(instance, *args, **kwargs):
353        logging.info('\n<... %s ...>', method.__name__)
354        method(instance, *args, **kwargs)
355    return wrapper
356
357
class BluetoothAdapterTests(test.test):
    """Server side bluetooth adapter tests.

    This test class tries to thoroughly verify most of the important work
    states of a bluetooth adapter.

    The various test methods are supposed to be invoked by actual autotest
    tests such as server/cros/site_tests/bluetooth_Adapter*.

    Each test method stores the details of its outcome in self.results.
    The _test_retry_and_log decorator appends a failure message to
    self.fails when a decorated test method fails.

    """
    version = 1
    # Timeouts and sleep/polling intervals below are in seconds.
    # Timeout when polling for the adapter power state after power/reset.
    ADAPTER_POWER_STATE_TIMEOUT_SECS = 5
    # Sleep between issuing an adapter action and reading the state back.
    ADAPTER_ACTION_SLEEP_SECS = 1
    # Timeout passed to the facade for pairing. It is also the polling
    # timeout in both test_pairing and test_connection_by_adapter.
    ADAPTER_PAIRING_TIMEOUT_SECS = 60
    # NOTE(review): the connection/disconnection timeouts are not referenced
    # in the visible part of this file; presumably used further down.
    ADAPTER_CONNECTION_TIMEOUT_SECS = 30
    ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
    # Polling interval of the pairing and connection-by-adapter tests.
    ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
    # Timeout and polling interval when waiting for a device to be discovered.
    ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
    ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
    # NOTE(review): not referenced in the visible part of this file.
    ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30

    # NOTE(review): not referenced in the visible part of this file;
    # presumably a sleep related to HID reports -- confirm against its users.
    HID_REPORT_SLEEP_SECS = 1

    # hci0 is the default hci device if there is no external bluetooth dongle.
    EXPECTED_HCI = 'hci0'

    # Two complementary bit masks that together cover 24 bits (0xFFFFFF).
    # Presumably they split a bluetooth class value into its service bits
    # and its device bits -- TODO confirm against the code that uses them.
    CLASS_OF_SERVICE_MASK = 0xFFE000
    CLASS_OF_DEVICE_MASK = 0x001FFF

    # Constants about advertising.
    # NOTE: 'DAFAULT' is a misspelling of 'DEFAULT'. The names are kept as is
    # because they may be referenced outside the visible part of this file.
    DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 1280
    DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 1280
    # Presumably the length in milliseconds of one advertising interval
    # unit -- TODO confirm.
    ADVERTISING_INTERVAL_UNIT = 0.625

    # Error messages about advertising dbus methods.
    ERROR_MAX_ADVERTISEMENTS = (
            'org.bluez.Error.Failed: Maximum advertisements reached')
    ERROR_INVALID_ADVERTISING_INTERVALS = (
            'org.bluez.Error.InvalidArguments: Invalid arguments')

    # Supported profiles by chrome os.
    # test_UUIDs() requires the adapter to report every UUID listed here.
    SUPPORTED_UUIDS = {
            'HSP_AG_UUID': '00001112-0000-1000-8000-00805f9b34fb',
            'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
            'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
            'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
            'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
            'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}
406
407
408    def get_device(self, device_type):
409        """Get the bluetooth device object.
410
411        @param device_type : the bluetooth HID device type, e.g., 'MOUSE'
412
413        @returns: the bluetooth device object
414
415        """
416        self.device_type = device_type
417        if self.devices[device_type] is None:
418            self.devices[device_type] = get_bluetooth_emulated_device(
419                    self.host, device_type)
420        return self.devices[device_type]
421
422
423    # -------------------------------------------------------------------
    # Adapter standalone tests
425    # -------------------------------------------------------------------
426
427
428    @_test_retry_and_log
429    def test_bluetoothd_running(self):
430        """Test that bluetoothd is running."""
431        return self.bluetooth_facade.is_bluetoothd_running()
432
433
434    @_test_retry_and_log
435    def test_start_bluetoothd(self):
436        """Test that bluetoothd could be started successfully."""
437        return self.bluetooth_facade.start_bluetoothd()
438
439
440    @_test_retry_and_log
441    def test_stop_bluetoothd(self):
442        """Test that bluetoothd could be stopped successfully."""
443        return self.bluetooth_facade.stop_bluetoothd()
444
445
446    @_test_retry_and_log
447    def test_adapter_work_state(self):
448        """Test that the bluetooth adapter is in the correct working state.
449
450        This includes that the adapter is detectable, is powered on,
451        and its hci device is hci0.
452        """
453        has_adapter = self.bluetooth_facade.has_adapter()
454        is_powered_on = self.bluetooth_facade.is_powered_on()
455        hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
456        self.results = {
457                'has_adapter': has_adapter,
458                'is_powered_on': is_powered_on,
459                'hci': hci}
460        return all(self.results.values())
461
462
463    @_test_retry_and_log
464    def test_power_on_adapter(self):
465        """Test that the adapter could be powered on successfully."""
466        power_on = self.bluetooth_facade.set_powered(True)
467        is_powered_on = False
468        try:
469            utils.poll_for_condition(
470                    condition=self.bluetooth_facade.is_powered_on,
471                    timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS,
472                    desc='Waiting for adapter powered on')
473            is_powered_on = True
474        except utils.TimeoutError as e:
475            logging.error('test_power_on_adapter: %s', e)
476        except:
477            logging.error('test_power_on_adapter: unexpected error')
478
479        self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
480        return all(self.results.values())
481
482
483    @_test_retry_and_log
484    def test_power_off_adapter(self):
485        """Test that the adapter could be powered off successfully."""
486        power_off = self.bluetooth_facade.set_powered(False)
487        is_powered_off = False
488        try:
489            utils.poll_for_condition(
490                    condition=(lambda:
491                               not self.bluetooth_facade.is_powered_on()),
492                    timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS,
493                    desc='Waiting for adapter powered off')
494            is_powered_off = True
495        except utils.TimeoutError as e:
496            logging.error('test_power_off_adapter: %s', e)
497        except:
498            logging.error('test_power_off_adapter: unexpected error')
499
500        self.results = {
501                'power_off': power_off,
502                'is_powered_off': is_powered_off}
503        return all(self.results.values())
504
505
506    @_test_retry_and_log
507    def test_reset_on_adapter(self):
508        """Test that the adapter could be reset on successfully.
509
510        This includes restarting bluetoothd, and removing the settings
511        and cached devices.
512        """
513        reset_on = self.bluetooth_facade.reset_on()
514        is_powered_on = False
515        try:
516            utils.poll_for_condition(
517                    condition=self.bluetooth_facade.is_powered_on,
518                    timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS,
519                    desc='Waiting for adapter reset on')
520            is_powered_on = True
521        except utils.TimeoutError as e:
522            logging.error('test_reset_on_adapter: %s', e)
523        except:
524            logging.error('test_reset_on_adapter: unexpected error')
525
526        self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
527        return all(self.results.values())
528
529
530    @_test_retry_and_log
531    def test_reset_off_adapter(self):
532        """Test that the adapter could be reset off successfully.
533
534        This includes restarting bluetoothd, and removing the settings
535        and cached devices.
536        """
537        reset_off = self.bluetooth_facade.reset_off()
538        is_powered_off = False
539        try:
540            utils.poll_for_condition(
541                    condition=(lambda:
542                               not self.bluetooth_facade.is_powered_on()),
543                    timeout=self.ADAPTER_POWER_STATE_TIMEOUT_SECS,
544                    desc='Waiting for adapter reset off')
545            is_powered_off = True
546        except utils.TimeoutError as e:
547            logging.error('test_reset_off_adapter: %s', e)
548        except:
549            logging.error('test_reset_off_adapter: unexpected error')
550
551        self.results = {
552                'reset_off': reset_off,
553                'is_powered_off': is_powered_off}
554        return all(self.results.values())
555
556
557    @_test_retry_and_log
558    def test_UUIDs(self):
559        """Test that basic profiles are supported."""
560        adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
561        self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
562                        if uuid not in adapter_UUIDs]
563        return not bool(self.results)
564
565
566    @_test_retry_and_log
567    def test_start_discovery(self):
568        """Test that the adapter could start discovery."""
569        start_discovery = self.bluetooth_facade.start_discovery()
570        time.sleep(self.ADAPTER_ACTION_SLEEP_SECS)
571        is_discovering = self.bluetooth_facade.is_discovering()
572        self.results = {
573                'start_discovery': start_discovery,
574                'is_discovering': is_discovering}
575        return all(self.results.values())
576
577
578    @_test_retry_and_log
579    def test_stop_discovery(self):
580        """Test that the adapter could stop discovery."""
581        stop_discovery = self.bluetooth_facade.stop_discovery()
582        time.sleep(self.ADAPTER_ACTION_SLEEP_SECS)
583        is_not_discovering = not self.bluetooth_facade.is_discovering()
584        self.results = {
585                'stop_discovery': stop_discovery,
586                'is_not_discovering': is_not_discovering}
587        return all(self.results.values())
588
589
590    @_test_retry_and_log
591    def test_discoverable(self):
592        """Test that the adapter could be set discoverable."""
593        set_discoverable = self.bluetooth_facade.set_discoverable(True)
594        time.sleep(self.ADAPTER_ACTION_SLEEP_SECS)
595        is_discoverable = self.bluetooth_facade.is_discoverable()
596        self.results = {
597                'set_discoverable': set_discoverable,
598                'is_discoverable': is_discoverable}
599        return all(self.results.values())
600
601
602    @_test_retry_and_log
603    def test_nondiscoverable(self):
604        """Test that the adapter could be set non-discoverable."""
605        set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
606        time.sleep(self.ADAPTER_ACTION_SLEEP_SECS)
607        is_nondiscoverable = not self.bluetooth_facade.is_discoverable()
608        self.results = {
609                'set_nondiscoverable': set_nondiscoverable,
610                'is_nondiscoverable': is_nondiscoverable}
611        return all(self.results.values())
612
613
614    @_test_retry_and_log
615    def test_pairable(self):
616        """Test that the adapter could be set pairable."""
617        set_pairable = self.bluetooth_facade.set_pairable(True)
618        time.sleep(self.ADAPTER_ACTION_SLEEP_SECS)
619        is_pairable = self.bluetooth_facade.is_pairable()
620        self.results = {
621                'set_pairable': set_pairable,
622                'is_pairable': is_pairable}
623        return all(self.results.values())
624
625
626    @_test_retry_and_log
627    def test_nonpairable(self):
628        """Test that the adapter could be set non-pairable."""
629        set_nonpairable = self.bluetooth_facade.set_pairable(False)
630        time.sleep(self.ADAPTER_ACTION_SLEEP_SECS)
631        is_nonpairable = not self.bluetooth_facade.is_pairable()
632        self.results = {
633                'set_nonpairable': set_nonpairable,
634                'is_nonpairable': is_nonpairable}
635        return all(self.results.values())
636
637
638    # -------------------------------------------------------------------
639    # Tests about general discovering, pairing, and connection
640    # -------------------------------------------------------------------
641
642
643    @_test_retry_and_log
644    def test_discover_device(self, device_address):
645        """Test that the adapter could discover the specified device address.
646
647        @param device_address: Address of the device.
648
649        @returns: True if the device is found. False otherwise.
650
651        """
652        has_device_initially = False
653        start_discovery = False
654        device_discovered = False
655        has_device = self.bluetooth_facade.has_device
656
657        if has_device(device_address):
658            has_device_initially = True
659        elif self.bluetooth_facade.start_discovery():
660            start_discovery = True
661            try:
662                utils.poll_for_condition(
663                        condition=(lambda: has_device(device_address)),
664                        timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
665                        sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
666                        desc='Waiting for discovering %s' % device_address)
667                device_discovered = True
668            except utils.TimeoutError as e:
669                logging.error('test_discover_device: %s', e)
670            except Exception as e:
671                logging.error('test_discover_device: %s', e)
672                err = 'bluetoothd probably crashed. Check out /var/log/messages'
673                logging.error(err)
674            except:
675                logging.error('test_discover_device: unexpected error')
676
677        self.results = {
678                'has_device_initially': has_device_initially,
679                'start_discovery': start_discovery,
680                'device_discovered': device_discovered}
681        return has_device_initially or device_discovered
682
683
684    @_test_retry_and_log
685    def test_pairing(self, device_address, pin, trusted=True):
686        """Test that the adapter could pair with the device successfully.
687
688        @param device_address: Address of the device.
689        @param pin: pin code to pair with the device.
690        @param trusted: indicating whether to set the device trusted.
691
692        @returns: True if pairing succeeds. False otherwise.
693
694        """
695
696        def _pair_device():
697            """Pair to the device.
698
699            @returns: True if it could pair with the device. False otherwise.
700
701            """
702            return self.bluetooth_facade.pair_legacy_device(
703                    device_address, pin, trusted,
704                    self.ADAPTER_PAIRING_TIMEOUT_SECS)
705
706
707        has_device = False
708        paired = False
709        if self.bluetooth_facade.has_device(device_address):
710            has_device = True
711            try:
712                utils.poll_for_condition(
713                        condition=_pair_device,
714                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
715                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
716                        desc='Waiting for pairing %s' % device_address)
717                paired = True
718            except utils.TimeoutError as e:
719                logging.error('test_pairing: %s', e)
720            except:
721                logging.error('test_pairing: unexpected error')
722
723        self.results = {'has_device': has_device, 'paired': paired}
724        return all(self.results.values())
725
726
727    @_test_retry_and_log
728    def test_remove_pairing(self, device_address):
729        """Test that the adapter could remove the paired device.
730
731        @param device_address: Address of the device.
732
733        @returns: True if the device is removed successfully. False otherwise.
734
735        """
736        device_is_paired_initially = self.bluetooth_facade.device_is_paired(
737                device_address)
738        remove_pairing = False
739        pairing_removed = False
740
741        if device_is_paired_initially:
742            remove_pairing = self.bluetooth_facade.remove_device_object(
743                    device_address)
744            pairing_removed = not self.bluetooth_facade.device_is_paired(
745                    device_address)
746
747        self.results = {
748                'device_is_paired_initially': device_is_paired_initially,
749                'remove_pairing': remove_pairing,
750                'pairing_removed': pairing_removed}
751        return all(self.results.values())
752
753
754    def test_set_trusted(self, device_address, trusted=True):
755        """Test whether the device with the specified address is trusted.
756
757        @param device_address: Address of the device.
758        @param trusted : True or False indicating if trusted is expected.
759
760        @returns: True if the device's "Trusted" property is as specified;
761                  False otherwise.
762
763        """
764
765        set_trusted = self.bluetooth_facade.set_trusted(
766                device_address, trusted)
767
768        properties = self.bluetooth_facade.get_device_properties(
769                device_address)
770        actual_trusted = properties.get('Trusted')
771
772        self.results = {
773                'set_trusted': set_trusted,
774                'actual trusted': actual_trusted,
775                'expected trusted': trusted}
776        return actual_trusted == trusted
777
778
779    @_test_retry_and_log
780    def test_connection_by_adapter(self, device_address):
781        """Test that the adapter of dut could connect to the device successfully
782
783        It is the caller's responsibility to pair to the device before
784        doing connection.
785
786        @param device_address: Address of the device.
787
788        @returns: True if connection is performed. False otherwise.
789
790        """
791
792        def _connect_device():
793            """Connect to the device.
794
795            @returns: True if it could connect to the device. False otherwise.
796
797            """
798            return self.bluetooth_facade.connect_device(device_address)
799
800
801        has_device = False
802        connected = False
803        if self.bluetooth_facade.has_device(device_address):
804            has_device = True
805            try:
806                utils.poll_for_condition(
807                        condition=_connect_device,
808                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
809                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
810                        desc='Waiting for connecting to %s' % device_address)
811                connected = True
812            except utils.TimeoutError as e:
813                logging.error('test_connection_by_adapter: %s', e)
814            except:
815                logging.error('test_connection_by_adapter: unexpected error')
816
817        self.results = {'has_device': has_device, 'connected': connected}
818        return all(self.results.values())
819
820
821    @_test_retry_and_log
822    def test_disconnection_by_adapter(self, device_address):
823        """Test that the adapter of dut could disconnect the device successfully
824
825        @param device_address: Address of the device.
826
827        @returns: True if disconnection is performed. False otherwise.
828
829        """
830        return self.bluetooth_facade.disconnect_device(device_address)
831
832
833    def _enter_command_mode(self, device):
834        """Let the device enter command mode.
835
836        Before using the device, need to call this method to make sure
837        it is in the command mode.
838
839        @param device: the bluetooth HID device
840
841        @returns: True if successful. False otherwise.
842
843        """
844        result = _is_successful(_run_method(device.EnterCommandMode,
845                                            'EnterCommandMode'))
846        if not result:
847            logging.error('EnterCommandMode failed')
848        return result
849
850
851    @_test_retry_and_log
852    def test_connection_by_device(self, device):
853        """Test that the device could connect to the adapter successfully.
854
855        This emulates the behavior that a device may initiate a
856        connection request after waking up from power saving mode.
857
858        @param device: the bluetooth HID device
859
860        @returns: True if connection is performed correctly by device and
861                  the adapter also enters connection state.
862                  False otherwise.
863
864        """
865        if not self._enter_command_mode(device):
866            return False
867
868        method_name = 'test_connection_by_device'
869        connection_by_device = False
870        adapter_address = self.bluetooth_facade.address
871        try:
872            device.ConnectToRemoteAddress(adapter_address)
873            connection_by_device = True
874        except Exception as e:
875            logging.error('%s (device): %s', method_name, e)
876        except:
877            logging.error('%s (device): unexpected error', method_name)
878
879        connection_seen_by_adapter = False
880        device_address = device.address
881        device_is_connected = self.bluetooth_facade.device_is_connected
882        try:
883            utils.poll_for_condition(
884                    condition=lambda: device_is_connected(device_address),
885                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
886                    desc=('Waiting for connection from %s' % device_address))
887            connection_seen_by_adapter = True
888        except utils.TimeoutError as e:
889            logging.error('%s (adapter): %s', method_name, e)
890        except:
891            logging.error('%s (adapter): unexpected error', method_name)
892
893        self.results = {
894                'connection_by_device': connection_by_device,
895                'connection_seen_by_adapter': connection_seen_by_adapter}
896        return all(self.results.values())
897
898
899    @_test_retry_and_log
900    def test_disconnection_by_device(self, device):
901        """Test that the device could disconnect the adapter successfully.
902
903        This emulates the behavior that a device may initiate a
904        disconnection request before going into power saving mode.
905
906        Note: should not try to enter command mode in this method. When
907              a device is connected, there is no way to enter command mode.
908              One could just issue a special disconnect command without
909              entering command mode.
910
911        @param device: the bluetooth HID device
912
913        @returns: True if disconnection is performed correctly by device and
914                  the adapter also observes the disconnection.
915                  False otherwise.
916
917        """
918        method_name = 'test_disconnection_by_device'
919        disconnection_by_device = False
920        try:
921            device.Disconnect()
922            disconnection_by_device = True
923        except Exception as e:
924            logging.error('%s (device): %s', method_name, e)
925        except:
926            logging.error('%s (device): unexpected error', method_name)
927
928        disconnection_seen_by_adapter = False
929        device_address = device.address
930        device_is_connected = self.bluetooth_facade.device_is_connected
931        try:
932            utils.poll_for_condition(
933                    condition=lambda: not device_is_connected(device_address),
934                    timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
935                    desc=('Waiting for disconnection from %s' % device_address))
936            disconnection_seen_by_adapter = True
937        except utils.TimeoutError as e:
938            logging.error('%s (adapter): %s', method_name, e)
939        except:
940            logging.error('%s (adapter): unexpected error', method_name)
941
942        self.results = {
943                'disconnection_by_device': disconnection_by_device,
944                'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
945        return all(self.results.values())
946
947
948    def _get_device_name(self, device_address):
949        """Get the device name.
950
951        @returns: True if the device name is derived. None otherwise.
952
953        """
954        properties = self.bluetooth_facade.get_device_properties(
955                device_address)
956        self.discovered_device_name = properties.get('Name')
957        return bool(self.discovered_device_name)
958
959
960    @_test_retry_and_log
961    def test_device_name(self, device_address, expected_device_name):
962        """Test that the device name discovered by the adapter is correct.
963
964        @param device_address: Address of the device.
965        @param expected_device_name: the bluetooth device name
966
967        @returns: True if the discovered_device_name is expected_device_name.
968                  False otherwise.
969
970        """
971        try:
972            utils.poll_for_condition(
973                    condition=lambda: self._get_device_name(device_address),
974                    timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
975                    sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
976                    desc='Waiting for device name of %s' % device_address)
977        except utils.TimeoutError as e:
978            logging.error('test_device_name: %s', e)
979        except:
980            logging.error('test_device_name: unexpected error')
981
982        self.results = {
983                'expected_device_name': expected_device_name,
984                'discovered_device_name': self.discovered_device_name}
985        return self.discovered_device_name == expected_device_name
986
987
988    @_test_retry_and_log
989    def test_device_class_of_service(self, device_address,
990                                     expected_class_of_service):
991        """Test that the discovered device class of service is as expected.
992
993        @param device_address: Address of the device.
994        @param expected_class_of_service: the expected class of service
995
996        @returns: True if the discovered class of service matches the
997                  expected class of service. False otherwise.
998
999        """
1000        properties = self.bluetooth_facade.get_device_properties(
1001                device_address)
1002        device_class = properties.get('Class')
1003        discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
1004                                       if device_class else None)
1005
1006        self.results = {
1007                'device_class': device_class,
1008                'expected_class_of_service': expected_class_of_service,
1009                'discovered_class_of_service': discovered_class_of_service}
1010        return discovered_class_of_service == expected_class_of_service
1011
1012
1013    @_test_retry_and_log
1014    def test_device_class_of_device(self, device_address,
1015                                    expected_class_of_device):
1016        """Test that the discovered device class of device is as expected.
1017
1018        @param device_address: Address of the device.
1019        @param expected_class_of_device: the expected class of device
1020
1021        @returns: True if the discovered class of device matches the
1022                  expected class of device. False otherwise.
1023
1024        """
1025        properties = self.bluetooth_facade.get_device_properties(
1026                device_address)
1027        device_class = properties.get('Class')
1028        discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
1029                                      if device_class else None)
1030
1031        self.results = {
1032                'device_class': device_class,
1033                'expected_class_of_device': expected_class_of_device,
1034                'discovered_class_of_device': discovered_class_of_device}
1035        return discovered_class_of_device == expected_class_of_device
1036
1037
1038    def _get_btmon_log(self, method, logging_timespan=1):
1039        """Capture the btmon log when executing the specified method.
1040
1041        @param method: the method to capture log.
1042                       The method would be executed only when it is not None.
1043                       This allows us to simply capture btmon log without
1044                       executing any command.
1045        @param logging_timespan: capture btmon log for logging_timespan seconds.
1046
1047        """
1048        self.bluetooth_le_facade.btmon_start()
1049        self.advertising_msg = method() if method else ''
1050        time.sleep(logging_timespan)
1051        self.bluetooth_le_facade.btmon_stop()
1052
1053
1054    def convert_to_adv_jiffies(self, adv_interval_ms):
1055        """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
1056
1057        @param adv_interval_ms: an advertising interval
1058
1059        @returns: the equivalent jiffies
1060
1061        """
1062        return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT
1063
1064
1065    def compute_duration(self, max_adv_interval_ms):
1066        """Compute duration from max_adv_interval_ms.
1067
1068        Advertising duration is calculated approximately as
1069            duration = max_adv_interval_ms / 1000.0 * 1.1
1070
1071        @param max_adv_interval_ms: max advertising interval in milliseconds.
1072
1073        @returns: duration in seconds.
1074
1075        """
1076        return max_adv_interval_ms / 1000.0 * 1.1
1077
1078
1079    def compute_logging_timespan(self, max_adv_interval_ms):
1080        """Compute the logging timespan from max_adv_interval_ms.
1081
1082        The logging timespan is the time needed to record btmon log.
1083
1084        @param max_adv_interval_ms: max advertising interval in milliseconds.
1085
1086        @returns: logging_timespan in seconds.
1087
1088        """
1089        duration = self.compute_duration(max_adv_interval_ms)
1090        logging_timespan = max(self.count_advertisements * duration, 1)
1091        return logging_timespan
1092
1093
1094    @_test_retry_and_log(False)
1095    def test_check_duration_and_intervals(self, min_adv_interval_ms,
1096                                          max_adv_interval_ms,
1097                                          number_advertisements):
1098        """Verify that every advertisements are scheduled according to the
1099        duration and intervals.
1100
1101        An advertisement would be scheduled at the time span of
1102             duration * number_advertisements
1103
1104        @param min_adv_interval_ms: min advertising interval in milliseconds.
1105        @param max_adv_interval_ms: max advertising interval in milliseconds.
1106        @param number_advertisements: the number of existing advertisements
1107
1108        @returns: True if all advertisements are scheduled based on the
1109                duration and intervals.
1110
1111        """
1112
1113
1114        def within_tolerance(a, b, ratio=0.1):
1115            return abs(a - b) / abs(a) <= ratio
1116
1117
1118        start_str = 'Set Advertising Intervals:'
1119        search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
1120        search_str = '|'.join(search_strings)
1121
1122        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
1123                                                      start_str=start_str)
1124
1125        # Company string looks like
1126        #   Company: not assigned (65283)
1127        company_pattern = re.compile('Company:.*\((\d*)\)')
1128
1129        # The string with timestamp looks like
1130        #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
1131        set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
1132        set_adv_time_pattern = re.compile(set_adv_time_str)
1133
1134        adv_timestamps = {}
1135        timestamp = None
1136        manufacturer_id = None
1137        for line in contents:
1138            result = set_adv_time_pattern.search(line)
1139            if result:
1140                timestamp = float(result.group(1))
1141
1142            result = company_pattern.search(line)
1143            if result:
1144                manufacturer_id = '0x%04x' % int(result.group(1))
1145
1146            if timestamp and manufacturer_id:
1147                if manufacturer_id not in adv_timestamps:
1148                    adv_timestamps[manufacturer_id] = []
1149                adv_timestamps[manufacturer_id].append(timestamp)
1150                timestamp = None
1151                manufacturer_id = None
1152
1153        duration = self.compute_duration(max_adv_interval_ms)
1154        expected_timespan = duration * number_advertisements
1155
1156        check_duration = True
1157        for manufacturer_id, values in adv_timestamps.iteritems():
1158            logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
1159            timespans = [values[i] - values[i - 1]
1160                         for i in xrange(1, len(values))]
1161            errors = [timespans[i] for i in xrange(len(timespans))
1162                      if not within_tolerance(expected_timespan, timespans[i])]
1163            logging.debug('timespans: %s', timespans)
1164            logging.debug('errors: %s', errors)
1165            if bool(errors):
1166                check_duration = False
1167
1168        # Verify that the advertising intervals are also correct.
1169        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1170                self._verify_advertising_intervals(min_adv_interval_ms,
1171                                                   max_adv_interval_ms))
1172
1173        self.results = {
1174                'check_duration': check_duration,
1175                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1176                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1177        }
1178        return all(self.results.values())
1179
1180
1181    def _get_min_max_intervals_strings(self, min_adv_interval_ms,
1182                                       max_adv_interval_ms):
1183        """Get the min and max advertising intervals strings shown in btmon.
1184
1185        Advertising intervals shown in the btmon log look like
1186            Min advertising interval: 1280.000 msec (0x0800)
1187            Max advertising interval: 1280.000 msec (0x0800)
1188
1189        @param min_adv_interval_ms: min advertising interval in milliseconds.
1190        @param max_adv_interval_ms: max advertising interval in milliseconds.
1191
1192        @returns: the min and max intervals strings.
1193
1194        """
1195        min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
1196                   (min_adv_interval_ms,
1197                    min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
1198        logging.debug('min_adv_interval_ms: %s', min_str)
1199
1200        max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
1201                   (max_adv_interval_ms,
1202                    max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
1203        logging.debug('max_adv_interval_ms: %s', max_str)
1204
1205        return (min_str, max_str)
1206
1207
1208    def _verify_advertising_intervals(self, min_adv_interval_ms,
1209                                      max_adv_interval_ms):
1210        """Verify min and max advertising intervals.
1211
1212        Advertising intervals look like
1213            Min advertising interval: 1280.000 msec (0x0800)
1214            Max advertising interval: 1280.000 msec (0x0800)
1215
1216        @param min_adv_interval_ms: min advertising interval in milliseconds.
1217        @param max_adv_interval_ms: max advertising interval in milliseconds.
1218
1219        @returns: a tuple of (True, True) if both min and max advertising
1220            intervals could be found. Otherwise, the corresponding element
1221            in the tuple if False.
1222
1223        """
1224        min_str, max_str = self._get_min_max_intervals_strings(
1225                min_adv_interval_ms, max_adv_interval_ms)
1226
1227        min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
1228        max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
1229
1230        return min_adv_interval_ms_found, max_adv_interval_ms_found
1231
1232
1233    @_test_retry_and_log(False)
1234    def test_register_advertisement(self, advertisement_data, instance_id,
1235                                    min_adv_interval_ms, max_adv_interval_ms):
1236        """Verify that an advertisement is registered correctly.
1237
1238        This test verifies the following data:
1239        - advertisement added
1240        - manufacturer data
1241        - service UUIDs
1242        - service data
1243        - advertising intervals
1244        - advertising enabled
1245
1246        @param advertisement_data: the data of an advertisement to register.
1247        @param instance_id: the instance id which starts at 1.
1248        @param min_adv_interval_ms: min_adv_interval in milliseconds.
1249        @param max_adv_interval_ms: max_adv_interval in milliseconds.
1250
1251        @returns: True if the advertisement is registered correctly.
1252                  False otherwise.
1253
1254        """
1255        # When registering a new advertisement, it is possible that another
1256        # instance is advertising. It may need to wait for all other
1257        # advertisements to complete advertising once.
1258        self.count_advertisements += 1
1259        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
1260        self._get_btmon_log(
1261                lambda: self.bluetooth_le_facade.register_advertisement(
1262                        advertisement_data),
1263                logging_timespan=logging_timespan)
1264
1265        # Verify that a new advertisement is added.
1266        advertisement_added = self.bluetooth_le_facade.btmon_find(
1267                'Advertising Added: %d' % instance_id)
1268
1269        # Verify that the manufacturer data could be found.
1270        manufacturer_data = advertisement_data.get('ManufacturerData', '')
1271        for manufacturer_id in manufacturer_data:
1272            # The 'not assigned' text below means the manufacturer id
1273            # is not actually assigned to any real manufacturer.
1274            # For real 16-bit manufacturer UUIDs, refer to
1275            #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
1276            manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
1277                    'Company: not assigned (%d)' % int(manufacturer_id, 16))
1278
1279        # Verify that all service UUIDs could be found.
1280        service_uuids_found = True
1281        for uuid in advertisement_data.get('ServiceUUIDs', []):
1282            # Service UUIDs looks like ['0x180D', '0x180F']
1283            #   Heart Rate (0x180D)
1284            #   Battery Service (0x180F)
1285            # For actual 16-bit service UUIDs, refer to
1286            #   https://www.bluetooth.com/specifications/gatt/services
1287            if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
1288                service_uuids_found = False
1289                break
1290
1291        # Verify service data.
1292        service_data_found = True
1293        for uuid, data in advertisement_data.get('ServiceData', {}).items():
1294            # A service data looks like
1295            #   Service Data (UUID 0x9999): 0001020304
1296            # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
1297            data_str = ''.join(map(lambda n: '%02x' % n, data))
1298            if not self.bluetooth_le_facade.btmon_find(
1299                    'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
1300                service_data_found = False
1301                break
1302
1303        # Verify that the advertising intervals are correct.
1304        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1305                self._verify_advertising_intervals(min_adv_interval_ms,
1306                                                   max_adv_interval_ms))
1307
1308        # Verify advertising is enabled.
1309        advertising_enabled = self.bluetooth_le_facade.btmon_find(
1310                'Advertising: Enabled (0x01)')
1311
1312        self.results = {
1313                'advertisement_added': advertisement_added,
1314                'manufacturer_data_found': manufacturer_data_found,
1315                'service_uuids_found': service_uuids_found,
1316                'service_data_found': service_data_found,
1317                'min_adv_interval_ms_found': min_adv_interval_ms_found,
1318                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1319                'advertising_enabled': advertising_enabled,
1320        }
1321        return all(self.results.values())
1322
1323
1324    @_test_retry_and_log(False)
1325    def test_fail_to_register_advertisement(self, advertisement_data,
1326                                            min_adv_interval_ms,
1327                                            max_adv_interval_ms):
1328        """Verify that failure is incurred when max advertisements are reached.
1329
1330        This test verifies that a registration failure is incurred when
1331        max advertisements are reached. The error message looks like:
1332
1333            org.bluez.Error.Failed: Maximum advertisements reached
1334
1335        @param advertisement_data: the advertisement to register.
1336        @param min_adv_interval_ms: min_adv_interval in milliseconds.
1337        @param max_adv_interval_ms: max_adv_interval in milliseconds.
1338
1339        @returns: True if the error message is received correctly.
1340                  False otherwise.
1341
1342        """
1343        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
1344        self._get_btmon_log(
1345                lambda: self.bluetooth_le_facade.register_advertisement(
1346                        advertisement_data),
1347                logging_timespan=logging_timespan)
1348
1349        # Verify that max advertisements are reached.
1350        max_advertisements_error = (
1351                self.ERROR_MAX_ADVERTISEMENTS in self.advertising_msg)
1352
1353        # Verify that no new advertisement is added.
1354        advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
1355                'Advertising Added:')
1356
1357        # Verify that the advertising intervals are correct.
1358        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1359                self._verify_advertising_intervals(min_adv_interval_ms,
1360                                                   max_adv_interval_ms))
1361
1362        # Verify advertising remains enabled.
1363        advertising_enabled = self.bluetooth_le_facade.btmon_find(
1364                'Advertising: Enabled (0x01)')
1365
1366        self.results = {
1367                'max_advertisements_error': max_advertisements_error,
1368                'advertisement_not_added': advertisement_not_added,
1369                'min_adv_interval_ms_found': min_adv_interval_ms_found,
1370                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1371                'advertising_enabled': advertising_enabled,
1372        }
1373        return all(self.results.values())
1374
1375
1376    @_test_retry_and_log(False)
1377    def test_unregister_advertisement(self, advertisement_data, instance_id,
1378                                      min_adv_interval_ms, max_adv_interval_ms,
1379                                      advertising_disabled):
1380        """Verify that an advertisement is unregistered correctly.
1381
1382        This test verifies the following data:
1383        - advertisement removed
1384        - advertising status: enabled if there are advertisements left;
1385                              disabled otherwise.
1386
1387        @param advertisement_data: the data of an advertisement to unregister.
1388        @param instance_id: the instance id of the advertisement to remove.
1389        @param min_adv_interval_ms: min_adv_interval in milliseconds.
1390        @param max_adv_interval_ms: max_adv_interval in milliseconds.
1391        @param advertising_disabled: is advertising disabled? This happens
1392                only when all advertisements are removed.
1393
1394        @returns: True if the advertisement is unregistered correctly.
1395                  False otherwise.
1396
1397        """
1398        self.count_advertisements -= 1
1399        self._get_btmon_log(
1400                lambda: self.bluetooth_le_facade.unregister_advertisement(
1401                        advertisement_data))
1402
1403        # Verify that the advertisement is removed.
1404        advertisement_removed = self.bluetooth_le_facade.btmon_find(
1405                'Advertising Removed: %d' % instance_id)
1406
1407        # If advertising_disabled is True, there should be no log like
1408        #       'Advertising: Enabled (0x01)'
1409        # If advertising_disabled is False, there should be log like
1410        #       'Advertising: Enabled (0x01)'
1411
1412        # Only need to check advertising status when the last advertisement
1413        # is removed. For any other advertisements prior to the last one,
1414        # we may or may not observe 'Advertising: Enabled (0x01)' message.
1415        # Hence, the test would become flaky if we insist to see that message.
1416        # A possible workaround is to sleep for a while and then check the
1417        # message. The drawback is that we may need to wait up to 10 seconds
1418        # if the advertising duration and intervals are long.
1419        # In a test case, we always run test_check_duration_and_intervals()
1420        # to check if advertising duration and intervals are correct after
1421        # un-registering one or all advertisements, it is safe to do so.
1422        advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
1423                'Advertising: Enabled (0x01)')
1424        advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
1425                'Advertising: Disabled (0x00)')
1426        advertising_status_correct = not advertising_disabled or (
1427                advertising_disabled_found and not advertising_enabled_found)
1428
1429        self.results = {
1430                'advertisement_removed': advertisement_removed,
1431                'advertising_status_correct': advertising_status_correct,
1432        }
1433        return all(self.results.values())
1434
1435
1436    @_test_retry_and_log(False)
1437    def test_set_advertising_intervals(self, min_adv_interval_ms,
1438                                       max_adv_interval_ms):
1439        """Verify that new advertising intervals are set correctly.
1440
1441        Note that setting advertising intervals does not enable/disable
1442        advertising. Hence, there is no need to check the advertising
1443        status.
1444
1445        @param min_adv_interval_ms: the min advertising interval in ms.
1446        @param max_adv_interval_ms: the max advertising interval in ms.
1447
1448        @returns: True if the new advertising intervals are correct.
1449                  False otherwise.
1450
1451        """
1452        self._get_btmon_log(
1453                lambda: self.bluetooth_le_facade.set_advertising_intervals(
1454                        min_adv_interval_ms, max_adv_interval_ms))
1455
1456        # Verify the new advertising intervals.
1457        # With intervals of 200 ms and 200 ms, the log looks like
1458        #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
1459        txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
1460        adv_intervals_found = self.bluetooth_le_facade.btmon_find(
1461                txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
1462                       self.convert_to_adv_jiffies(max_adv_interval_ms)))
1463
1464        self.results = {'adv_intervals_found': adv_intervals_found}
1465        return all(self.results.values())
1466
1467
1468    @_test_retry_and_log(False)
1469    def test_fail_to_set_advertising_intervals(
1470            self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
1471            orig_min_adv_interval_ms, orig_max_adv_interval_ms):
1472        """Verify that setting invalid advertising intervals results in error.
1473
1474        If invalid min/max advertising intervals are given, it would incur
1475        the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
1476        Note that valid advertising intervals fall between 20 ms and 10,240 ms.
1477
1478        @param invalid_min_adv_interval_ms: the invalid min advertising interval
1479                in ms.
1480        @param invalid_max_adv_interval_ms: the invalid max advertising interval
1481                in ms.
1482        @param orig_min_adv_interval_ms: the original min advertising interval
1483                in ms.
1484        @param orig_max_adv_interval_ms: the original max advertising interval
1485                in ms.
1486
1487        @returns: True if it fails to set invalid advertising intervals.
1488                  False otherwise.
1489
1490        """
1491        self._get_btmon_log(
1492                lambda: self.bluetooth_le_facade.set_advertising_intervals(
1493                        invalid_min_adv_interval_ms,
1494                        invalid_max_adv_interval_ms))
1495
1496        # Verify that the invalid error is observed in the dbus error callback
1497        # message.
1498        invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
1499                                   self.advertising_msg)
1500
1501        # Verify that the min/max advertising intervals remain the same
1502        # after setting the invalid advertising intervals.
1503        #
1504        # In btmon log, we would see the following message first.
1505        #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
1506        # And then, we should check if "Min advertising interval" and
1507        # "Max advertising interval" remain the same.
1508        start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
1509                self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
1510                self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
1511
1512        search_strings = ['Min advertising interval:',
1513                          'Max advertising interval:']
1514        search_str = '|'.join(search_strings)
1515
1516        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
1517                                                      start_str=start_str)
1518
1519        # The min/max advertising intervals of all advertisements should remain
1520        # the same as the previous valid ones.
1521        min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
1522        min_max_pattern = re.compile(min_max_str)
1523        correct_orig_min_adv_interval = True
1524        correct_orig_max_adv_interval = True
1525        for line in contents:
1526            result = min_max_pattern.search(line)
1527            if result:
1528                interval = float(result.group(1))
1529                if 'Min' in line and interval != orig_min_adv_interval_ms:
1530                    correct_orig_min_adv_interval = False
1531                elif 'Max' in line and interval != orig_max_adv_interval_ms:
1532                    correct_orig_max_adv_interval = False
1533
1534        self.results = {
1535                'invalid_intervals_error': invalid_intervals_error,
1536                'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
1537                'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
1538
1539        return all(self.results.values())
1540
1541
1542    @_test_retry_and_log(False)
1543    def test_check_advertising_intervals(self, min_adv_interval_ms,
1544                                         max_adv_interval_ms):
1545        """Verify that the advertising intervals are as expected.
1546
1547        @param min_adv_interval_ms: the min advertising interval in ms.
1548        @param max_adv_interval_ms: the max advertising interval in ms.
1549
1550        @returns: True if the advertising intervals are correct.
1551                  False otherwise.
1552
1553        """
1554        self._get_btmon_log(None)
1555
1556        # Verify that the advertising intervals are correct.
1557        min_adv_interval_ms_found, max_adv_interval_ms_found = (
1558                self._verify_advertising_intervals(min_adv_interval_ms,
1559                                                   max_adv_interval_ms))
1560
1561        self.results = {
1562                'min_adv_interval_ms_found': min_adv_interval_ms_found,
1563                'max_adv_interval_ms_found': max_adv_interval_ms_found,
1564        }
1565        return all(self.results.values())
1566
1567
1568    @_test_retry_and_log(False)
1569    def test_reset_advertising(self, instance_ids=[]):
1570        """Verify that advertising is reset correctly.
1571
1572        Note that reset advertising would set advertising intervals to
1573        the default values. However, we would not be able to observe
1574        the values change until new advertisements are registered.
1575        Therefore, it is required that a test_register_advertisement()
1576        test is conducted after this test.
1577
1578        If instance_ids is [], all advertisements would still be removed
1579        if there are any. However, no need to check 'Advertising Removed'
1580        in btmon log since we may or may not be able to observe the message.
1581        This feature is needed if this test is invoked as the first one in
1582        a test case to reset advertising. In this situation, this test does
1583        not know how many advertisements exist.
1584
1585        @param instance_ids: the list of instance IDs that should be removed.
1586
1587        @returns: True if advertising is reset correctly.
1588                  False otherwise.
1589
1590        """
1591        self.count_advertisements = 0
1592        self._get_btmon_log(
1593                lambda: self.bluetooth_le_facade.reset_advertising())
1594
1595        # Verify that every advertisement is removed. When an advertisement
1596        # with instance id 1 is removed, the log looks like
1597        #   @ Advertising Removed: 1
1598        txt = 'Advertising Removed: %d'
1599        for instance_id in instance_ids:
1600            if not self.bluetooth_le_facade.btmon_find(txt % instance_id):
1601                advertisement_removed = False
1602                logging.error('Failed to remove advertisement instance: %d',
1603                              instance_id)
1604                break
1605        else:
1606            advertisement_removed = True
1607
1608        # Verify that "Reset Advertising Intervals" command has been issued.
1609        reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
1610                'bluetoothd: Reset Advertising Intervals')
1611
1612        # Verify the advertising is disabled.
1613        advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
1614                'Advertising: Disabled')
1615        # If there are no existing advertisements, we may not observe
1616        # 'Advertising: Disabled'.
1617        advertising_disabled = (instance_ids == [] or
1618                                advertising_disabled_observied)
1619
1620        self.results = {
1621                'advertisement_removed': advertisement_removed,
1622                'reset_advertising_intervals': reset_advertising_intervals,
1623                'advertising_disabled': advertising_disabled,
1624        }
1625        return all(self.results.values())
1626
1627
1628    # -------------------------------------------------------------------
1629    # Bluetooth mouse related tests
1630    # -------------------------------------------------------------------
1631
1632
1633    def _record_input_events(self, device, gesture):
1634        """Record the input events.
1635
1636        @param device: the bluetooth HID device.
1637        @param gesture: the gesture method to perform.
1638
1639        @returns: the input events received on the DUT.
1640
1641        """
1642        self.input_facade.initialize_input_recorder(device.name)
1643        self.input_facade.start_input_recorder()
1644        time.sleep(self.HID_REPORT_SLEEP_SECS)
1645        gesture()
1646        time.sleep(self.HID_REPORT_SLEEP_SECS)
1647        self.input_facade.stop_input_recorder()
1648        time.sleep(self.HID_REPORT_SLEEP_SECS)
1649        event_values = self.input_facade.get_input_events()
1650        events = [Event(*ev) for ev in event_values]
1651        return events
1652
1653
1654    def _test_mouse_click(self, device, button):
1655        """Test that the mouse click events could be received correctly.
1656
1657        @param device: the meta device containing a bluetooth HID device
1658        @param button: which button to test, 'LEFT' or 'RIGHT'
1659
1660        @returns: True if the report received by the host matches the
1661                  expected one. False otherwise.
1662
1663        """
1664        if button == 'LEFT':
1665            gesture = device.LeftClick
1666        elif button == 'RIGHT':
1667            gesture = device.RightClick
1668        else:
1669            raise error.TestError('Button (%s) is not valid.' % button)
1670
1671        actual_events = self._record_input_events(device, gesture)
1672
1673        linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
1674        expected_events = [
1675                # Button down
1676                recorder.MSC_SCAN_BTN_EVENT[button],
1677                Event(EV_KEY, linux_input_button[button], 1),
1678                recorder.SYN_EVENT,
1679                # Button up
1680                recorder.MSC_SCAN_BTN_EVENT[button],
1681                Event(EV_KEY, linux_input_button[button], 0),
1682                recorder.SYN_EVENT]
1683
1684        self.results = {
1685                'actual_events': map(str, actual_events),
1686                'expected_events': map(str, expected_events)}
1687        return actual_events == expected_events
1688
1689
1690    @_test_retry_and_log
1691    def test_mouse_left_click(self, device):
1692        """Test that the mouse left click events could be received correctly.
1693
1694        @param device: the meta device containing a bluetooth HID device
1695
1696        @returns: True if the report received by the host matches the
1697                  expected one. False otherwise.
1698
1699        """
1700        return self._test_mouse_click(device, 'LEFT')
1701
1702
1703    @_test_retry_and_log
1704    def test_mouse_right_click(self, device):
1705        """Test that the mouse right click events could be received correctly.
1706
1707        @param device: the meta device containing a bluetooth HID device
1708
1709        @returns: True if the report received by the host matches the
1710                  expected one. False otherwise.
1711
1712        """
1713        return self._test_mouse_click(device, 'RIGHT')
1714
1715
1716    def _test_mouse_move(self, device, delta_x=0, delta_y=0):
1717        """Test that the mouse move events could be received correctly.
1718
1719        @param device: the meta device containing a bluetooth HID device
1720        @param delta_x: the distance to move cursor in x axis
1721        @param delta_y: the distance to move cursor in y axis
1722
1723        @returns: True if the report received by the host matches the
1724                  expected one. False otherwise.
1725
1726        """
1727        gesture = lambda: device.Move(delta_x, delta_y)
1728        actual_events = self._record_input_events(device, gesture)
1729
1730        events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
1731        events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
1732        expected_events = events_x + events_y + [recorder.SYN_EVENT]
1733
1734        self.results = {
1735                'actual_events': map(str, actual_events),
1736                'expected_events': map(str, expected_events)}
1737        return actual_events == expected_events
1738
1739
1740    @_test_retry_and_log
1741    def test_mouse_move_in_x(self, device, delta_x):
1742        """Test that the mouse move events in x could be received correctly.
1743
1744        @param device: the meta device containing a bluetooth HID device
1745        @param delta_x: the distance to move cursor in x axis
1746
1747        @returns: True if the report received by the host matches the
1748                  expected one. False otherwise.
1749
1750        """
1751        return self._test_mouse_move(device, delta_x=delta_x)
1752
1753
1754    @_test_retry_and_log
1755    def test_mouse_move_in_y(self, device, delta_y):
1756        """Test that the mouse move events in y could be received correctly.
1757
1758        @param device: the meta device containing a bluetooth HID device
1759        @param delta_y: the distance to move cursor in y axis
1760
1761        @returns: True if the report received by the host matches the
1762                  expected one. False otherwise.
1763
1764        """
1765        return self._test_mouse_move(device, delta_y=delta_y)
1766
1767
1768    @_test_retry_and_log
1769    def test_mouse_move_in_xy(self, device, delta_x, delta_y):
1770        """Test that the mouse move events could be received correctly.
1771
1772        @param device: the meta device containing a bluetooth HID device
1773        @param delta_x: the distance to move cursor in x axis
1774        @param delta_y: the distance to move cursor in y axis
1775
1776        @returns: True if the report received by the host matches the
1777                  expected one. False otherwise.
1778
1779        """
1780        return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
1781
1782
1783    def _test_mouse_scroll(self, device, units):
1784        """Test that the mouse wheel events could be received correctly.
1785
1786        @param device: the meta device containing a bluetooth HID device
1787        @param units: the units to scroll in y axis
1788
1789        @returns: True if the report received by the host matches the
1790                  expected one. False otherwise.
1791
1792        """
1793        gesture = lambda: device.Scroll(units)
1794        actual_events = self._record_input_events(device, gesture)
1795        expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
1796        self.results = {
1797                'actual_events': map(str, actual_events),
1798                'expected_events': map(str, expected_events)}
1799        return actual_events == expected_events
1800
1801
1802    @_test_retry_and_log
1803    def test_mouse_scroll_down(self, device, delta_y):
1804        """Test that the mouse wheel events could be received correctly.
1805
1806        @param device: the meta device containing a bluetooth HID device
1807        @param delta_y: the units to scroll down in y axis;
1808                        should be a postive value
1809
1810        @returns: True if the report received by the host matches the
1811                  expected one. False otherwise.
1812
1813        """
1814        if delta_y > 0:
1815            return self._test_mouse_scroll(device, delta_y)
1816        else:
1817            raise error.TestError('delta_y (%d) should be a positive value',
1818                                  delta_y)
1819
1820
1821    @_test_retry_and_log
1822    def test_mouse_scroll_up(self, device, delta_y):
1823        """Test that the mouse wheel events could be received correctly.
1824
1825        @param device: the meta device containing a bluetooth HID device
1826        @param delta_y: the units to scroll up in y axis;
1827                        should be a postive value
1828
1829        @returns: True if the report received by the host matches the
1830                  expected one. False otherwise.
1831
1832        """
1833        if delta_y > 0:
1834            return self._test_mouse_scroll(device, -delta_y)
1835        else:
1836            raise error.TestError('delta_y (%d) should be a positive value',
1837                                  delta_y)
1838
1839
1840    @_test_retry_and_log
1841    def test_mouse_click_and_drag(self, device, delta_x, delta_y):
1842        """Test that the mouse click-and-drag events could be received
1843        correctly.
1844
1845        @param device: the meta device containing a bluetooth HID device
1846        @param delta_x: the distance to drag in x axis
1847        @param delta_y: the distance to drag in y axis
1848
1849        @returns: True if the report received by the host matches the
1850                  expected one. False otherwise.
1851
1852        """
1853        gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
1854        actual_events = self._record_input_events(device, gesture)
1855
1856        button = 'LEFT'
1857        expected_events = (
1858                [# Button down
1859                 recorder.MSC_SCAN_BTN_EVENT[button],
1860                 Event(EV_KEY, BTN_LEFT, 1),
1861                 recorder.SYN_EVENT] +
1862                # cursor movement in x and y
1863                ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
1864                ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
1865                [recorder.SYN_EVENT] +
1866                # Button up
1867                [recorder.MSC_SCAN_BTN_EVENT[button],
1868                 Event(EV_KEY, BTN_LEFT, 0),
1869                 recorder.SYN_EVENT])
1870
1871        self.results = {
1872                'actual_events': map(str, actual_events),
1873                'expected_events': map(str, expected_events)}
1874        return actual_events == expected_events
1875
1876
1877    # -------------------------------------------------------------------
1878    # Autotest methods
1879    # -------------------------------------------------------------------
1880
1881
1882    def initialize(self):
1883        """Initialize bluetooth adapter tests."""
1884        # Run through every tests and collect failed tests in self.fails.
1885        self.fails = []
1886
1887        # If a test depends on multiple conditions, write the results of
1888        # the conditions in self.results so that it is easy to know
1889        # what conditions failed by looking at the log.
1890        self.results = None
1891
1892        # Some tests may instantiate a peripheral device for testing.
1893        self.devices = dict()
1894        for device_type in SUPPORTED_DEVICE_TYPES:
1895            self.devices[device_type] = None
1896
1897        # The count of registered advertisements.
1898        self.count_advertisements = 0
1899
1900    def run_once(self, *args, **kwargs):
1901        """This method should be implemented by children classes.
1902
1903        Typically, the run_once() method would look like:
1904
1905        factory = remote_facade_factory.RemoteFacadeFactory(host)
1906        self.bluetooth_facade = factory.create_bluetooth_hid_facade()
1907
1908        self.test_bluetoothd_running()
1909        # ...
1910        # invoke more self.test_xxx() tests.
1911        # ...
1912
1913        if self.fails:
1914            raise error.TestFail(self.fails)
1915
1916        """
1917        raise NotImplementedError
1918
1919
1920    def cleanup(self):
1921        """Clean up bluetooth adapter tests."""
1922        # Close the device properly if a device is instantiated.
1923        # Note: do not write something like the following statements
1924        #           if self.devices[device_type]:
1925        #       or
1926        #           if bool(self.devices[device_type]):
1927        #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
1928        #       which just does not exist.
1929        for device_type in SUPPORTED_DEVICE_TYPES:
1930            if self.devices[device_type] is not None:
1931                self.devices[device_type].Close()
1932