1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Server side bluetooth adapter subtests."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12from datetime import datetime, timedelta
13import errno
14import functools
15import six.moves.http_client
16import inspect
17import logging
18import multiprocessing
19import os
20import re
21import socket
22import threading
23import time
24
25import common
26from autotest_lib.client.bin import utils
27from autotest_lib.client.bin.input import input_event_recorder as recorder
28from autotest_lib.client.common_lib import error
29from autotest_lib.client.common_lib.cros.bluetooth import bluetooth_socket
30from autotest_lib.client.cros.chameleon import chameleon
31from autotest_lib.server.cros.bluetooth import bluetooth_peer_update
32from autotest_lib.server.cros.bluetooth import bluetooth_test_utils
33from autotest_lib.server import test
34
35from autotest_lib.client.bin.input.linux_input import (
36        BTN_LEFT, BTN_RIGHT, EV_KEY, EV_REL, REL_X, REL_Y, REL_WHEEL,
37        REL_WHEEL_HI_RES, KEY_PLAYCD, KEY_PAUSECD, KEY_STOPCD, KEY_NEXTSONG,
38        KEY_PREVIOUSSONG)
39from autotest_lib.server.cros.bluetooth.bluetooth_gatt_client_utils import (
40        GATT_ClientFacade, GATT_Application, GATT_HIDApplication)
41from autotest_lib.server.cros.multimedia import remote_facade_factory
42import six
43from six.moves import map
44from six.moves import range
45from six.moves import zip
46
47
# Module-level shorthand for the Event name exposed by input_event_recorder
# (imported above as `recorder`).
Event = recorder.Event
49
# Maps each known chipset name to the list of (VID, PID) string pairs that
# identify it. A chipset may be listed with more than one pair.
CHIPSET_TO_VIDPID = {
        'BRCM-4354': [('0x002d', '0x4354')],
        'MVL-8897': [('0x02df', '0x912d')],
        'MVL-8997': [('0x1b4b', '0x2b42')],
        'QCA-9462': [('0x168c', '0x0034')],
        'QCA-6174A-5': [('0x168c', '0x003e')],
        # UART
        'QCA-6174A-3': [('0x271', '0x050a')],
        # CcP2
        'Intel-AX200': [('0x8086', '0x2723')],
        # HrP2
        'Intel-AX201': [('0x8086', '0x02f0')],
        # ThP2
        'Intel-AC9260': [('0x8086', '0x2526')],
        # JfP2
        'Intel-AC9560': [
                ('0x8086', '0x31dc'),
                ('0x8086', '0x9df0'),
        ],
        # WP2
        'Intel-AC7260': [
                ('0x8086', '0x08b1'),
                ('0x8086', '0x08b2'),
        ],
        # StP2
        'Intel-AC7265': [
                ('0x8086', '0x095a'),
                ('0x8086', '0x095b'),
        ],
        'Realtek-RTL8822C-USB': [('0x10ec', '0xc822')],
}
66
# Chipsets that are no longer supported. Known firmware-related issues are
# ignored on devices using these chipsets (b/169328792).
UNSUPPORTED_CHIPSETS = [
        'BRCM-4354',
        'MVL-8897',
        'MVL-8997',
        'Intel-AC7260',
        'Intel-AC7265',
]
72
# Directory containing this file (bluetooth_adapter_tests.py); the data traces
# are located relative to it.
BT_ADAPTER_TEST_PATH = os.path.dirname(__file__)

# Keyboard input traces live in a sub-directory next to this file.
TRACE_LOCATION = os.path.join(BT_ADAPTER_TEST_PATH, 'input_traces/keyboard')

# NOTE(review): presumably a tolerance (in seconds) applied when checking
# resume timing -- its users are not visible here, confirm against callers.
RESUME_DELTA = -5
78
# Maps a device type name to a function that, given a Bluetooth peer, returns
# the peer's (still uncalled) factory method for that device type.
# Binding of the methods is delayed since host is only available at run time.
SUPPORTED_DEVICE_TYPES = {
    'MOUSE': lambda btpeer: btpeer.get_bluetooth_hid_mouse,
    'KEYBOARD': lambda btpeer: btpeer.get_bluetooth_hid_keyboard,
    'BLE_MOUSE': lambda btpeer: btpeer.get_ble_mouse,
    'BLE_KEYBOARD': lambda btpeer: btpeer.get_ble_keyboard,
    # Tester allows us to test DUT's discoverability, etc. from a peer
    'BLUETOOTH_TESTER': lambda btpeer: btpeer.get_bluetooth_tester,
    # This is a base object that does not emulate any Bluetooth device.
    # This object is preferred when only a pure XMLRPC server is needed
    # on the btpeer host, e.g., to perform servod methods.
    'BLUETOOTH_BASE': lambda btpeer: btpeer.get_bluetooth_base,
    # A phone device that supports Bluetooth
    'BLE_PHONE': lambda btpeer: btpeer.get_ble_phone,
    # A Bluetooth audio device emulating a headphone
    'BLUETOOTH_AUDIO': lambda btpeer: btpeer.get_bluetooth_audio,
}
98
# Maps a text fragment searched for in the collected system messages to the
# key under which that failure is recorded in the test results.
COMMON_FAILURES = {
        '/var/spool/crash/bluetoothd': 'bluetoothd_crashed',
        'Freeing adapter /org/bluez/hci': 'adapter_freed',
}
103
# TODO(b/150898182) - Don't run some tests on tablet form factors
# The list below was produced by looking up tablet models on Goldeneye and
# dropping those that were never launched.
TABLET_MODELS = [
        'kakadu',
        'kodama',
        'krane',
        'dru',
        'druwl',
        'dumo',
]
108
# TODO(b/161005264) - Some tests rely on software rotation to pass, so we must
# know which models don't use software rotation. A static list is used until
# the bluez API can be queried instead. Extended advertising is supported on
# platforms on 4.19 and 5.4, with HrP2, JfP2, CcP2, RTL8822C, or QCN3991
# chipsets.
EXT_ADV_MODELS = [
        'ezkinil', 'trembyle', 'drawcia', 'drawlat',
        'drawman', 'maglia', 'magolor', 'sarien',
        'arcada', 'akemi', 'drallion', 'drallion360',
        'hatch', 'stryke', 'helios', 'dragonair',
        'dratini', 'duffy', 'jinlon', 'kaisa',
        'kindred', 'kled', 'puff', 'kohaku',
        'nightfury', 'morphius', 'lazor', 'trogdor',
]
119
# Chipsets that power down during suspend.
# TODO(b/158336394) Realtek: Powers down during suspend due to high power usage
#                            during S3.
# TODO(b/168152910) Marvell: Powers down during suspend due to flakiness when
#                            entering suspend.  This will also skip the tests
#                            for Veyron (which don't power down right now) but
#                            reconnect tests are still enabled for that platform
#                            to check for suspend stability.
SUSPEND_POWER_DOWN_CHIPSETS = [
        'Realtek-RTL8822C-USB',
        'MVL-8897',
        'MVL-8997',
]
128
129
def method_name():
    """Report the name of the function that invoked this helper.

    Intended to be called from inside a method so that the method can
    discover its own name at run time.

    @returns: the name of the calling function or method, as a string.
    """
    frame_records = inspect.getouterframes(inspect.currentframe())
    # Record 0 describes this helper itself; record 1 is its direct caller.
    caller_record = frame_records[1]
    # Field 3 of a frame record holds the function name.
    return caller_record[3]
139
140
141def _run_method(method, method_name, *args, **kwargs):
142    """Run a target method and capture exceptions if any.
143
144    This is just a wrapper of the target method so that we do not need to
145    write the exception capturing structure repeatedly. The method could
146    be either a device method or a facade method.
147
148    @param method: the method to run
149    @param method_name: the name of the method
150
151    @returns: the return value of target method() if successful.
152              False otherwise.
153
154    """
155    result = False
156    try:
157        result = method(*args, **kwargs)
158    except Exception as e:
159        logging.error('%s: %s', method_name, e)
160    except:
161        logging.error('%s: unexpected error', method_name)
162    return result
163
164
def get_bluetooth_emulated_device(btpeer, device_type):
    """Get the bluetooth emulated device object.

    Creates the device object of the requested type on the Bluetooth peer,
    initializes it, and stores the properties queried from the peer as
    attributes on the returned object (name, address, pin, class_of_service,
    class_of_device, device_type, authentication_mode, port, ...).

    @param btpeer: the Bluetooth peer device
    @param device_type : the bluetooth device type, e.g., 'MOUSE'

    @returns: the bluetooth device object

    @raises: TestError if the device type is not supported, or if a device
             method still fails after all remediation steps.

    """

    def _retry_device_method(method_name, legal_falsy_values=()):
        """retry the emulated device's method.

        The method is invoked as device.xxxx() e.g., device.GetAdvertisedName().

        Note that the method name string is provided to get the device's actual
        method object at run time through getattr(). The rebinding is required
        because a new device may have been created previously or during the
        execution of fix_serial_device().

        Given a device's method, it is not feasible to get the method name
        through __name__ attribute. This limitation is due to the fact that
        the device is a dotted object of an XML RPC server proxy.
        As an example, with the method name 'GetAdvertisedName', we could
        derive the corresponding method device.GetAdvertisedName. On the
        contrary, given device.GetAdvertisedName, it is not feasible to get the
        method name by device.GetAdvertisedName.__name__

        Also note that if the device method fails, we would try remediation
        step and retry the device method. The remediation steps are
         1) re-creating the serial device.
         2) reset (powercycle) the bluetooth dongle.
         3) reboot Bluetooth peer.
        If the device method still fails after these steps, we fail the test

        By default no falsy value is accepted: any falsy result counts as a
        failure unless it is listed in legal_falsy_values.

        @param method_name: the string of the method name.
        @param legal_falsy_values: Values that are falsy but might be OK.

        @returns: the result returned by the device's method if the call was
                  successful

        @raises: TestError if the devices's method fails or if repair of
                 peripheral kit fails

        """

        action_table = [('recreate', 'Fixing the serial device'),
                        ('reset', 'Power cycle the peer device'),
                        ('reboot', 'Reboot the chamleond host')]

        # Attempts are numbered from 1 in both the attempt and failure logs.
        for attempt, (action, description) in enumerate(action_table, 1):
            logging.info('Attempt %s : %s ', attempt, method_name)

            result = _run_method(getattr(device, method_name), method_name)
            if _is_successful(result, legal_falsy_values):
                return result

            logging.error('%s failed the %s time. Attempting to %s',
                          method_name, attempt, description)
            if not fix_serial_device(btpeer, device, action):
                logging.info('%s failed', description)
            else:
                logging.info('%s successful', description)

        # Try one last time after the final remediation action.
        result = _run_method(getattr(device, method_name), method_name)
        if _is_successful(result, legal_falsy_values):
            return result

        raise error.TestError('Failed to execute %s. Bluetooth peer device is '
                              'not working' % method_name)


    if device_type not in SUPPORTED_DEVICE_TYPES:
        # Format the message here; the exception does not interpolate
        # additional positional arguments into it.
        raise error.TestError('The device type is not supported: %s' %
                              device_type)

    # Get the bluetooth device object and query some important properties.
    # _retry_device_method looks up this `device` through its closure.
    device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)()

    # Get some properties of the kit
    # NOTE: Strings updated here must be kept in sync with Btpeer.
    device._capabilities = _retry_device_method('GetCapabilities')
    device._transports = device._capabilities["CAP_TRANSPORTS"]
    device._is_le_only = ("TRANSPORT_LE" in device._transports and
                          len(device._transports) == 1)
    device._has_pin = device._capabilities["CAP_HAS_PIN"]
    device.can_init_connection = device._capabilities["CAP_INIT_CONNECT"]

    _retry_device_method('Init')
    logging.info('device type: %s', device_type)

    device.name = _retry_device_method('GetAdvertisedName')
    logging.info('device name: %s', device.name)

    device.address = _retry_device_method('GetLocalBluetoothAddress')
    logging.info('address: %s', device.address)

    # A missing pin code (None) is only acceptable for devices without a pin.
    pin_falsy_values = [] if device._has_pin else [None]
    device.pin = _retry_device_method('GetPinCode', pin_falsy_values)
    logging.info('pin: %s', device.pin)

    class_falsy_values = [None] if device._is_le_only else [0]

    # Class of service is None for LE-only devices. Don't fail or parse it.
    device.class_of_service = _retry_device_method('GetClassOfService',
                                                   class_falsy_values)
    if device._is_le_only:
        parsed_class_of_service = device.class_of_service
    else:
        parsed_class_of_service = "0x%04X" % device.class_of_service
    logging.info('class of service: %s', parsed_class_of_service)

    device.class_of_device = _retry_device_method('GetClassOfDevice',
                                                  class_falsy_values)
    # Class of device is None for LE-only devices. Don't fail or parse it.
    if device._is_le_only:
        parsed_class_of_device = device.class_of_device
    else:
        parsed_class_of_device = "0x%04X" % device.class_of_device
    logging.info('class of device: %s', parsed_class_of_device)

    device.device_type = _retry_device_method('GetDeviceType')
    logging.info('device type: %s', device.device_type)

    # The authentication mode is only queried for non-LE-only devices.
    device.authentication_mode = None
    if not device._is_le_only:
        device.authentication_mode = _retry_device_method('GetAuthenticationMode')
        logging.info('authentication mode: %s', device.authentication_mode)

    device.port = _retry_device_method('GetPort')
    logging.info('serial port: %s\n', device.port)

    return device
302
303
def recreate_serial_device(device):
    """Create and connect to a new serial device.

    The old serial device is closed first; a new one is only created when
    closing succeeded.

    @param device: the bluetooth HID device

    @returns: True if the serial device is re-created successfully.
              False if device is None, or if closing or creating fails.

    """
    logging.info('Remove the old serial device and create a new one.')
    if device is None:
        # Nothing to close and nothing to call CreateSerialDevice on.
        logging.error('no serial device to re-create.')
        return False

    try:
        device.Close()
    except Exception as e:
        logging.error('failed to close the serial device: %s', e)
        return False

    try:
        device.CreateSerialDevice()
    except Exception as e:
        logging.error('failed to invoke CreateSerialDevice: %s', e)
        return False
    return True
325
326
def _check_device_init(device, operation):
    """Verify that the serial device is usable after a recovery operation.

    The device must initialize, report a serial connection, and enter
    command mode, in that order. Checking stops at the first step that fails.

    @param device: the bluetooth device to probe
    @param operation: name of the recovery operation that was just performed;
                      only used in log messages

    @returns: True if all three checks pass. False otherwise.

    """
    logging.info('Checking device status...')

    # Each entry pairs a probe with the message logged when that probe fails.
    # Init and EnterCommandMode go through _run_method, so an exception there
    # counts as a failure; CheckSerialConnection is called directly.
    probes = (
            (lambda: _run_method(device.Init, 'Init'),
             'device.Init: failed after %s'),
            (lambda: device.CheckSerialConnection(),
             'device.CheckSerialConnection: failed after %s'),
            (lambda: _run_method(device.EnterCommandMode, 'EnterCommandMode'),
             'device.EnterCommandMode: failed after %s'),
    )
    for probe, failure_message in probes:
        if not probe():
            logging.info(failure_message, operation)
            return False

    logging.info('The device is created successfully after %s.', operation)
    return True
342
def _reboot_btpeer(btpeer, device):
    """Power cycle the peripheral device, then reboot the Bluetooth peer.

    The device is power cycled explicitly because a reboot alone may not do
    that.

    @param btpeer: the Bluetooth peer to reboot
    @param device: the bluetooth device attached to the peer

    @returns: True if the device passes the init check after the reboot.
              False otherwise.

    """
    # Chameleond fizz hosts should have write protect removed and
    # set_gbb_flags set to 0 to minimize boot time
    power_cycle_settle_secs = 1
    reboot_wait_secs = 10

    # Close the bluetooth peripheral device before touching its power.
    device.Close()
    logging.info("Powercycling the device")
    device.PowerCycle()
    time.sleep(power_cycle_settle_secs)

    logging.info('rebooting Bluetooth peer...')
    btpeer.reboot()

    # A btpeer reboot takes a bit more than reboot_wait_secs, so wait that
    # long before starting to probe the btpeer board.
    time.sleep(reboot_wait_secs)
    return _check_device_init(device, 'reboot')
365
def _reset_device_power(device):
    """Power cycle the device and check that it initializes afterwards.

    If PowerCycle() reports failure, the function gives up immediately. If
    PowerCycle() raises, the error is logged and the init check is still
    attempted.

    @param device: the bluetooth device to power cycle

    @returns: True if the device passes the init check after the power cycle.
              False otherwise.

    """
    settle_secs = 1
    try:
        power_cycled = bool(device.PowerCycle())
    except:
        logging.error('exception in device.PowerCycle')
    else:
        if not power_cycled:
            logging.info('device.PowerCycle() failed')
            return False
        logging.info('device powercycled')

    time.sleep(settle_secs)
    return _check_device_init(device, 'reset')
379
380def _is_successful(result, legal_falsy_values=[]):
381    """Is the method result considered successful?
382
383    Some method results, for example that of class_of_service, may be 0 which is
384    considered a valid result. Occassionally, None is acceptable.
385
386    The default values exist for uses of this function before the options were
387    added, ideally we should change zero_ok to False.
388
389    @param result: a method result
390    @param legal_falsy_values: Values that are falsy but might be OK.
391
392    @returns: True if bool(result) is True, or if result is 0 and zero_ok, or if
393              result is None and none_ok.
394    """
395    truthiness_of_result = bool(result)
396    return truthiness_of_result or result in legal_falsy_values
397
398
def _flag_common_failures(instance):
    """Checks if a common failure has occurred during the test run

    Scans system logs for known signs of failure. If a failure is discovered,
    it is added to the test results, to make it easier to identify common root
    causes from Stainless

    @param instance: the test instance; its bluetooth_facade is queried for
                     each failure tag and its results may be updated.
    """

    for fail_tag, fail_log in COMMON_FAILURES.items():
        if not instance.bluetooth_facade.messages_find(fail_tag):
            continue

        logging.error('Detected failure tag: %s', fail_tag)
        # We mark this instance's results with the discovered failure. The
        # results can only be annotated when they are a dict (including dict
        # subclasses); any other kind of result is left untouched.
        if isinstance(instance.results, dict):
            instance.results[fail_log] = True
413
414
def fix_serial_device(btpeer, device, operation='reset'):
    """Try to bring the serial device back to a working state.

    Exactly one recovery step is performed, selected by operation:
      'recreate': re-create the serial device,
      'reset':    power cycle the usb port to which the device is connected,
      'reboot':   reboot the Bluetooth peer.

    Note that rebooting the btpeer board or resetting the device will remove
    the state on the peripheral which might cause test failures. Please use
    reset/reboot only before or after a test.

    @param btpeer: the Bluetooth peer; only used by the 'reboot' operation
    @param device: the bluetooth device.
    @param operation: Recovery operation to perform 'recreate/reset/reboot'

    @returns: True if the serial device is fixed. False otherwise, including
              when operation is not one of the three known values.

    """
    if operation == 'reset':
        # Powercycle the USB port where the bluetooth peer device is connected.
        # RN-42 and RN-52 share the same vid:pid so both will be powercycled.
        # This will only work on fizz host with write protection removed.
        # Note that the state on the device will be lost.
        return _reset_device_power(device)

    if operation == 'reboot':
        # The device is power cycled before the Bluetooth peer is rebooted.
        return _reboot_btpeer(btpeer, device)

    if operation != 'recreate':
        logging.error('fix_serial_device Invalid operation %s', operation)
        return False

    # 'recreate': check the serial connection and fix it if needed.
    if not device.CheckSerialConnection():
        # No serial connection: run the device init checks.
        return _check_device_init(device, operation)

    # The USB serial connection still exists, so re-connecting suffices.
    # The problem is usually caused by a serial port change, for example
    # from /dev/ttyUSB0 to /dev/ttyUSB1.
    logging.info('retry: creating a new serial device...')
    return recreate_serial_device(device)
465
466
def retry(test_method, instance, *args, **kwargs):
    """Run the target facade test_method(), retrying once if it fails.

    A test_method is something like self.test_xxxx() in BluetoothAdapterTests,
    e.g., BluetoothAdapterTests.test_bluetoothd_running().

    When the instance has a use_btpeer attribute, the serial device of every
    device listed in instance.devices is re-created before the second run; if
    any of those fixes fails, the test is not re-run.

    @param test_method: the test method to retry
    @param instance: the test instance passed to test_method as its first
                     argument

    @returns: True if the return value of test_method() is successful.
              False otherwise.

    """
    name = test_method.__name__

    def _attempt():
        """Run the test method once and report whether it succeeded."""
        outcome = _run_method(test_method, name, instance, *args, **kwargs)
        return _is_successful(outcome)

    if _attempt():
        return True

    logging.error('%s failed at the 1st time: (%s)', name,
                  str(instance.results))
    logging.info('%s: retry the 2nd time.', name)
    time.sleep(1)

    if hasattr(instance, 'use_btpeer'):
        # Try to fix the attached serial devices before running again.
        for device_type in SUPPORTED_DEVICE_TYPES:
            for device in instance.devices[device_type]:
                # 'recreate' mode does not need the btpeer, so None is passed
                # for convenience.
                if not fix_serial_device(None, device, "recreate"):
                    return False
        logging.info('%s: retry the 2nd time.', name)

    # Without attached serial devices the test is simply re-run.
    return _attempt()
506
507
def test_retry_and_log(test_method_or_retry_flag,
                       messages_start=True,
                       messages_stop=True):
    """A decorator that logs test results, collects error messages, and retries
       on request.

    @param test_method_or_retry_flag: either the test_method or a retry_flag.
        The possible forms are:
        1. @test_retry_and_log
            The argument is the test_method itself: retry the test_method.
        2. @test_retry_and_log(True)
            The retry flag is True: retry the test_method.
        3. @test_retry_and_log(False)
            The retry flag is False: do not retry the test_method.

    @param messages_start: Start collecting messages before running the test
    @param messages_stop: Stop collecting messages after running the test and
                          analyze the results.

    @returns: a wrapper of the test_method with test log. The retry mechanism
        would depend on the retry flag.

    """
    # A bare @test_retry_and_log hands over the test method itself, which
    # also means "retry".
    used_without_arguments = callable(test_method_or_retry_flag)
    retry_enabled = used_without_arguments or test_method_or_retry_flag

    def decorator(test_method):
        """Wrap test_method with logging and the optional retry.

        @param test_method: the test method being decorated.

        @returns the wrapper of the test method.

        """
        @functools.wraps(test_method)
        def wrapper(instance, *args, **kwargs):
            """Run the decorated method and record its outcome.

            @param instance: an BluetoothAdapterTests instance

            @returns the result of the test method

            @raises error.TestNAError, error.TestError: re-raised as is
            @raises error.TestFail: when failing fast, or when the test
                    method itself raised TestFail

            """
            instance.results = None
            instance.last_test_method = test_method.__name__

            outcome = False
            failure_text = None
            messages_captured = False
            fail_fast = hasattr(instance, 'fail_fast') and instance.fail_fast

            try:
                if messages_start:
                    # Grab /var/log/messages output during test run
                    instance.bluetooth_facade.messages_start()

                if retry_enabled:
                    outcome = retry(test_method, instance, *args, **kwargs)
                else:
                    outcome = test_method(instance, *args, **kwargs)

                if messages_stop:
                    messages_captured = (
                            instance.bluetooth_facade.messages_stop())

                if not outcome:
                    if messages_captured:
                        _flag_common_failures(instance)
                    failure_text = '[--- failed: {} ({})]'.format(
                            test_method.__name__, str(instance.results))
                    logging.error(failure_text)
                    instance.fails.append(failure_text)
                else:
                    logging.info('[*** passed: {}]'.format(
                            test_method.__name__))
            # TestNA and TestError are logged and re-raised so that the
            # quicktest wrapper can catch them; they skip out of the testcase
            # entirely.
            except error.TestNAError as e:
                failure_text = '[--- TESTNA {} ({})]'.format(
                        test_method.__name__, str(e))
                logging.error(failure_text)
                raise
            except error.TestError as e:
                failure_text = '[--- ERROR {} ({})]'.format(
                        test_method.__name__, str(e))
                logging.error(failure_text)
                raise
            except error.TestFail as e:
                # A TestFail raised by the test always fails fast.
                failure_text = '[--- failed {} ({})]'.format(
                        test_method.__name__, str(e))
                logging.error(failure_text)
                instance.fails.append(failure_text)
                fail_fast = True

            # Check whether we should fail fast
            if failure_text and fail_fast:
                logging.info('Fail fast')
                raise error.TestFail(instance.fails)

            return outcome

        return wrapper

    if used_without_arguments:
        # Used as @test_retry_and_log: decorate the method right away.
        return decorator(test_method_or_retry_flag)

    # Used as @test_retry_and_log(False): hand back the real decorator.
    return decorator
620
621
622def test_case_log(method):
623    """A decorator for test case methods.
624
625    The main purpose of this decorator is to display the test case name
626    in the test log which looks like
627
628        <... test_case_RA3_CD_SI200_CD_PC_CD_UA3 ...>
629
630    @param method: the test case method to decorate.
631
632    @returns: a wrapper function of the decorated method.
633
634    """
635    @functools.wraps(method)
636    def wrapper(instance, *args, **kwargs):
637        """Log the name of the wrapped method before execution"""
638        logging.info('\n<... %s ...>', method.__name__)
639        method(instance, *args, **kwargs)
640    return wrapper
641
642
643class BluetoothAdapterTests(test.test):
644    """Server side bluetooth adapter tests.
645
646    This test class tries to thoroughly verify most of the important work
647    states of a bluetooth adapter.
648
649    The various test methods are supposed to be invoked by actual autotest
650    tests such as server/cros/site_tests/bluetooth_Adapter*.
651
652    """
653    version = 1
654    ADAPTER_ACTION_SLEEP_SECS = 1
655    ADAPTER_PAIRING_TIMEOUT_SECS = 60
656    ADAPTER_CONNECTION_TIMEOUT_SECS = 30
657    # Wait after connect for input device to be ready for use
658    ADAPTER_HID_INPUT_DELAY = 5
659    ADAPTER_DISCONNECTION_TIMEOUT_SECS = 30
660    ADAPTER_PAIRING_POLLING_SLEEP_SECS = 3
661    ADAPTER_DISCOVER_TIMEOUT_SECS = 60          # 30 seconds too short sometimes
662    ADAPTER_DISCOVER_POLLING_SLEEP_SECS = 1
663    ADAPTER_DISCOVER_NAME_TIMEOUT_SECS = 30
664    ADAPTER_WAKE_ENABLE_TIMEOUT_SECS = 30
665
666    ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS = 10
667    ADAPTER_POLLING_DEFAULT_SLEEP_SECS = 1
668
669    HID_REPORT_SLEEP_SECS = 1
670
671
672    DEFAULT_START_DELAY_SECS = 0
673    DEFAULT_HOLD_INTERVAL_SECS = 10
674    DEFAULT_HOLD_TIMEOUT_SECS = 60
675    DEFAULT_HOLD_SLEEP_SECS = 1
676
677    # Default suspend time in seconds for suspend resume.
678    SUSPEND_TIME_SECS=10
679    SUSPEND_ENTER_SECS=10
680    RESUME_TIME_SECS=30
681    RESUME_INTERNAL_TIMEOUT_SECS = 180
682
683    # Minimum RSSI required for peer devices during testing
684    MIN_RSSI = -70
685
686    # hci0 is the default hci device if there is no external bluetooth dongle.
687    EXPECTED_HCI = 'hci0'
688
689    CLASS_OF_SERVICE_MASK = 0xFFE000
690    CLASS_OF_DEVICE_MASK = 0x001FFF
691
692    # Constants about advertising.
693    DAFAULT_MIN_ADVERTISEMENT_INTERVAL_MS = 181.25
694    DAFAULT_MAX_ADVERTISEMENT_INTERVAL_MS = 181.25
695    ADVERTISING_INTERVAL_UNIT = 0.625
696
697    # Error messages about advertising dbus methods.
698    ERROR_FAILED_TO_REGISTER_ADVERTISEMENT = (
699            'org.bluez.Error.NotPermitted: Maximum advertisements reached')
700    ERROR_INVALID_ADVERTISING_INTERVALS = (
701            'org.bluez.Error.InvalidArguments: Invalid arguments')
702
703    # Supported profiles by chrome os.
704    SUPPORTED_UUIDS = {
705            'GATT_UUID': '00001801-0000-1000-8000-00805f9b34fb',
706            'A2DP_SOURCE_UUID': '0000110a-0000-1000-8000-00805f9b34fb',
707            'HFP_AG_UUID': '0000111f-0000-1000-8000-00805f9b34fb',
708            'PNP_UUID': '00001200-0000-1000-8000-00805f9b34fb',
709            'GAP_UUID': '00001800-0000-1000-8000-00805f9b34fb'}
710
711    # Board list for name/ID test check. These devices don't need to be tested
712    REFERENCE_BOARDS = [
713            'rambi', 'nyan', 'oak', 'reef', 'yorp', 'bip', 'volteer',
714            'volteer2'
715    ]
716
717    # Path for btmon logs
718    BTMON_DIR_LOG_PATH = '/var/log/btmon'
719
720    # Path for usbmon logs
721    USBMON_DIR_LOG_PATH = '/var/log/usbmon'
722
723    # The agent capability of various device types.
724    AGENT_CAPABILITY = {
725            'BLUETOOTH_AUDIO': 'NoInputNoOutput',
726    }
727
728    def assert_on_fail(self, result, raiseNA=False):
729        """ If the called function returns a false-like value, raise an error.
730
731        Call test methods (i.e. with @test_retry_and_log) wrapped with this
732        function and failures will raise instead of continuing the test.
733
734        For example:
735            self.assert_on_fail(self.test_pairing(...))
736
737        @param result: Result of test method called.
738        @param raiseNA: Whether to raise TestNAError instead of TestFail
739
740        @raises error.TestNAError
741        @raises error.TestFail
742        """
743        if not result:
744            failure_msg = 'Assert on fail: {}'.format(self.last_test_method)
745            logging.error(failure_msg)
746            if raiseNA:
747                raise error.TestNAError(failure_msg)
748            else:
749                raise error.TestFail(failure_msg)
750
751
752    # TODO(b/131170539) remove when sarien/arcada no longer have _signed
753    # postfix
754    def get_base_platform_name(self):
755        """Returns the DUT platform name
756
757        If the DUT is a DVT device, _signed or _unsigned may be appended
758            to the device name, which we should ignore in our BT tests
759
760        @returns: String name of the DUT's platform with _signed or
761                _unsigned removed
762        """
763
764        platform = self.host.get_platform()
765
766        return platform.replace('_signed', '').replace('_unsigned', '')
767
768
769    def group_btpeers_type(self):
770        """Group all Bluetooth peers by the type of their detected device."""
771
772        # Use previously created btpeer_group instead of creating new
773        if len(self.btpeer_group_copy) > 0:
774            logging.info('Using previously created btpeer group')
775            for device_type in SUPPORTED_DEVICE_TYPES:
776                self.btpeer_group[device_type] = \
777                    self.btpeer_group_copy[device_type][:]
778            return
779
780        # Create new btpeer_group
781        for device_type in SUPPORTED_DEVICE_TYPES:
782            self.btpeer_group[device_type] = list()
783            # Create copy of btpeer_group
784            self.btpeer_group_copy[device_type] = list()
785
786        for idx, btpeer in enumerate(self.host.btpeer_list):
787            for device_type,gen_device_func in SUPPORTED_DEVICE_TYPES.items():
788                try:
789                    device = gen_device_func(btpeer)()
790                    if device.CheckSerialConnection():
791                        self.btpeer_group[device_type].append(btpeer)
792                        logging.info('%d-th btpeer find device %s', \
793                                     idx, device_type)
794                        # Create copy of btpeer_group
795                        self.btpeer_group_copy[device_type].append(btpeer)
796                except:
797                    logging.debug('Error with initializing %s on %d-th'
798                                  'btpeer', device_type, idx)
799            if len(self.btpeer_group[device_type]) == 0:
800                logging.error('No device is detected on %d-th btpeer', idx)
801
802        logging.debug("self.bt_group is %s",self.btpeer_group)
803
804
805    def wait_for_device(self, device, timeout=10):
806        """Waits for device to become available again
807
808        We reset raspberry pi peer between tests. This method helps us wait to
809        prevent us from trying to use the device before it comes back up again.
810
811        @param device: proxy object of peripheral device
812        """
813
814        def is_device_ready():
815            """Tries to use a service of the device
816
817            @returns: True if device is available to provide service
818                      False otherwise
819            """
820
821            try:
822                # Call a simple (fast) function to determine if device is online
823                # and reachable. If we can query this property, we know the
824                # device is available for us to use
825                getattr(device, 'GetCapabilities')()
826
827            except Exception as e:
828                return False
829
830            return True
831
832
833        try:
834            utils.poll_for_condition(condition=is_device_ready,
835                                     desc='wait_for_device',
836                                     timeout=timeout)
837
838        except utils.TimeoutError as e:
839            raise error.TestError('Peer is not available after waiting')
840
841
842    def clear_raspi_device(self, device):
843        """Clears a device on a raspi peer by resetting bluetooth stack
844
845        @param device: proxy object of peripheral device
846        """
847
848        try:
849            device.ResetStack()
850
851        except socket.error as e:
852            # Ignore conn reset, expected during stack reset
853            if e.errno != errno.ECONNRESET:
854                raise
855
856        except chameleon.ChameleonConnectionError as e:
857            # Ignore chameleon conn reset, expected during stack reset
858            if str(errno.ECONNRESET) not in str(e):
859                raise
860
861        except six.moves.http_client.BadStatusLine as e:
862            # BadStatusLine occurs occasionally when chameleon
863            # is restarted. We ignore it here
864            logging.error('Ignoring badstatusline exception')
865            pass
866
867        # Catch generic Fault exception by rpc server, ignore
868        # method not available as it indicates platform didn't
869        # support method and that's ok
870        except Exception as e:
871            if not (e.__class__.__name__ == 'Fault' and
872                'is not supported' in str(e)):
873                raise
874
875        # Ensure device is back online before continuing
876        self.wait_for_device(device, timeout=30)
877
878    def device_set_powered(self, device, powered):
879        """Set raspi BT powered state.
880
881        @param powered: Powered state to set on Raspi.
882        """
883        if powered:
884            device.AdapterPowerOn()
885        else:
886            device.AdapterPowerOff()
887
888    def get_device_rasp(self, device_num, on_start=True):
889        """Get all bluetooth device objects from Bluetooth peer devices
890        This method should be called only after group_btpeers_type
891        @param device_num : dict of {device_type:number}, to specify the number
892                            of device needed for each device_type.
893
894        @param on_start: boolean describing whether the requested clear is for a
895                            new test, or in the middle of a current one
896
897        @returns: True if Success.
898        """
899
900        logging.info("in get_device_rasp %s onstart %s", device_num, on_start)
901        total_num_devices = sum(device_num.values())
902        if total_num_devices > len(self.host.btpeer_list):
903            logging.error(
904                    'Total number of devices %s is greater than the'
905                    ' number of Bluetooth peers %s', total_num_devices,
906                    len(self.host.btpeer_list))
907            return False
908
909        for device_type, number in device_num.items():
910            total_num_devices += number
911            if len(self.btpeer_group[device_type]) < number:
912                logging.error('Number of Bluetooth peers with device type'
913                      '%s is %d, which is less then needed %d', device_type,
914                      len(self.btpeer_group[device_type]), number)
915                return False
916
917            for btpeer in self.btpeer_group[device_type][:number]:
918                logging.info("getting emulated %s", device_type)
919                device = self.reset_device(btpeer, device_type, on_start)
920
921                self.devices[device_type].append(device)
922
923                # Remove this btpeer from btpeer_group since it is already
924                # configured as a specific device
925                for temp_device in SUPPORTED_DEVICE_TYPES:
926                    if btpeer in self.btpeer_group[temp_device]:
927                        self.btpeer_group[temp_device].remove(btpeer)
928
929        return True
930
931
932    def get_device(self, device_type, on_start=True):
933        """Get the bluetooth device object.
934
935        @param device_type : the bluetooth device type, e.g., 'MOUSE'
936
937        @param on_start: boolean describing whether the requested clear is for a
938                            new test, or in the middle of a current one
939
940        @returns: the bluetooth device object
941
942        """
943
944        self.devices[device_type].append(
945                self.reset_device(self.host.btpeer, device_type, on_start))
946
947        return self.devices[device_type][-1]
948
949
950    def reset_device(self, peer, device_type, clear_device=True):
951        """Reset the peer device in order to be used as a different type.
952
953        @param peer: the peer device to reset with new device type
954        @param device_type : the new bluetooth device type, e.g., 'MOUSE'
955        @param clear_device: whether to clear the device state
956
957        @returns: the bluetooth device object
958
959        """
960        device = get_bluetooth_emulated_device(peer, device_type)
961
962        # Re-fresh device to clean state if test is starting
963        if clear_device:
964            self.clear_raspi_device(device)
965
966        try:
967            # Tell generic chameleon to bind to this device type
968            device.SpecifyDeviceType(device_type)
969
970        # Catch generic Fault exception by rpc server, ignore method not
971        # available as it indicates platform didn't support method and that's
972        # ok
973        except Exception as e:
974            if not (e.__class__.__name__ == 'Fault' and
975                    'is not supported' in str(e)):
976                logging.error("got exception %s", str(e))
977                raise
978
979        return device
980
981
982    def is_device_available(self, btpeer, device_type):
983        """Determines if the named device is available on the linked peer
984
985        @param device_type: the bluetooth HID device type, e.g., 'MOUSE'
986
987        @returns: True if it is able to resolve the device, false otherwise
988        """
989
990        device = SUPPORTED_DEVICE_TYPES[device_type](btpeer)()
991        try:
992            # The proxy prevents us from checking if the object is None directly
993            # so instead we call a fast method that any peripheral must support.
994            # This will fail if the object over the proxy doesn't exist
995            getattr(device, 'GetCapabilities')()
996
997        except Exception as e:
998            return False
999
1000        return True
1001
1002
1003    def list_devices_available(self):
1004        """Queries which devices are available on btpeer(s)
1005
1006        @returns: dict mapping HID device types to number of supporting peers
1007                  available, e.g. {'MOUSE':1, 'KEYBOARD':1}
1008        """
1009        devices_available = {}
1010        for device_type in SUPPORTED_DEVICE_TYPES:
1011            for btpeer in self.host.btpeer_list:
1012                if self.is_device_available(btpeer, device_type):
1013                    devices_available[device_type] = \
1014                        devices_available.get(device_type, 0) + 1
1015
1016        logging.debug("devices available are %s", devices_available)
1017        return devices_available
1018
1019
1020    def suspend_resume(self, suspend_time=SUSPEND_TIME_SECS):
1021        """Suspend the DUT for a while and then resume.
1022
1023        @param suspend_time: the suspend time in secs
1024        @raises errors.TestFail if the device reboots during suspend
1025
1026        """
1027        boot_id = self.host.get_boot_id()
1028        suspend = self.suspend_async(suspend_time=suspend_time)
1029        start_time = self.bluetooth_facade.get_device_time()
1030
1031        # Give the system some time to enter suspend
1032        self.test_suspend_and_wait_for_sleep(
1033                suspend, sleep_timeout=self.SUSPEND_ENTER_SECS)
1034
1035        # Wait for resume - since we're not testing suspend itself, we are
1036        # lenient with the resume time here
1037        self.test_wait_for_resume(boot_id,
1038                                  suspend,
1039                                  resume_timeout=suspend_time,
1040                                  test_start_time=start_time)
1041
1042
1043    def reboot(self):
1044        """Reboot the DUT and recreate necessary processes and variables"""
1045        self.host.reboot()
1046
1047        # We need to recreate the bluetooth_facade after a reboot.
1048        # Delete the proxy first so it won't delete the old one, which
1049        # invokes disconnection, after creating the new one.
1050        if hasattr(self, 'factory'):
1051            del self.factory
1052        if hasattr(self, 'bluetooth_facade'):
1053            del self.bluetooth_facade
1054        if hasattr(self, 'input_facade'):
1055            del self.input_facade
1056        self.factory = remote_facade_factory.RemoteFacadeFactory(
1057                self.host, disable_arc=True, no_chrome=not self.start_browser)
1058        self.bluetooth_facade = self.factory.create_bluetooth_facade()
1059        self.input_facade = self.factory.create_input_facade()
1060
1061        # Re-enable debugging verbose since Chrome will set it to
1062        # default(disable).
1063        self.enable_disable_debug_log(enable=True)
1064
1065        # Re-disable cellular
1066        self.enable_disable_cellular(enable=False)
1067
1068        # Re-disable ui
1069        self.enable_disable_ui(enable=False)
1070
1071        self.start_new_btmon()
1072        self.start_new_usbmon()
1073
1074
1075    def _wait_till_condition_holds(self, func, method_name,
1076                                   timeout=DEFAULT_HOLD_TIMEOUT_SECS,
1077                                   sleep_interval=DEFAULT_HOLD_SLEEP_SECS,
1078                                   hold_interval=DEFAULT_HOLD_INTERVAL_SECS,
1079                                   start_delay=DEFAULT_START_DELAY_SECS):
1080        """ Wait for the func() to hold true for a period of time
1081
1082
1083        @param func: the function to wait for.
1084        @param method_name: the invoking class method.
1085        @param timeout: number of seconds to wait before giving up.
1086        @param sleep_interval: the interval in seconds to sleep between
1087                invoking func().
1088        @param hold_interval: the interval in seconds for the condition to
1089                             remain true
1090        @param start_delay: interval in seconds to wait before starting
1091
1092        @returns: True if the condition is met,
1093                  False otherwise
1094
1095        """
1096        if start_delay > 0:
1097            logging.debug('waiting for %s secs before checking %s',start_delay,
1098                          method_name)
1099            time.sleep(start_delay)
1100
1101        try:
1102            utils.poll_till_condition_holds(condition=func,
1103                                            timeout=timeout,
1104                                            sleep_interval=sleep_interval,
1105                                            hold_interval = hold_interval,
1106                                            desc=('Waiting %s' % method_name))
1107            return True
1108        except utils.TimeoutError as e:
1109            logging.error('%s: %s', method_name, e)
1110        except Exception as e:
1111            logging.error('%s: %s', method_name, e)
1112            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
1113            logging.error(err)
1114        except:
1115            logging.error('%s: unexpected error', method_name)
1116        return False
1117
1118
1119    def _wait_for_condition(self, func, method_name,
1120                            timeout=ADAPTER_WAIT_DEFAULT_TIMEOUT_SECS,
1121                            sleep_interval=ADAPTER_POLLING_DEFAULT_SLEEP_SECS,
1122                            start_delay=DEFAULT_START_DELAY_SECS):
1123        """Wait for the func() to become True.
1124
1125        @param func: the function to wait for.
1126        @param method_name: the invoking class method.
1127        @param timeout: number of seconds to wait before giving up.
1128        @param sleep_interval: the interval in seconds to sleep between
1129                invoking func().
1130        @param start_delay: interval in seconds to wait before starting
1131
1132        @returns: True if the condition is met,
1133                  False otherwise
1134
1135        """
1136
1137        if start_delay > 0:
1138            logging.debug('waiting for %s secs before checking %s',start_delay,
1139                          method_name)
1140            time.sleep(start_delay)
1141
1142        try:
1143            utils.poll_for_condition(condition=func,
1144                                     timeout=timeout,
1145                                     sleep_interval=sleep_interval,
1146                                     desc=('Waiting %s' % method_name))
1147            return True
1148        except utils.TimeoutError as e:
1149            logging.error('%s: %s', method_name, e)
1150        except Exception as e:
1151            logging.error('%s: %s', method_name, e)
1152            err = 'bluetoothd possibly crashed. Check out /var/log/messages.'
1153            logging.error(err)
1154        except:
1155            logging.error('%s: unexpected error', method_name)
1156        return False
1157
1158    def ignore_failure(instance, test_method, *args, **kwargs):
1159        """ Wrapper to prevent a test_method failure from failing the test batch
1160
1161        Sometimes a test method needs to be used as a normal function, for its
1162        result. This wrapper prevent test_method failure being recorded in
1163        instance.fails and causing a failure of the quick test batch.
1164
1165        @param test_method: test_method
1166        @returns: result of the test_method
1167        """
1168
1169        original_fails = instance.fails[:]
1170        test_result = test_method(*args, **kwargs)
1171        if not test_result:
1172            logging.info("%s failure is ignored",test_method.__name__)
1173            instance.fails = original_fails
1174        return test_result
1175
1176
1177    def start_agent(self, device):
1178        """Start the pairing agent of the device if applicable.
1179
1180        @param device: the peer device
1181        """
1182        dev_type = device.GetDeviceType()
1183        capability = self.AGENT_CAPABILITY.get(dev_type)
1184        if capability:
1185            device.StartPairingAgent(capability)
1186
1187
1188    def stop_agent(self, device):
1189        """Stop the pairing agent of the device if applicable.
1190
1191        @param device: the peer device
1192        """
1193        dev_type = device.GetDeviceType()
1194        capability = self.AGENT_CAPABILITY.get(dev_type)
1195        if capability:
1196            device.StopPairingAgent()
1197
1198
1199    # -------------------------------------------------------------------
    # Adapter standalone tests
1201    # -------------------------------------------------------------------
1202
1203
1204    def service_exists(self, service_name):
1205        """Checks if a service exists on the DUT
1206
1207        @param service_name: name of the service
1208
1209        @returns: True if service status can be queried, else False
1210        """
1211
1212        status_cmd = 'initctl status {}'.format(service_name)
1213        try:
1214            # Querying the status of a non-existent service throws an
1215            # AutoservRunError exception.  If no exception is thrown, we know
1216            # the service exists
1217            self.host.run(status_cmd)
1218
1219        except error.AutoservRunError:
1220            return False
1221
1222        return True
1223
1224
1225    def service_enabled(self, service_name):
1226        """Checks if a service is running on the DUT
1227
1228        @param service_name: name of the service
1229
1230        @throws: AutoservRunError is thrown if there is no service with the
1231                provided name installed on the DUT.
1232
1233        @returns: True if service is currently running, else False
1234        """
1235
1236        status_cmd = 'initctl status {}'.format(service_name)
1237        output = self.host.run(status_cmd).stdout
1238
1239        return 'start/running' in output
1240
1241
1242    def _initctl_services(self, services, command):
1243        """Use initctl to control service on the DUT
1244
1245        @param services: list of string service names
1246        @param command: initctl command on the services
1247                        'start': to start the service
1248                        'stop': to stop the service
1249                        'restart': to restart the service
1250
1251        @returns: True if services were set successfully, else False
1252        """
1253        for service in services:
1254            # Some platforms will not support all services. In these cases,
1255            # no need to fail, since they won't interfere with our tests
1256            if not self.service_exists(service):
1257                logging.debug('Service %s does not exist on DUT', service)
1258                continue
1259
1260            # A sample call to enable or disable a service is as follows:
1261            # "initctl stop modemfwd"
1262            if command in ['start', 'stop']:
1263                enable = command == 'start'
1264                if self.service_enabled(service) != enable:
1265                    self.host.run('initctl {} {}'.format(command, service))
1266
1267                if self.service_enabled(service) != enable:
1268                    logging.error('Failed to set initctl service to state %d',
1269                                  enable)
1270                    return False
1271
1272                if enable:
1273                    logging.info('Service {} enabled'.format(service))
1274                else:
1275                    logging.info('Service {} disabled'.format(service))
1276
1277            elif command == 'restart':
1278                if self.service_enabled(service):
1279                    self.host.run('initctl {} {}'.format(command, service))
1280                else:
1281                    # Just start a stopped job.
1282                    self.host.run('initctl {} {}'.format('start', service))
1283                logging.info('Service {} restarted'.format(service))
1284
1285            else:
1286                logging.error('unknown command {} on services {}'.format(
1287                              command, services))
1288                return False
1289
1290        return True
1291
1292
1293    def enable_disable_services(self, services, enable):
1294        """Enable or disable service on the DUT
1295
1296        @param services: list of string service names
1297        @param enable: True to enable services, False to disable
1298
1299        @returns: True if services were set successfully, else False
1300        """
1301        command = 'start' if enable else 'stop'
1302        return self._initctl_services(services, command)
1303
1304
1305    def enable_disable_cellular(self, enable):
1306        """Enable cellular services on the DUT
1307
1308        @param enable: True to enable cellular services
1309                       False to disable cellular services
1310
1311        @returns: True if services were set successfully, else False
1312        """
1313        cellular_services = ['modemmanager', 'modemfwd']
1314
1315        return self.enable_disable_services(cellular_services, enable)
1316
1317
1318    def enable_disable_ui(self, enable):
1319        """Enable UI service on the DUT
1320
1321        @param enable: True to enable UI services
1322                       False to disable UI services
1323
1324        @returns: True if services were set successfully, else False
1325        """
1326        ui_services = ['ui']
1327
1328        return self.enable_disable_services(ui_services, enable)
1329
1330
1331    def restart_services(self, services):
1332        """Restart a service on the DUT
1333
1334        @param services: the services, e.g., ['cras',]
1335
1336        @returns: True if services were set successfully, else False
1337        """
1338        return self._initctl_services(services, 'restart')
1339
1340
1341    def restart_cras(self):
1342        """Restart the cras service on the DUT
1343
1344        @returns: True if cras was restart successfully, else False
1345        """
1346        return self.restart_services(['cras', ])
1347
1348
1349    def enable_disable_debug_log(self, enable):
1350        """Enable or disable debug log in DUT
1351        @param enable: True to enable all of the debug log,
1352                       False to disable all of the debug log.
1353        """
1354        level = int(enable)
1355        self.bluetooth_facade.set_debug_log_levels(level, level, level, level)
1356
1357
1358    def start_new_btmon(self):
1359        """ Start a new btmon process and save the log """
1360
1361        # Kill all btmon process before creating a new one
1362        self.host.run('pkill btmon || true')
1363
1364        # Make sure the directory exists
1365        self.host.run('mkdir -p %s' % self.BTMON_DIR_LOG_PATH)
1366
1367        # Time format. Ex, 2020_02_20_17_52_45
1368        now = time.strftime("%Y_%m_%d_%H_%M_%S")
1369        file_name = 'btsnoop_%s' % now
1370        self.host.run_background('btmon -SAw %s/%s' % (self.BTMON_DIR_LOG_PATH,
1371                                                       file_name))
1372
1373    def start_new_usbmon(self):
1374        """ Start a new USBMON process and save the log """
1375
1376        # Kill all usbmon process before creating a new one
1377        self.host.run('pkill tcpdump || true')
1378
1379        # Make sure the directory exists
1380        self.host.run('mkdir -p %s' % self.USBMON_DIR_LOG_PATH)
1381
1382        # Time format. Ex, 2020_02_20_17_52_45
1383        now = time.strftime("%Y_%m_%d_%H_%M_%S")
1384        file_name = 'usbmon_%s' % now
1385        self.host.run_background('tcpdump -i usbmon0 -w %s/%s' %
1386                                 (self.USBMON_DIR_LOG_PATH, file_name))
1387
1388
1389    def log_message(self, msg):
1390        """ Write a string to log."""
1391        self.bluetooth_facade.log_message(msg)
1392
1393    def is_wrt_supported(self):
1394        """ Check if Bluetooth adapter support WRT logs. """
1395        return self.bluetooth_facade.is_wrt_supported()
1396
1397    def enable_wrt_logs(self):
1398        """ Enable WRT logs from Intel Adapters."""
1399        return self.bluetooth_facade.enable_wrt_logs()
1400
1401    def collect_wrt_logs(self):
1402        """ Collect WRT logs from Intel Adapters."""
1403        return self.bluetooth_facade.collect_wrt_logs()
1404
1405    def get_num_connected_devices(self):
1406        """ Return number of remote devices currently connected to the DUT.
1407
1408        @returns: The number of devices known to bluez with the Connected
1409            property active
1410        """
1411
1412        num_connected_devices = 0
1413        for dev in self.bluetooth_facade.get_devices():
1414            if dev and dev.get('Connected', 0):
1415                num_connected_devices += 1
1416
1417        return num_connected_devices
1418
1419    @test_retry_and_log
1420    def test_bluetoothd_running(self):
1421        """Test that bluetoothd is running."""
1422        return self.bluetooth_facade.is_bluetoothd_running()
1423
1424
1425    @test_retry_and_log
1426    def test_start_bluetoothd(self):
1427        """Test that bluetoothd could be started successfully."""
1428        return self.bluetooth_facade.start_bluetoothd()
1429
1430
1431    @test_retry_and_log
1432    def test_stop_bluetoothd(self):
1433        """Test that bluetoothd could be stopped successfully."""
1434        return self.bluetooth_facade.stop_bluetoothd()
1435
1436
1437    @test_retry_and_log
1438    def test_has_adapter(self):
1439        """Verify that there is an adapter. This will return True only if both
1440        the kernel and bluetooth daemon see the adapter.
1441        """
1442        return self.bluetooth_facade.has_adapter()
1443
1444    @test_retry_and_log
1445    def test_adapter_work_state(self):
1446        """Test that the bluetooth adapter is in the correct working state.
1447
1448        This includes that the adapter is detectable, is powered on,
1449        and its hci device is hci0.
1450        """
1451        has_adapter = self.bluetooth_facade.has_adapter()
1452        is_powered_on = self._wait_for_condition(
1453                self.bluetooth_facade.is_powered_on, method_name())
1454        hci = self.bluetooth_facade.get_hci() == self.EXPECTED_HCI
1455        self.results = {
1456                'has_adapter': has_adapter,
1457                'is_powered_on': is_powered_on,
1458                'hci': hci}
1459        return all(self.results.values())
1460
1461    @test_retry_and_log(False)
1462    def test_adapter_wake_enabled(self):
1463        """Test that the bluetooth adapter is wakeup enabled.
1464        """
1465        wake_enabled = self._wait_for_condition(
1466                self.bluetooth_facade.is_wake_enabled, method_name(),
1467                timeout=self.ADAPTER_WAKE_ENABLE_TIMEOUT_SECS)
1468
1469        self.results = { 'wake_enabled': wake_enabled }
1470        return any(self.results.values())
1471
1472    @test_retry_and_log(False)
1473    def test_device_wake_allowed(self, device_address):
1474        """Test that given device can wake the system."""
1475        self.results = {
1476                'Wake allowed':
1477                self.bluetooth_facade.get_device_property(
1478                        device_address, 'WakeAllowed')
1479        }
1480
1481        return all(self.results.values())
1482
1483    @test_retry_and_log(False)
1484    def test_device_wake_not_allowed(self, device_address):
1485        """Test that given device cannot wake the system."""
1486        self.results = {
1487                'Wake not allowed':
1488                not self.bluetooth_facade.get_device_property(
1489                        device_address, 'WakeAllowed')
1490        }
1491
1492        return all(self.results.values())
1493
1494    @test_retry_and_log(False)
1495    def test_adapter_set_wake_disabled(self):
1496        """Disable wake and verify it was written. """
1497        success = self.bluetooth_facade.set_wake_enabled(False)
1498        self.results = { 'disable_wake': success }
1499        return all(self.results.values())
1500
1501    @test_retry_and_log
1502    def test_power_on_adapter(self):
1503        """Test that the adapter could be powered on successfully."""
1504        power_on = self.bluetooth_facade.set_powered(True)
1505        is_powered_on = self._wait_for_condition(
1506                self.bluetooth_facade.is_powered_on, method_name())
1507
1508        self.results = {'power_on': power_on, 'is_powered_on': is_powered_on}
1509        return all(self.results.values())
1510
1511
1512    @test_retry_and_log
1513    def test_power_off_adapter(self):
1514        """Test that the adapter could be powered off successfully."""
1515        power_off = self.bluetooth_facade.set_powered(False)
1516        is_powered_off = self._wait_for_condition(
1517                lambda: not self.bluetooth_facade.is_powered_on(),
1518                method_name())
1519
1520        self.results = {
1521                'power_off': power_off,
1522                'is_powered_off': is_powered_off}
1523        return all(self.results.values())
1524
1525
1526    @test_retry_and_log
1527    def test_reset_on_adapter(self):
1528        """Test that the adapter could be reset on successfully.
1529
1530        This includes restarting bluetoothd, and removing the settings
1531        and cached devices.
1532        """
1533        reset_on = self.bluetooth_facade.reset_on()
1534        is_powered_on = self._wait_for_condition(
1535                self.bluetooth_facade.is_powered_on, method_name())
1536
1537        self.results = {'reset_on': reset_on, 'is_powered_on': is_powered_on}
1538        return all(self.results.values())
1539
1540
1541    @test_retry_and_log
1542    def test_reset_off_adapter(self):
1543        """Test that the adapter could be reset off successfully.
1544
1545        This includes restarting bluetoothd, and removing the settings
1546        and cached devices.
1547        """
1548        reset_off = self.bluetooth_facade.reset_off()
1549        is_powered_off = self._wait_for_condition(
1550                lambda: not self.bluetooth_facade.is_powered_on(),
1551                method_name())
1552
1553        self.results = {
1554                'reset_off': reset_off,
1555                'is_powered_off': is_powered_off}
1556        return all(self.results.values())
1557
1558
1559    def test_is_powered_off(self):
1560        """Check if the adapter is powered off."""
1561        is_powered_off = not self.bluetooth_facade.is_powered_on()
1562        self.results = {'is_powered_off': is_powered_off}
1563        return all(self.results.values())
1564
1565
1566    @test_retry_and_log(False)
1567    def test_is_facade_valid(self):
1568        """Checks whether the bluetooth facade is in a good state.
1569
1570        If bluetoothd restarts (i.e. due to a crash), the object proxies will no
1571        longer be valid (because the session will be closed). Check whether the
1572        session failed and wait for a new session if it did.
1573        """
1574        initially_ok = self.bluetooth_facade.is_bluetoothd_valid()
1575        bluez_started = initially_ok or self.bluetooth_facade.start_bluetoothd()
1576
1577        self.results = {
1578                'initially_ok': initially_ok,
1579                'bluez_started': bluez_started
1580        }
1581        return all(self.results.values())
1582
1583
1584    @test_retry_and_log(False)
1585    def test_is_adapter_valid(self):
1586        """Verify the bluetooth adapter is retrievable at test start
1587
1588        @raises: error.TestNAError if we fail to retrieve the adapter on
1589                    an unsupported chipset
1590                 error.TestFail if we fail to retrieve the adapter on any other
1591                    platform
1592
1593        @returns: True if the adapter was located properly
1594        """
1595
1596        if not self.bluetooth_facade.has_adapter():
1597            logging.error('No adapter available, rebooting to recover')
1598
1599            self.reboot()
1600
1601            chipset = self.get_chipset_name()
1602
1603            if not chipset:
1604                raise error.TestFail('Unknown adapter is missing')
1605
1606            # A missing adapter is a rare but known issue on several platforms
1607            # that have no vendor support (b/169328792). Since there is no fix
1608            # possible, we forgive these failures by raising a TestNA.
1609            if chipset in UNSUPPORTED_CHIPSETS:
1610                raise error.TestNAError('Unsupported adapter is missing')
1611
1612            raise error.TestFail('Adapter is missing')
1613
1614        return True
1615
1616    @test_retry_and_log
1617    def test_UUIDs(self):
1618        """Test that basic profiles are supported."""
1619        adapter_UUIDs = self.bluetooth_facade.get_UUIDs()
1620        self.results = [uuid for uuid in self.SUPPORTED_UUIDS.values()
1621                        if uuid not in adapter_UUIDs]
1622        return not bool(self.results)
1623
1624
1625    @test_retry_and_log
1626    def test_start_discovery(self):
1627        """Test that the adapter could start discovery."""
1628        start_discovery, _ = self.bluetooth_facade.start_discovery()
1629        is_discovering = self._wait_for_condition(
1630                self.bluetooth_facade.is_discovering, method_name())
1631
1632        self.results = {
1633                'start_discovery': start_discovery,
1634                'is_discovering': is_discovering}
1635        return all(self.results.values())
1636
1637    @test_retry_and_log(False)
1638    def test_is_discovering(self):
1639        """Test that the adapter is already discovering."""
1640        is_discovering = self._wait_for_condition(
1641                self.bluetooth_facade.is_discovering, method_name())
1642
1643        self.results = {'is_discovering': is_discovering}
1644        return all(self.results.values())
1645
1646    @test_retry_and_log
1647    def test_stop_discovery(self):
1648        """Test that the adapter could stop discovery."""
1649        if not self.bluetooth_facade.is_discovering():
1650            return True
1651
1652        stop_discovery, _ = self.bluetooth_facade.stop_discovery()
1653        is_not_discovering = self._wait_for_condition(
1654                lambda: not self.bluetooth_facade.is_discovering(),
1655                method_name())
1656
1657        self.results = {
1658                'stop_discovery': stop_discovery,
1659                'is_not_discovering': is_not_discovering}
1660        return all(self.results.values())
1661
1662
1663    @test_retry_and_log
1664    def test_discoverable(self):
1665        """Test that the adapter could be set discoverable."""
1666        set_discoverable = self.bluetooth_facade.set_discoverable(True)
1667        is_discoverable = self._wait_for_condition(
1668                self.bluetooth_facade.is_discoverable, method_name())
1669
1670        self.results = {
1671                'set_discoverable': set_discoverable,
1672                'is_discoverable': is_discoverable}
1673        return all(self.results.values())
1674
1675    @test_retry_and_log(False)
1676    def test_is_discoverable(self):
1677        """Test that the adapter is discoverable."""
1678        is_discoverable = self._wait_for_condition(
1679                self.bluetooth_facade.is_discoverable, method_name())
1680
1681        self.results = {'is_discoverable': is_discoverable}
1682        return all(self.results.values())
1683
1684
    def _test_timeout_property(self, set_property, check_property, set_timeout,
                              get_timeout, property_name,
                              timeout_values = [0, 60, 180]):
        """Common method to test the (Discoverable/Pairable)Timeout property.

        This is used to test
        - DiscoverableTimeout property
        - PairableTimeout property

        For every value in timeout_values the test performs the following
           - Set PropertyTimeout
           - Read PropertyTimeout and make sure values match
           - Set adapter property
           - In a loop check if property is active
           - Test fails if property is False before timeout
           - Test fails if property is True after timeout

           Note : A value of 0 means the property never times out, so that
                 iteration ends once the property has stayed set for more
                 than 30 seconds.

        The property value and the timeout read on entry are written back
        before returning, whether the test passed, failed or raised.

        @param set_property: callable taking a bool, used to set the property.
        @param check_property: callable returning the current property value.
        @param set_timeout: callable taking the timeout in seconds; a falsy
                            return value is treated as a failure.
        @param get_timeout: callable returning the current timeout value.
        @param property_name: name used in log messages and result keys.
        @param timeout_values: timeouts, in seconds, to test in order. The
                               default list is only iterated, never mutated.

        @returns: True if every timeout value behaved as expected.
                  False otherwise.

        """
        def check_timeout(timeout):
            """Check whether the timeout read back equals the value set.

            @param timeout: the timeout value that was set.

            @returns: True if get_timeout() returns the same value.
                      False otherwise.
            """
            actual_timeout = get_timeout()
            if timeout != actual_timeout:
                logging.debug('%s timeout value read %s does not '
                              'match value set %s, yet', property_name,
                              actual_timeout, timeout)
                return False
            else:
                return True

        def _test_timeout_property(timeout):
            """Run the timeout check for a single timeout value.

            @param timeout: timeout in seconds; 0 means never time out.

            @returns: True if the property stayed set until the timeout and
                      was cleared afterwards. False otherwise.
            """
            # Minimum distance, in seconds, from the timeout instant at which
            # a property reading is still trusted
            MIN_DELTA_SECS = 3
            # Time, in seconds, between two checks of the property
            WAIT_TIME_SECS = 2

            # Set and read back the timeout value
            if not set_timeout(timeout):
                logging.error('Setting the %s timeout failed',property_name)
                return False


            if not self._wait_for_condition(lambda : check_timeout(timeout),
                                            'check_'+property_name):
                logging.error('checking %s_timeout value timed out',
                              property_name)
                return False

            #
            # Check that the timeout works:
            # the property must be true until the timeout
            # and false afterwards
            #

            property_set = set_property(True)
            property_is_true = self._wait_for_condition(check_property,
                                                        method_name())

            self.results = { 'set_%s' % property_name : property_set,
                             'is_%s' % property_name: property_is_true}
            logging.debug(self.results)

            if not all(self.results.values()):
                logging.error('Setting %s failed',property_name)
                return False

            # The elapsed time is measured from here, i.e. after the property
            # has been confirmed as set.
            start_time = time.time()
            while True:
                time.sleep(WAIT_TIME_SECS)
                cur_time = time.time()
                property_set = check_property()
                time_elapsed = cur_time - start_time

                # Ignore check_property results made near the timeout
                # to avoid spurious failures.
                if abs(int(timeout - time_elapsed)) < MIN_DELTA_SECS:
                    continue

                # Timeout of zero seconds means that the adapter never times
                # out. Check for 30 seconds and then exit the test.
                if timeout == 0:
                    if not property_set:
                        logging.error('Adapter is not %s after %.2f '
                                      'secs with a timeout of zero ',
                                      property_name, time_elapsed)
                        return False
                    elif time_elapsed > 30:
                        logging.debug('Adapter %s after %.2f seconds '
                                      'with timeout of zero as expected' ,
                                      property_name, time_elapsed)
                        return True
                    continue

                #
                # Check if property is true till timeout ends and
                # false afterwards
                #
                if time_elapsed < timeout:
                    if not property_set:
                        logging.error('Adapter is not %s after %.2f '
                                      'secs before timeout of %.2f',
                                      property_name, time_elapsed, timeout)
                        return False
                else:
                    if property_set:
                        logging.error('Adapter is still %s after '
                                      ' %.2f secs with timeout of %.2f',
                                      property_name, time_elapsed, timeout)
                        return False
                    else:
                        logging.debug('Adapter not %s after %.2f '
                                      'secs with timeout of %.2f as expected ',
                                      property_name, time_elapsed, timeout)
                        return True

        # Remember the state found on entry so it can be restored at the end.
        default_value = check_property()
        default_timeout = get_timeout()

        result = []
        try:
            # Every timeout value is tested even if an earlier one failed.
            for timeout in timeout_values:
                result.append(_test_timeout_property(timeout))
            logging.debug("Test returning %s", all(result))
            return all(result)
        except:
            # Log which test was running, then let the exception propagate.
            logging.error("exception in test_%s_timeout",property_name)
            raise
        finally:
            # Set the property back to default value permanently before
            # exiting the test: the timeout is cleared first so the restored
            # property value does not expire.
            set_timeout(0)
            set_property(default_value)
            # Set the timeout back to default value before exiting the test
            set_timeout(default_timeout)
1820
1821
1822    @test_retry_and_log
1823    def test_discoverable_timeout(self, timeout_values = [0, 60, 180]):
1824        """Test adapter dbus property DiscoverableTimeout."""
1825        return self._test_timeout_property(
1826            set_property = self.bluetooth_facade.set_discoverable,
1827            check_property = self.bluetooth_facade.is_discoverable,
1828            set_timeout = self.bluetooth_facade.set_discoverable_timeout,
1829            get_timeout = self.bluetooth_facade.get_discoverable_timeout,
1830            property_name = 'discoverable',
1831            timeout_values = timeout_values)
1832
1833    @test_retry_and_log
1834    def test_pairable_timeout(self, timeout_values = [0, 60, 180]):
1835        """Test adapter dbus property PairableTimeout."""
1836        return self._test_timeout_property(
1837            set_property = self.bluetooth_facade.set_pairable,
1838            check_property = self.bluetooth_facade.is_pairable,
1839            set_timeout = self.bluetooth_facade.set_pairable_timeout,
1840            get_timeout = self.bluetooth_facade.get_pairable_timeout,
1841            property_name = 'pairable',
1842            timeout_values = timeout_values)
1843
1844
1845    @test_retry_and_log
1846    def test_nondiscoverable(self):
1847        """Test that the adapter could be set non-discoverable."""
1848        set_nondiscoverable = self.bluetooth_facade.set_discoverable(False)
1849        is_nondiscoverable = self._wait_for_condition(
1850                lambda: not self.bluetooth_facade.is_discoverable(),
1851                method_name())
1852
1853        self.results = {
1854                'set_nondiscoverable': set_nondiscoverable,
1855                'is_nondiscoverable': is_nondiscoverable}
1856        return all(self.results.values())
1857
1858
1859    @test_retry_and_log
1860    def test_pairable(self):
1861        """Test that the adapter could be set pairable."""
1862        set_pairable = self.bluetooth_facade.set_pairable(True)
1863        is_pairable = self._wait_for_condition(
1864                self.bluetooth_facade.is_pairable, method_name())
1865
1866        self.results = {
1867                'set_pairable': set_pairable,
1868                'is_pairable': is_pairable}
1869        return all(self.results.values())
1870
1871
1872    @test_retry_and_log
1873    def test_nonpairable(self):
1874        """Test that the adapter could be set non-pairable."""
1875        set_nonpairable = self.bluetooth_facade.set_pairable(False)
1876        is_nonpairable = self._wait_for_condition(
1877                lambda: not self.bluetooth_facade.is_pairable(), method_name())
1878
1879        self.results = {
1880                'set_nonpairable': set_nonpairable,
1881                'is_nonpairable': is_nonpairable}
1882        return all(self.results.values())
1883
1884
1885    @test_retry_and_log(False)
1886    def test_check_valid_adapter_id(self):
1887        """Fail if the Bluetooth ID is not in the correct format.
1888
1889        @returns True if adapter ID follows expected format, False otherwise
1890        """
1891
1892        device = self.get_base_platform_name()
1893        adapter_info = self.get_adapter_properties()
1894
1895        # Don't complete test if this is a reference board
1896        if device in self.REFERENCE_BOARDS:
1897            return True
1898
1899        modalias = adapter_info['Modalias']
1900        logging.debug('Saw Bluetooth ID of: %s', modalias)
1901
1902        # Valid Device ID is:
1903        # <00E0(Google)>/<C405(Chrome OS)>/<non-zero versionNumber>
1904        bt_format = 'bluetooth:v00E0pC405d(?!0000)'
1905
1906        if not re.match(bt_format, modalias):
1907            return False
1908
1909        return True
1910
1911
1912    @test_retry_and_log(False)
1913    def test_check_valid_alias(self):
1914        """Fail if the Bluetooth alias is not in the correct format.
1915
1916        @returns True if adapter alias follows expected format, False otherwise
1917        """
1918
1919        device = self.get_base_platform_name()
1920        adapter_info = self.get_adapter_properties()
1921
1922        # Don't complete test if this is a reference board
1923        if device in self.REFERENCE_BOARDS:
1924            return True
1925
1926        alias = adapter_info['Alias']
1927        logging.debug('Saw Bluetooth Alias of: %s', alias)
1928
1929        device_type = self.host.get_board_type().lower()
1930        alias_format = '%s_[a-z0-9]{4}' % device_type
1931
1932        self.results = {}
1933
1934        alias_was_correct = True
1935        if not re.match(alias_format, alias.lower()):
1936            alias_was_correct = False
1937            logging.info('unexpected alias %s found', alias)
1938            self.results['alias_found'] = alias
1939
1940        self.results['alias_was_correct'] = alias_was_correct
1941        return all(self.results.values())
1942
1943
1944    # -------------------------------------------------------------------
1945    # Tests about general discovering, pairing, and connection
1946    # -------------------------------------------------------------------
1947
1948
1949    @test_retry_and_log(False)
1950    def test_discover_device(self,
1951                             device_address,
1952                             start_discovery=True,
1953                             stop_discovery=True):
1954        """Test that the adapter could discover the specified device address.
1955
1956        @param device_address: Address of the device.
1957        @param start_discovery: Whether to start discovery. Set to False if you
1958                                call start_discovery before calling this.
1959        @param stop_discovery: Whether to stop discovery at the end. If this is
1960                               set to False, make sure to call
1961                               test_stop_discovery afterwards.
1962
1963        @returns: True if the device is found. False otherwise.
1964
1965        """
1966        has_device_initially = False
1967        discovery_stopped = False
1968        is_not_discovering = False
1969        device_discovered = False
1970        # If start discovery is not set, discovery must already be started
1971        discovery_started = not start_discovery
1972        has_device = self.bluetooth_facade.has_device
1973
1974        if has_device(device_address):
1975            has_device_initially = True
1976        else:
1977            if start_discovery:
1978                discovery_started = self.bluetooth_facade.start_discovery()
1979
1980            if discovery_started:
1981                try:
1982                    utils.poll_for_condition(
1983                            condition=(lambda: has_device(device_address)),
1984                            timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
1985                            sleep_interval=
1986                            self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
1987                            desc='Waiting for discovering %s' % device_address)
1988                    device_discovered = True
1989                except utils.TimeoutError as e:
1990                    logging.error('test_discover_device: %s', e)
1991                except Exception as e:
1992                    logging.error('test_discover_device: %s', e)
1993                    err = ('bluetoothd probably crashed.'
1994                           'Check out /var/log/messages')
1995                    logging.error(err)
1996                except:
1997                    logging.error('test_discover_device: unexpected error')
1998
1999            if start_discovery and stop_discovery:
2000                discovery_stopped, _ = self.bluetooth_facade.stop_discovery()
2001                is_not_discovering = self._wait_for_condition(
2002                        lambda: not self.bluetooth_facade.is_discovering(),
2003                        method_name())
2004
2005        self.results = {
2006                'has_device_initially': has_device_initially,
2007                'should_start_discovery': start_discovery,
2008                'should_stop_discovery': stop_discovery,
2009                'start_discovery': discovery_started,
2010                'stop_discovery': discovery_stopped,
2011                'is_not_discovering': is_not_discovering,
2012                'device_discovered': device_discovered}
2013
2014        # Make sure a discovered device properly started and stopped discovery
2015        device_found = device_discovered and discovery_started and (
2016                discovery_stopped and is_not_discovering
2017                if stop_discovery else True)
2018
2019        return has_device_initially or device_found
2020
2021
2022    def _test_discover_by_device(self, device):
2023        return device.Discover(self.bluetooth_facade.address)
2024
2025    @test_retry_and_log(False, messages_start=False, messages_stop=False)
2026    def test_discover_by_device(self, device):
2027        """Test that the device could discover the adapter address.
2028
2029        @param device: Meta device to represent peer device.
2030
2031        @returns: True if the adapter is found by the device.
2032        """
2033        adapter_discovered = False
2034        discover_by_device = self._test_discover_by_device
2035        discovered_initially = discover_by_device(device)
2036
2037        if not discovered_initially:
2038            try:
2039                utils.poll_for_condition(
2040                        condition=(lambda: discover_by_device(device)),
2041                        timeout=self.ADAPTER_DISCOVER_TIMEOUT_SECS,
2042                        sleep_interval=
2043                        self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
2044                        desc='Waiting for adapter to be discovered')
2045                adapter_discovered = True
2046            except utils.TimeoutError as e:
2047                logging.error('test_discover_by_device: %s', e)
2048            except Exception as e:
2049                logging.error('test_discover_by_device: %s', e)
2050                err = ('bluetoothd probably crashed.'
2051                       'Check out /var/log/messages')
2052                logging.error(err)
2053            except:
2054                logging.error('test_discover_by_device: unexpected error')
2055
2056        self.results = {
2057            'adapter_discovered_initially': discovered_initially,
2058            'adapter_discovered': adapter_discovered
2059        }
2060        return any(self.results.values())
2061
2062    @test_retry_and_log(False, messages_start=False, messages_stop=False)
2063    def test_discover_by_device_fails(self, device):
2064        """Test that the device could not discover the adapter address.
2065
2066        @param device: Meta device to represent peer device.
2067
2068        @returns False if the adapter is found by the device.
2069        """
2070        self.results = {
2071                'adapter_discovered': self._test_discover_by_device(device)
2072        }
2073        return not any(self.results.values())
2074
2075    @test_retry_and_log(False, messages_start=False, messages_stop=False)
2076    def test_device_set_discoverable(self, device, discoverable):
2077        """Test that we could set the peer device to discoverable. """
2078        try:
2079            device.SetDiscoverable(discoverable)
2080        except:
2081            return False
2082
2083        return True
2084
2085    @test_retry_and_log
2086    def test_pairing(self, device_address, pin, trusted=True):
2087        """Test that the adapter could pair with the device successfully.
2088
2089        @param device_address: Address of the device.
2090        @param pin: pin code to pair with the device.
2091        @param trusted: indicating whether to set the device trusted.
2092
2093        @returns: True if pairing succeeds. False otherwise.
2094
2095        """
2096
2097        def _pair_device():
2098            """Pair to the device.
2099
2100            @returns: True if it could pair with the device. False otherwise.
2101
2102            """
2103            return self.bluetooth_facade.pair_legacy_device(
2104                    device_address, pin, trusted,
2105                    self.ADAPTER_PAIRING_TIMEOUT_SECS)
2106
2107
2108        def _verify_connection_info():
2109            """Verify that connection info to device is retrievable.
2110
2111            @returns: True if the connection info is retrievable.
2112                      False otherwise.
2113            """
2114            return (self.bluetooth_facade.get_connection_info(device_address)
2115                    is not None)
2116
2117        def _verify_connected():
2118            """Verify the device is connected.
2119
2120            @returns: True if the device is connected, False otherwise.
2121            """
2122            return self.bluetooth_facade.device_is_connected(device_address)
2123
2124        has_device = False
2125        paired = False
2126        connected = False
2127        connection_info_retrievable = False
2128        connected_devices = self.get_num_connected_devices()
2129
2130        if self.bluetooth_facade.has_device(device_address):
2131            has_device = True
2132            try:
2133                utils.poll_for_condition(
2134                        condition=_pair_device,
2135                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
2136                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2137                        desc='Waiting for pairing %s' % device_address)
2138                paired = True
2139            except utils.TimeoutError as e:
2140                logging.error('test_pairing: %s', e)
2141            except:
2142                logging.error('test_pairing: unexpected error')
2143
2144            connection_info_retrievable = _verify_connection_info()
2145            connected = _verify_connected()
2146
2147        self.results = {
2148                'has_device': has_device,
2149                'paired': paired,
2150                'connected': connected,
2151                'connection_info_retrievable': connection_info_retrievable,
2152                'connection_num': connected_devices + 1
2153        }
2154
2155        return all(self.results.values())
2156
2157    @test_retry_and_log
2158    def test_remove_pairing(self, device_address):
2159        """Test that the adapter could remove the paired device.
2160
2161        @param device_address: Address of the device.
2162
2163        @returns: True if the device is removed successfully. False otherwise.
2164
2165        """
2166        device_is_paired_initially = self.bluetooth_facade.device_is_paired(
2167                device_address)
2168        remove_pairing = False
2169        pairing_removed = False
2170
2171        if device_is_paired_initially:
2172            remove_pairing = self.bluetooth_facade.remove_device_object(
2173                    device_address)
2174            pairing_removed = not self.bluetooth_facade.device_is_paired(
2175                    device_address)
2176
2177        self.results = {
2178                'device_is_paired_initially': device_is_paired_initially,
2179                'remove_pairing': remove_pairing,
2180                'pairing_removed': pairing_removed}
2181        return all(self.results.values())
2182
2183
2184    def test_set_trusted(self, device_address, trusted=True):
2185        """Test whether the device with the specified address is trusted.
2186
2187        @param device_address: Address of the device.
2188        @param trusted : True or False indicating if trusted is expected.
2189
2190        @returns: True if the device's "Trusted" property is as specified;
2191                  False otherwise.
2192
2193        """
2194
2195        set_trusted = self.bluetooth_facade.set_trusted(
2196                device_address, trusted)
2197
2198        actual_trusted = self.bluetooth_facade.get_device_property(
2199                                device_address, 'Trusted')
2200
2201        self.results = {
2202                'set_trusted': set_trusted,
2203                'actual trusted': actual_trusted,
2204                'expected trusted': trusted}
2205        return actual_trusted == trusted
2206
2207
2208    @test_retry_and_log
2209    def test_connection_by_adapter(self, device_address):
2210        """Test that the adapter of dut could connect to the device successfully
2211
2212        It is the caller's responsibility to pair to the device before
2213        doing connection.
2214
2215        @param device_address: Address of the device.
2216
2217        @returns: True if connection is performed. False otherwise.
2218
2219        """
2220
2221        def _connect_device():
2222            """Connect to the device.
2223
2224            @returns: True if it could connect to the device. False otherwise.
2225
2226            """
2227            return self.bluetooth_facade.connect_device(device_address)
2228
2229
2230        has_device = False
2231        connected = False
2232        if self.bluetooth_facade.has_device(device_address):
2233            has_device = True
2234            try:
2235                utils.poll_for_condition(
2236                        condition=_connect_device,
2237                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
2238                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2239                        desc='Waiting for connecting to %s' % device_address)
2240                connected = True
2241            except utils.TimeoutError as e:
2242                logging.error('test_connection_by_adapter: %s', e)
2243            except:
2244                logging.error('test_connection_by_adapter: unexpected error')
2245
2246        self.results = {'has_device': has_device, 'connected': connected}
2247        return all(self.results.values())
2248
2249
2250    @test_retry_and_log
2251    def test_disconnection_by_adapter(self, device_address):
2252        """Test that the adapter of dut could disconnect the device successfully
2253
2254        @param device_address: Address of the device.
2255
2256        @returns: True if disconnection is performed. False otherwise.
2257
2258        """
2259        return self.bluetooth_facade.disconnect_device(device_address)
2260
2261
2262    def _enter_command_mode(self, device):
2263        """Let the device enter command mode.
2264
2265        Before using the device, need to call this method to make sure
2266        it is in the command mode.
2267
2268        @param device: the bluetooth HID device
2269
2270        @returns: True if successful. False otherwise.
2271
2272        """
2273        result = _is_successful(_run_method(device.EnterCommandMode,
2274                                            'EnterCommandMode'))
2275        if not result:
2276            logging.error('EnterCommandMode failed')
2277        return result
2278
2279
2280    @test_retry_and_log
2281    def test_connection_by_device(self, device):
2282        """Test that the device could connect to the adapter successfully.
2283
2284        This emulates the behavior that a device may initiate a
2285        connection request after waking up from power saving mode.
2286
2287        @param device: the bluetooth HID device
2288
2289        @returns: True if connection is performed correctly by device and
2290                  the adapter also enters connection state.
2291                  False otherwise.
2292
2293        """
2294        if not self._enter_command_mode(device):
2295            return False
2296
2297        method_name = 'test_connection_by_device'
2298        connection_by_device = False
2299        adapter_address = self.bluetooth_facade.address
2300        try:
2301            connection_by_device = device.ConnectToRemoteAddress(
2302                adapter_address)
2303        except Exception as e:
2304            logging.error('%s (device): %s', method_name, e)
2305        except:
2306            logging.error('%s (device): unexpected error', method_name)
2307
2308        connection_seen_by_adapter = False
2309        device_address = device.address
2310        device_is_connected = self.bluetooth_facade.device_is_connected
2311        try:
2312            utils.poll_for_condition(
2313                    condition=lambda: device_is_connected(device_address),
2314                    timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
2315                    desc=('Waiting for connection from %s' % device_address))
2316            connection_seen_by_adapter = True
2317
2318            # Although the connect may be complete, it can take a few
2319            # seconds for the input device to be ready for use
2320            time.sleep(self.ADAPTER_HID_INPUT_DELAY)
2321        except utils.TimeoutError as e:
2322            logging.error('%s (adapter): %s', method_name, e)
2323        except:
2324            logging.error('%s (adapter): unexpected error', method_name)
2325
2326        self.results = {
2327                'connection_by_device': connection_by_device,
2328                'connection_seen_by_adapter': connection_seen_by_adapter}
2329        return all(self.results.values())
2330
2331    @test_retry_and_log(True, messages_start=False, messages_stop=False)
2332    def test_connection_by_device_only(self, device, adapter_address):
2333        """Test that the device could connect to adapter successfully.
2334
2335        This is a modified version of test_connection_by_device that only
2336        communicates with the peer device and not the host (in case the host is
2337        suspended for example).
2338
2339        @param device: the bluetooth peer device
2340        @param adapter_address: address of the adapter
2341
2342        @returns: True if the connection was established by the device or False.
2343        """
2344        connected = device.ConnectToRemoteAddress(adapter_address)
2345        if connected:
2346            # Although the connect may be complete, it can take a few
2347            # seconds for the input device to be ready for use
2348            time.sleep(self.ADAPTER_HID_INPUT_DELAY)
2349
2350        self.results = {
2351            'connection_by_device': connected
2352        }
2353
2354        return all(self.results.values())
2355
2356
2357    @test_retry_and_log
2358    def test_disconnection_by_device(self, device):
2359        """Test that the device could disconnect the adapter successfully.
2360
2361        This emulates the behavior that a device may initiate a
2362        disconnection request before going into power saving mode.
2363
2364        Note: should not try to enter command mode in this method. When
2365              a device is connected, there is no way to enter command mode.
2366              One could just issue a special disconnect command without
2367              entering command mode.
2368
2369        @param device: the bluetooth HID device
2370
2371        @returns: True if disconnection is performed correctly by device and
2372                  the adapter also observes the disconnection.
2373                  False otherwise.
2374
2375        """
2376        method_name = 'test_disconnection_by_device'
2377        disconnection_by_device = False
2378        try:
2379            device.Disconnect()
2380            disconnection_by_device = True
2381        except Exception as e:
2382            logging.error('%s (device): %s', method_name, e)
2383        except:
2384            logging.error('%s (device): unexpected error', method_name)
2385
2386        disconnection_seen_by_adapter = False
2387        device_address = device.address
2388        device_is_connected = self.bluetooth_facade.device_is_connected
2389        try:
2390            utils.poll_for_condition(
2391                    condition=lambda: not device_is_connected(device_address),
2392                    timeout=self.ADAPTER_DISCONNECTION_TIMEOUT_SECS,
2393                    desc=('Waiting for disconnection from %s' % device_address))
2394            disconnection_seen_by_adapter = True
2395        except utils.TimeoutError as e:
2396            logging.error('%s (adapter): %s', method_name, e)
2397        except:
2398            logging.error('%s (adapter): unexpected error', method_name)
2399
2400        self.results = {
2401                'disconnection_by_device': disconnection_by_device,
2402                'disconnection_seen_by_adapter': disconnection_seen_by_adapter}
2403        return all(self.results.values())
2404
2405
2406    @test_retry_and_log(False)
2407    def test_device_is_connected(self, device_address):
2408        """Test that device address given is currently connected.
2409
2410        @param device_address: Address of the device.
2411
2412        @returns: True if the device is connected.
2413                  False otherwise.
2414        """
2415
2416        def _is_connected():
2417            """Test if device is connected.
2418
2419            @returns: True if device is connected. False otherwise.
2420
2421            """
2422            return self.bluetooth_facade.device_is_connected(device_address)
2423
2424
2425        method_name = 'test_device_is_connected'
2426        has_device = False
2427        connected = False
2428        if self.bluetooth_facade.has_device(device_address):
2429            has_device = True
2430            try:
2431                utils.poll_for_condition(
2432                        condition=_is_connected,
2433                        timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
2434                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2435                        desc='Waiting to check connection to %s' %
2436                              device_address)
2437                connected = True
2438            except utils.TimeoutError as e:
2439                logging.error('%s: %s', method_name, e)
2440            except:
2441                logging.error('%s: unexpected error', method_name)
2442        self.results = {'has_device': has_device, 'connected': connected}
2443        return all(self.results.values())
2444
2445
2446    @test_retry_and_log(False)
2447    def test_device_is_not_connected(self, device_address):
2448        """Test that device address given is NOT currently connected.
2449
2450        @param device_address: Address of the device.
2451
2452        @returns: True if the device is NOT connected.
2453                  False otherwise.
2454
2455        """
2456
2457        def _is_not_connected():
2458            """Test if device is not connected.
2459
2460            @returns: True if device is not connected. False otherwise.
2461
2462            """
2463            return not self.bluetooth_facade.device_is_connected(
2464                    device_address)
2465
2466
2467        method_name = 'test_device_is_not_connected'
2468        not_connected = False
2469        if self.bluetooth_facade.has_device(device_address):
2470            try:
2471                utils.poll_for_condition(
2472                        condition=_is_not_connected,
2473                        timeout=self.ADAPTER_CONNECTION_TIMEOUT_SECS,
2474                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2475                        desc='Waiting to check connection to %s' %
2476                              device_address)
2477                not_connected = True
2478            except utils.TimeoutError as e:
2479                logging.error('%s: %s', method_name, e)
2480            except:
2481                logging.error('%s: unexpected error', method_name)
2482                raise
2483        else:
2484            not_connected = True
2485        self.results = {'not_connected': not_connected}
2486        return all(self.results.values())
2487
2488
2489    @test_retry_and_log
2490    def test_device_is_paired(self, device_address):
2491        """Test that the device address given is currently paired.
2492
2493        @param device_address: Address of the device.
2494
2495        @returns: True if the device is paired.
2496                  False otherwise.
2497
2498        """
2499        def _is_paired():
2500            """Test if device is paired.
2501
2502            @returns: True if device is paired. False otherwise.
2503
2504            """
2505            return self.bluetooth_facade.device_is_paired(device_address)
2506
2507
2508        method_name = 'test_device_is_paired'
2509        has_device = False
2510        paired = False
2511        if self.bluetooth_facade.has_device(device_address):
2512            has_device = True
2513            try:
2514                utils.poll_for_condition(
2515                        condition=_is_paired,
2516                        timeout=self.ADAPTER_PAIRING_TIMEOUT_SECS,
2517                        sleep_interval=self.ADAPTER_PAIRING_POLLING_SLEEP_SECS,
2518                        desc='Waiting for connection to %s' % device_address)
2519                paired = True
2520            except utils.TimeoutError as e:
2521                logging.error('%s: %s', method_name, e)
2522            except:
2523                logging.error('%s: unexpected error', method_name)
2524        self.results = {'has_device': has_device, 'paired': paired}
2525        return all(self.results.values())
2526
2527
2528    def _get_device_name(self, device_address):
2529        """Get the device name.
2530
2531        @returns: True if the device name is derived. None otherwise.
2532
2533        """
2534
2535        self.discovered_device_name = self.bluetooth_facade.get_device_property(
2536                                device_address, 'Name')
2537
2538        return bool(self.discovered_device_name)
2539
2540
2541    @test_retry_and_log
2542    def test_device_name(self, device_address, expected_device_name):
2543        """Test that the device name discovered by the adapter is correct.
2544
2545        @param device_address: Address of the device.
2546        @param expected_device_name: the bluetooth device name
2547
2548        @returns: True if the discovered_device_name is expected_device_name.
2549                  False otherwise.
2550
2551        """
2552        try:
2553            utils.poll_for_condition(
2554                    condition=lambda: self._get_device_name(device_address),
2555                    timeout=self.ADAPTER_DISCOVER_NAME_TIMEOUT_SECS,
2556                    sleep_interval=self.ADAPTER_DISCOVER_POLLING_SLEEP_SECS,
2557                    desc='Waiting for device name of %s' % device_address)
2558        except utils.TimeoutError as e:
2559            logging.error('test_device_name: %s', e)
2560        except:
2561            logging.error('test_device_name: unexpected error')
2562
2563        self.results = {
2564                'expected_device_name': expected_device_name,
2565                'discovered_device_name': self.discovered_device_name}
2566        return self.discovered_device_name == expected_device_name
2567
2568
2569    @test_retry_and_log
2570    def test_device_class_of_service(self, device_address,
2571                                     expected_class_of_service):
2572        """Test that the discovered device class of service is as expected.
2573
2574        @param device_address: Address of the device.
2575        @param expected_class_of_service: the expected class of service
2576
2577        @returns: True if the discovered class of service matches the
2578                  expected class of service. False otherwise.
2579
2580        """
2581
2582        device_class = self.bluetooth_facade.get_device_property(device_address,
2583                                                                 'Class')
2584        discovered_class_of_service = (device_class & self.CLASS_OF_SERVICE_MASK
2585                                       if device_class else None)
2586
2587        self.results = {
2588                'device_class': device_class,
2589                'expected_class_of_service': expected_class_of_service,
2590                'discovered_class_of_service': discovered_class_of_service}
2591        return discovered_class_of_service == expected_class_of_service
2592
2593
2594    @test_retry_and_log
2595    def test_device_class_of_device(self, device_address,
2596                                    expected_class_of_device):
2597        """Test that the discovered device class of device is as expected.
2598
2599        @param device_address: Address of the device.
2600        @param expected_class_of_device: the expected class of device
2601
2602        @returns: True if the discovered class of device matches the
2603                  expected class of device. False otherwise.
2604
2605        """
2606
2607        device_class = self.bluetooth_facade.get_device_property(device_address,
2608                                                                 'Class')
2609        discovered_class_of_device = (device_class & self.CLASS_OF_DEVICE_MASK
2610                                      if device_class else None)
2611
2612        self.results = {
2613                'device_class': device_class,
2614                'expected_class_of_device': expected_class_of_device,
2615                'discovered_class_of_device': discovered_class_of_device}
2616        return discovered_class_of_device == expected_class_of_device
2617
2618
2619    def _get_btmon_log(self, method, logging_timespan=1):
2620        """Capture the btmon log when executing the specified method.
2621
2622        @param method: the method to capture log.
2623                       The method would be executed only when it is not None.
2624                       This allows us to simply capture btmon log without
2625                       executing any command.
2626        @param logging_timespan: capture btmon log for logging_timespan seconds.
2627
2628        """
2629        self.bluetooth_le_facade.btmon_start()
2630        self.advertising_msg = method() if method else ''
2631        time.sleep(logging_timespan)
2632        self.bluetooth_le_facade.btmon_stop()
2633
2634
2635    def convert_to_adv_jiffies(self, adv_interval_ms):
2636        """Convert adv interval in ms to jiffies, i.e., multiples of 0.625 ms.
2637
2638        @param adv_interval_ms: an advertising interval
2639
2640        @returns: the equivalent jiffies
2641
2642        """
2643        return adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT
2644
2645
2646    def compute_duration(self, max_adv_interval_ms):
2647        """Compute duration from max_adv_interval_ms.
2648
2649        Advertising duration is calculated approximately as
2650            duration = max_adv_interval_ms / 1000.0 * 1.1
2651
2652        @param max_adv_interval_ms: max advertising interval in milliseconds.
2653
2654        @returns: duration in seconds.
2655
2656        """
2657        return max_adv_interval_ms / 1000.0 * 1.1
2658
2659
2660    def compute_logging_timespan(self, max_adv_interval_ms):
2661        """Compute the logging timespan from max_adv_interval_ms.
2662
2663        The logging timespan is the time needed to record btmon log.
2664
2665        @param max_adv_interval_ms: max advertising interval in milliseconds.
2666
2667        @returns: logging_timespan in seconds.
2668
2669        """
2670        duration = self.compute_duration(max_adv_interval_ms)
2671        logging_timespan = max(self.count_advertisements * duration, 1)
2672        return logging_timespan
2673
2674
2675    @test_retry_and_log(False)
2676    def test_check_duration_and_intervals(self, min_adv_interval_ms,
2677                                          max_adv_interval_ms,
2678                                          number_advertisements):
2679        """Verify that every advertisements are scheduled according to the
2680        duration and intervals.
2681
2682        An advertisement would be scheduled at the time span of
2683             duration * number_advertisements
2684
2685        @param min_adv_interval_ms: min advertising interval in milliseconds.
2686        @param max_adv_interval_ms: max advertising interval in milliseconds.
2687        @param number_advertisements: the number of existing advertisements
2688
2689        @returns: True if all advertisements are scheduled based on the
2690                duration and intervals.
2691
2692        """
2693
2694
2695        def within_tolerance(expected, actual, max_error=0.1):
2696            """Determine if the percent error is within specified tolerance.
2697
2698            @param expected: The expected value.
2699            @param actual: The actual (measured) value.
2700            @param max_error: The maximum percent error acceptable.
2701
2702            @returns: True if the percent error is less than or equal to
2703                      max_error.
2704            """
2705            return abs(expected - actual) / abs(expected) <= max_error
2706
2707
2708        start_str = 'Set Advertising Intervals:'
2709        search_strings = ['HCI Command: LE Set Advertising Data', 'Company']
2710        search_str = '|'.join(search_strings)
2711
2712        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
2713                                                      start_str=start_str)
2714
2715        # Company string looks like
2716        #   Company: not assigned (65283)
2717        company_pattern = re.compile('Company:.*\((\d*)\)')
2718
2719        # The string with timestamp looks like
2720        #   < HCI Command: LE Set Advertising Data (0x08|0x0008) [hci0] 3.799236
2721        set_adv_time_str = 'LE Set Advertising Data.*\[hci\d\].*(\d+\.\d+)'
2722        set_adv_time_pattern = re.compile(set_adv_time_str)
2723
2724        adv_timestamps = {}
2725        timestamp = None
2726        manufacturer_id = None
2727        for line in contents:
2728            result = set_adv_time_pattern.search(line)
2729            if result:
2730                timestamp = float(result.group(1))
2731
2732            result = company_pattern.search(line)
2733            if result:
2734                manufacturer_id = '0x%04x' % int(result.group(1))
2735
2736            if timestamp and manufacturer_id:
2737                if manufacturer_id not in adv_timestamps:
2738                    adv_timestamps[manufacturer_id] = []
2739                adv_timestamps[manufacturer_id].append(timestamp)
2740                timestamp = None
2741                manufacturer_id = None
2742
2743        duration = self.compute_duration(max_adv_interval_ms)
2744        expected_timespan = duration * number_advertisements
2745
2746        check_duration = True
2747        for manufacturer_id, values in six.iteritems(adv_timestamps):
2748            logging.debug('manufacturer_id %s: %s', manufacturer_id, values)
2749            timespans = [values[i] - values[i - 1]
2750                         for i in range(1, len(values))]
2751            errors = [timespans[i] for i in range(len(timespans))
2752                      if not within_tolerance(expected_timespan, timespans[i])]
2753            logging.debug('timespans: %s', timespans)
2754            logging.debug('errors: %s', errors)
2755            if bool(errors):
2756                check_duration = False
2757
2758        # Verify that the advertising intervals are also correct.
2759        min_adv_interval_ms_found, max_adv_interval_ms_found = (
2760                self._verify_advertising_intervals(min_adv_interval_ms,
2761                                                   max_adv_interval_ms))
2762
2763        self.results = {
2764                'check_duration': check_duration,
2765                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2766                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2767        }
2768        return all(self.results.values())
2769
2770
2771    def _get_min_max_intervals_strings(self, min_adv_interval_ms,
2772                                       max_adv_interval_ms):
2773        """Get the min and max advertising intervals strings shown in btmon.
2774
2775        Advertising intervals shown in the btmon log look like
2776            Min advertising interval: 1280.000 msec (0x0800)
2777            Max advertising interval: 1280.000 msec (0x0800)
2778
2779        @param min_adv_interval_ms: min advertising interval in milliseconds.
2780        @param max_adv_interval_ms: max advertising interval in milliseconds.
2781
2782        @returns: the min and max intervals strings.
2783
2784        """
2785        min_str = ('Min advertising interval: %.3f msec (0x%04x)' %
2786                   (min_adv_interval_ms,
2787                    min_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
2788        logging.debug('min_adv_interval_ms: %s', min_str)
2789
2790        max_str = ('Max advertising interval: %.3f msec (0x%04x)' %
2791                   (max_adv_interval_ms,
2792                    max_adv_interval_ms / self.ADVERTISING_INTERVAL_UNIT))
2793        logging.debug('max_adv_interval_ms: %s', max_str)
2794
2795        return (min_str, max_str)
2796
2797
2798    def _verify_advertising_intervals(self, min_adv_interval_ms,
2799                                      max_adv_interval_ms):
2800        """Verify min and max advertising intervals.
2801
2802        Advertising intervals look like
2803            Min advertising interval: 1280.000 msec (0x0800)
2804            Max advertising interval: 1280.000 msec (0x0800)
2805
2806        @param min_adv_interval_ms: min advertising interval in milliseconds.
2807        @param max_adv_interval_ms: max advertising interval in milliseconds.
2808
2809        @returns: a tuple of (True, True) if both min and max advertising
2810            intervals could be found. Otherwise, the corresponding element
2811            in the tuple if False.
2812
2813        """
2814        min_str, max_str = self._get_min_max_intervals_strings(
2815                min_adv_interval_ms, max_adv_interval_ms)
2816
2817        min_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(min_str)
2818        max_adv_interval_ms_found = self.bluetooth_le_facade.btmon_find(max_str)
2819
2820        return min_adv_interval_ms_found, max_adv_interval_ms_found
2821
2822
2823    def _verify_scan_response_data(self, adv_data):
2824        """Verify advertisement's scan response data is correct
2825
2826        Unlike the other fixed advertising fields, Scan Response Data is set
2827        in a tag-value data format. This function helps verify the data format
2828        for specific tag values to ensure scan response was propagated correctly
2829
2830        @param adv_data: Dictionary defining advertising fields to be registered
2831                with bluetoothd daemon's RegisterAdvertisement interface
2832
2833        @returns: True if all Registered Scan Response tags were located in
2834                btmon trace, False otherwise
2835        """
2836
2837        scan_rsp = adv_data.get('ScanResponseData')
2838        if not scan_rsp:
2839            return True
2840
2841        for tag, data in scan_rsp.items():
2842            # Validate 16 bit Service Data tag
2843            if int(tag, 16) == 0x16:
2844                # First two bytes of data are endian-corrected UUID, followed
2845                # by service data
2846                uuid = '%x%x' % (data[1], data[0])
2847                data_str = ''.join(
2848                        ['%02x' % data[i] for i in range(2, len(data))])
2849
2850                # Service data has the following format in btmon trace:
2851                # Service Data (UUID 0xfef3): 01020304
2852                search_str = 'Service Data (UUID 0x{}): {}'.format(
2853                        uuid, data_str)
2854
2855                # Fail if data can't be located in btmon trace
2856                if not self.bluetooth_le_facade.btmon_find(search_str):
2857                    return False
2858
2859        return True
2860
2861    def test_advertising_flags(self, flag_strs=[]):
2862        """Verify that advertising flags are set in registered advertisement
2863
2864        Each flag has a specific descriptor that appears in btmon trace. This
2865        simple checker validates that the desired flag descriptors appear in
2866        btmon trace when the advertisement was registered.
2867
2868        @param flag_strs: Flag string descriptors expected in btmon trace
2869
2870        #returns: True if all flag descriptors were located, False otherwise
2871        """
2872
2873        for flag_str in flag_strs:
2874            if not self.bluetooth_le_facade.btmon_find(flag_str):
2875                logging.info(
2876                        'Flag descriptor not located: {}'.format(flag_str))
2877                return False
2878
2879        return True
2880
2881    def ext_adv_enabled(self):
2882        """ Check if platform supports extended advertising
2883
2884        @returns True if extended advertising is supported, else False
2885        """
2886        platform = self.get_base_platform_name()
2887        return platform in EXT_ADV_MODELS
2888
2889
2890    @test_retry_and_log(False)
2891    def test_register_advertisement(self, advertisement_data, instance_id,
2892                                    min_adv_interval_ms, max_adv_interval_ms):
2893        """Verify that an advertisement is registered correctly.
2894
2895        This test verifies the following data:
2896        - advertisement added
2897        - manufacturer data
2898        - service UUIDs
2899        - service data
2900        - advertising intervals
2901        - advertising enabled
2902
2903        @param advertisement_data: the data of an advertisement to register.
2904        @param instance_id: the instance id which starts at 1.
2905        @param min_adv_interval_ms: min_adv_interval in milliseconds.
2906        @param max_adv_interval_ms: max_adv_interval in milliseconds.
2907
2908        @returns: True if the advertisement is registered correctly.
2909                  False otherwise.
2910
2911        """
2912        # When registering a new advertisement, it is possible that another
2913        # instance is advertising. It may need to wait for all other
2914        # advertisements to complete advertising once.
2915        self.count_advertisements += 1
2916        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
2917        self._get_btmon_log(
2918                lambda: self.bluetooth_le_facade.register_advertisement(
2919                        advertisement_data),
2920                logging_timespan=logging_timespan)
2921
2922        # Verify that a new advertisement is added.
2923        advertisement_added = (
2924                self.bluetooth_le_facade.btmon_find('Advertising Added') and
2925                self.bluetooth_le_facade.btmon_find('Instance: %d' %
2926                                                    instance_id))
2927
2928        # Verify that the manufacturer data could be found.
2929        manufacturer_data = advertisement_data.get('ManufacturerData', '')
2930        manufacturer_data_found = True
2931        for manufacturer_id in manufacturer_data:
2932            # The 'not assigned' text below means the manufacturer id
2933            # is not actually assigned to any real manufacturer.
2934            # For real 16-bit manufacturer UUIDs, refer to
2935            #  https://www.bluetooth.com/specifications/assigned-numbers/16-bit-UUIDs-for-Members
2936            manufacturer_data_found = self.bluetooth_le_facade.btmon_find(
2937                    'Company: not assigned (%d)' % int(manufacturer_id, 16))
2938
2939        # Verify that all service UUIDs could be found.
2940        service_uuids_found = True
2941        for uuid in advertisement_data.get('ServiceUUIDs', []):
2942            # Service UUIDs looks like ['0x180D', '0x180F']
2943            #   Heart Rate (0x180D)
2944            #   Battery Service (0x180F)
2945            # For actual 16-bit service UUIDs, refer to
2946            #   https://www.bluetooth.com/specifications/gatt/services
2947            if not self.bluetooth_le_facade.btmon_find('0x%s' % uuid):
2948                service_uuids_found = False
2949                break
2950
2951        # Verify service data.
2952        service_data_found = True
2953        for uuid, data in advertisement_data.get('ServiceData', {}).items():
2954            # A service data looks like
2955            #   Service Data (UUID 0x9999): 0001020304
2956            # while uuid is '9999' and data is [0x00, 0x01, 0x02, 0x03, 0x04]
2957            data_str = ''.join(['%02x' % n for n in data])
2958            if not self.bluetooth_le_facade.btmon_find(
2959                    'Service Data (UUID 0x%s): %s' % (uuid, data_str)):
2960                service_data_found = False
2961                break
2962
2963        # Broadcast advertisements are overwritten in some kernel versions to
2964        # be more aggressive. Verify that the advertising intervals are correct
2965        # if this mode is not used
2966        if advertisement_data.get('Type') != 'broadcast':
2967            min_adv_interval_ms_found, max_adv_interval_ms_found = (
2968                    self._verify_advertising_intervals(min_adv_interval_ms,
2969                                                       max_adv_interval_ms))
2970
2971        else:
2972            min_adv_interval_ms_found = True
2973            max_adv_interval_ms_found = True
2974
2975        scan_rsp_correct = self._verify_scan_response_data(advertisement_data)
2976
2977        # Verify advertising is enabled.
2978        advertising_enabled = self.bluetooth_le_facade.btmon_find(
2979                'Advertising: Enabled (0x01)')
2980
2981        self.results = {
2982                'advertisement_added': advertisement_added,
2983                'manufacturer_data_found': manufacturer_data_found,
2984                'service_uuids_found': service_uuids_found,
2985                'service_data_found': service_data_found,
2986                'min_adv_interval_ms_found': min_adv_interval_ms_found,
2987                'max_adv_interval_ms_found': max_adv_interval_ms_found,
2988                'scan_rsp_correct': scan_rsp_correct,
2989                'advertising_enabled': advertising_enabled,
2990        }
2991        return all(self.results.values())
2992
2993
2994    @test_retry_and_log(False)
2995    def test_fail_to_register_advertisement(self, advertisement_data,
2996                                            min_adv_interval_ms,
2997                                            max_adv_interval_ms):
2998        """Verify that failure is incurred when max advertisements are reached.
2999
3000        This test verifies that a registration failure is incurred when
3001        max advertisements are reached. The error message looks like:
3002
3003            org.bluez.Error.Failed: Maximum advertisements reached
3004
3005        @param advertisement_data: the advertisement to register.
3006        @param min_adv_interval_ms: min_adv_interval in milliseconds.
3007        @param max_adv_interval_ms: max_adv_interval in milliseconds.
3008
3009        @returns: True if the error message is received correctly.
3010                  False otherwise.
3011
3012        """
3013        logging_timespan = self.compute_logging_timespan(max_adv_interval_ms)
3014        self._get_btmon_log(
3015                lambda: self.bluetooth_le_facade.register_advertisement(
3016                        advertisement_data),
3017                logging_timespan=logging_timespan)
3018
3019        # Verify that it failed to register advertisement due to the fact
3020        # that max advertisements are reached.
3021        failed_to_register_error = (self.ERROR_FAILED_TO_REGISTER_ADVERTISEMENT
3022                                    in self.advertising_msg)
3023
3024        # Verify that no new advertisement is added.
3025        advertisement_not_added = not self.bluetooth_le_facade.btmon_find(
3026                'Advertising Added:')
3027
3028        self.results = {
3029                'failed_to_register_error': failed_to_register_error,
3030                'advertisement_not_added': advertisement_not_added,
3031        }
3032
3033        # If the registration fails and extended advertising is available,
3034        # there will be no events in btmon. Therefore, we only run this part of
3035        # the test if extended advertising is not available, indicating that
3036        # software advertisement rotation is being used.
3037        if not self.ext_adv_enabled():
3038            # Verify that the advertising intervals are correct.
3039            min_adv_interval_ms_found, max_adv_interval_ms_found = (
3040                    self._verify_advertising_intervals(min_adv_interval_ms,
3041                                                       max_adv_interval_ms))
3042
3043            # Verify advertising remains enabled.
3044            advertising_enabled = self.bluetooth_le_facade.btmon_find(
3045                    'Advertising: Enabled (0x01)')
3046
3047            self.results.update({
3048                'min_adv_interval_ms_found': min_adv_interval_ms_found,
3049                'max_adv_interval_ms_found': max_adv_interval_ms_found,
3050                'advertising_enabled': advertising_enabled,
3051            })
3052
3053        return all(self.results.values())
3054
3055
3056    @test_retry_and_log(False)
3057    def test_unregister_advertisement(self, advertisement_data, instance_id,
3058                                      advertising_disabled):
3059        """Verify that an advertisement is unregistered correctly.
3060
3061        This test verifies the following data:
3062        - advertisement removed
3063        - advertising status: enabled if there are advertisements left;
3064                              disabled otherwise.
3065
3066        @param advertisement_data: the data of an advertisement to unregister.
3067        @param instance_id: the instance id of the advertisement to remove.
3068        @param advertising_disabled: is advertising disabled? This happens
3069                only when all advertisements are removed.
3070
3071        @returns: True if the advertisement is unregistered correctly.
3072                  False otherwise.
3073
3074        """
3075        self.count_advertisements -= 1
3076        self._get_btmon_log(
3077                lambda: self.bluetooth_le_facade.unregister_advertisement(
3078                        advertisement_data))
3079
3080        # Verify that the advertisement is removed.
3081        advertisement_removed = (
3082                self.bluetooth_le_facade.btmon_find('Advertising Removed') and
3083                self.bluetooth_le_facade.btmon_find('Instance: %d' %
3084                                                    instance_id))
3085
3086        # If advertising_disabled is True, there should be no log like
3087        #       'Advertising: Enabled (0x01)'
3088        # If advertising_disabled is False, there should be log like
3089        #       'Advertising: Enabled (0x01)'
3090
3091        # Only need to check advertising status when the last advertisement
3092        # is removed. For any other advertisements prior to the last one,
3093        # we may or may not observe 'Advertising: Enabled (0x01)' message.
3094        # Hence, the test would become flaky if we insist to see that message.
3095        # A possible workaround is to sleep for a while and then check the
3096        # message. The drawback is that we may need to wait up to 10 seconds
3097        # if the advertising duration and intervals are long.
3098        # In a test case, we always run test_check_duration_and_intervals()
3099        # to check if advertising duration and intervals are correct after
3100        # un-registering one or all advertisements, it is safe to do so.
3101        advertising_enabled_found = self.bluetooth_le_facade.btmon_find(
3102                'Advertising: Enabled (0x01)')
3103        advertising_disabled_found = self.bluetooth_le_facade.btmon_find(
3104                'Advertising: Disabled (0x00)')
3105        advertising_status_correct = not advertising_disabled or (
3106                advertising_disabled_found and not advertising_enabled_found)
3107
3108        self.results = {
3109                'advertisement_removed': advertisement_removed,
3110                'advertising_status_correct': advertising_status_correct,
3111        }
3112        return all(self.results.values())
3113
3114
3115    @test_retry_and_log(False)
3116    def test_set_advertising_intervals(self, min_adv_interval_ms,
3117                                       max_adv_interval_ms):
3118        """Verify that new advertising intervals are set correctly.
3119
3120        Note that setting advertising intervals does not enable/disable
3121        advertising. Hence, there is no need to check the advertising
3122        status.
3123
3124        @param min_adv_interval_ms: the min advertising interval in ms.
3125        @param max_adv_interval_ms: the max advertising interval in ms.
3126
3127        @returns: True if the new advertising intervals are correct.
3128                  False otherwise.
3129
3130        """
3131        self._get_btmon_log(
3132                lambda: self.bluetooth_le_facade.set_advertising_intervals(
3133                        min_adv_interval_ms, max_adv_interval_ms))
3134
3135        # Verify the new advertising intervals.
3136        # With intervals of 200 ms and 200 ms, the log looks like
3137        #   bluetoothd: Set Advertising Intervals: 0x0140, 0x0140
3138        txt = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x'
3139        adv_intervals_found = self.bluetooth_le_facade.btmon_find(
3140                txt % (self.convert_to_adv_jiffies(min_adv_interval_ms),
3141                       self.convert_to_adv_jiffies(max_adv_interval_ms)))
3142
3143        self.results = {'adv_intervals_found': adv_intervals_found}
3144        return all(self.results.values())
3145
3146
3147    @test_retry_and_log(False)
3148    def test_fail_to_set_advertising_intervals(
3149            self, invalid_min_adv_interval_ms, invalid_max_adv_interval_ms,
3150            orig_min_adv_interval_ms, orig_max_adv_interval_ms):
3151        """Verify that setting invalid advertising intervals results in error.
3152
3153        If invalid min/max advertising intervals are given, it would incur
3154        the error: 'org.bluez.Error.InvalidArguments: Invalid arguments'.
3155        Note that valid advertising intervals fall between 20 ms and 10,240 ms.
3156
3157        @param invalid_min_adv_interval_ms: the invalid min advertising interval
3158                in ms.
3159        @param invalid_max_adv_interval_ms: the invalid max advertising interval
3160                in ms.
3161        @param orig_min_adv_interval_ms: the original min advertising interval
3162                in ms.
3163        @param orig_max_adv_interval_ms: the original max advertising interval
3164                in ms.
3165
3166        @returns: True if it fails to set invalid advertising intervals.
3167                  False otherwise.
3168
3169        """
3170        self._get_btmon_log(
3171                lambda: self.bluetooth_le_facade.set_advertising_intervals(
3172                        invalid_min_adv_interval_ms,
3173                        invalid_max_adv_interval_ms))
3174
3175        # Verify that the invalid error is observed in the dbus error callback
3176        # message.
3177        invalid_intervals_error = (self.ERROR_INVALID_ADVERTISING_INTERVALS in
3178                                   self.advertising_msg)
3179
3180        # Verify that the min/max advertising intervals remain the same
3181        # after setting the invalid advertising intervals.
3182        #
3183        # In btmon log, we would see the following message first.
3184        #    bluetoothd: Set Advertising Intervals: 0x0010, 0x0010
3185        # And then, we should check if "Min advertising interval" and
3186        # "Max advertising interval" remain the same.
3187        start_str = 'bluetoothd: Set Advertising Intervals: 0x%04x, 0x%04x' % (
3188                self.convert_to_adv_jiffies(invalid_min_adv_interval_ms),
3189                self.convert_to_adv_jiffies(invalid_max_adv_interval_ms))
3190
3191        search_strings = ['Min advertising interval:',
3192                          'Max advertising interval:']
3193        search_str = '|'.join(search_strings)
3194
3195        contents = self.bluetooth_le_facade.btmon_get(search_str=search_str,
3196                                                      start_str=start_str)
3197
3198        # The min/max advertising intervals of all advertisements should remain
3199        # the same as the previous valid ones.
3200        min_max_str = '[Min|Max] advertising interval: (\d*\.\d*) msec'
3201        min_max_pattern = re.compile(min_max_str)
3202        correct_orig_min_adv_interval = True
3203        correct_orig_max_adv_interval = True
3204        for line in contents:
3205            result = min_max_pattern.search(line)
3206            if result:
3207                interval = float(result.group(1))
3208                if 'Min' in line and interval != orig_min_adv_interval_ms:
3209                    correct_orig_min_adv_interval = False
3210                elif 'Max' in line and interval != orig_max_adv_interval_ms:
3211                    correct_orig_max_adv_interval = False
3212
3213        self.results = {
3214                'invalid_intervals_error': invalid_intervals_error,
3215                'correct_orig_min_adv_interval': correct_orig_min_adv_interval,
3216                'correct_orig_max_adv_interval': correct_orig_max_adv_interval}
3217
3218        return all(self.results.values())
3219
3220
3221    @test_retry_and_log(False)
3222    def test_check_advertising_intervals(self, min_adv_interval_ms,
3223                                         max_adv_interval_ms):
3224        """Verify that the advertising intervals are as expected.
3225
3226        @param min_adv_interval_ms: the min advertising interval in ms.
3227        @param max_adv_interval_ms: the max advertising interval in ms.
3228
3229        @returns: True if the advertising intervals are correct.
3230                  False otherwise.
3231
3232        """
3233        self._get_btmon_log(None)
3234
3235        # Verify that the advertising intervals are correct.
3236        min_adv_interval_ms_found, max_adv_interval_ms_found = (
3237                self._verify_advertising_intervals(min_adv_interval_ms,
3238                                                   max_adv_interval_ms))
3239
3240        self.results = {
3241                'min_adv_interval_ms_found': min_adv_interval_ms_found,
3242                'max_adv_interval_ms_found': max_adv_interval_ms_found,
3243        }
3244        return all(self.results.values())
3245
3246
3247    @test_retry_and_log(False)
3248    def test_reset_advertising(self, instance_ids=[]):
3249        """Verify that advertising is reset correctly.
3250
3251        Note that reset advertising would set advertising intervals to
3252        the default values. However, we would not be able to observe
3253        the values change until new advertisements are registered.
3254        Therefore, it is required that a test_register_advertisement()
3255        test is conducted after this test.
3256
3257        If instance_ids is [], all advertisements would still be removed
3258        if there are any. However, no need to check 'Advertising Removed'
3259        in btmon log since we may or may not be able to observe the message.
3260        This feature is needed if this test is invoked as the first one in
3261        a test case to reset advertising. In this situation, this test does
3262        not know how many advertisements exist.
3263
3264        @param instance_ids: the list of instance IDs that should be removed.
3265
3266        @returns: True if advertising is reset correctly.
3267                  False otherwise.
3268
3269        """
3270        self.count_advertisements = 0
3271        self._get_btmon_log(
3272                lambda: self.bluetooth_le_facade.reset_advertising())
3273
3274        # Verify that every advertisement is removed. When an advertisement
3275        # with instance id 1 is removed, the log looks like
3276        #   Advertising Removed
3277        #       instance: 1
3278        if len(instance_ids) > 0:
3279            advertisement_removed = self.bluetooth_le_facade.btmon_find(
3280                    'Advertising Removed')
3281            if advertisement_removed:
3282                for instance_id in instance_ids:
3283                    txt = 'Instance: %d' % instance_id
3284                    if not self.bluetooth_le_facade.btmon_find(txt):
3285                        advertisement_removed = False
3286                        break
3287        else:
3288            advertisement_removed = True
3289
3290        if not advertisement_removed:
3291            logging.error('Failed to remove advertisement')
3292
3293        # Verify that "Reset Advertising Intervals" command has been issued.
3294        reset_advertising_intervals = self.bluetooth_le_facade.btmon_find(
3295                'bluetoothd: Reset Advertising Intervals')
3296
3297        # Verify the advertising is disabled.
3298        advertising_disabled_observied = self.bluetooth_le_facade.btmon_find(
3299                'Advertising: Disabled')
3300        # If there are no existing advertisements, we may not observe
3301        # 'Advertising: Disabled'.
3302        advertising_disabled = (instance_ids == [] or
3303                                advertising_disabled_observied)
3304
3305        self.results = {
3306                'advertisement_removed': advertisement_removed,
3307                'reset_advertising_intervals': reset_advertising_intervals,
3308                'advertising_disabled': advertising_disabled,
3309        }
3310        return all(self.results.values())
3311
3312
3313    @test_retry_and_log(False)
3314    def test_receive_advertisement(self, address=None, UUID=None, timeout=10):
3315        """Verifies that we receive an advertisement with specific contents
3316
3317        Since test_discover_device only uses the existence of a device dbus path
3318        to indicate when a device is discovered, it is not adequate if we want
3319        to verify that we have received an advertisement from a device. This
3320        test monitors btmon around a discovery instance and searches for the
3321        relevant advertising report.
3322
3323        @param address: String address of peer
3324        @param UUID: String of hex data
3325        @param timeout: seconds to listen for traffic
3326
3327        @returns True if report was located, otherwise False
3328        """
3329
3330        def _discover_devices():
3331            self.test_start_discovery()
3332            time.sleep(timeout)
3333            self.test_stop_discovery()
3334
3335        # Run discovery, record btmon log
3336        self._get_btmon_log(_discover_devices)
3337
3338        # Grab all logs received
3339        btmon_log = '\n'.join(self.bluetooth_le_facade.btmon_get('', ''))
3340
3341        desired_strs = []
3342
3343        if address is not None:
3344            desired_strs.append('Address: {}'.format(address))
3345
3346        if UUID is not None:
3347            desired_strs.append('({})'.format(UUID))
3348
3349        # Split btmon events by HCI and MGMT delimiters
3350        event_delimiter = '|'.join(['@ MGMT', '> HCI', '< HCI'])
3351        btmon_events = re.split(event_delimiter, btmon_log)
3352
3353        features_located = False
3354
3355        for event_str in btmon_events:
3356            if 'Advertising Report' not in event_str:
3357                continue
3358
3359            for desired_str in desired_strs:
3360                if desired_str not in event_str:
3361                    break
3362
3363            else:
3364                features_located = True
3365
3366        self.results = {
3367                'features_located': features_located,
3368        }
3369        return all(self.results.values())
3370
3371
3372    def add_device(self, address, address_type, action):
3373        """Add a device to the Kernel action list."""
3374        return self.bluetooth_facade.add_device(address, address_type, action)
3375
3376
3377    def remove_device(self, address, address_type):
3378        """Remove a device from the Kernel action list."""
3379        return self.bluetooth_facade.remove_device(address,address_type)
3380
3381
3382    def read_supported_commands(self):
3383        """Read the set of supported commands from the Kernel."""
3384        return self.bluetooth_facade.read_supported_commands()
3385
3386
3387    def read_info(self):
3388        """Read the adapter information from the Kernel."""
3389        return self.bluetooth_facade.read_info()
3390
3391
3392    def get_adapter_properties(self):
3393        """Read the adapter properties from the Bluetooth Daemon."""
3394        return self.bluetooth_facade.get_adapter_properties()
3395
3396
3397    def get_dev_info(self):
3398        """Read raw HCI device information."""
3399        return self.bluetooth_facade.get_dev_info()
3400
3401    def log_settings(self, msg, settings):
3402        """function convert MGMT_OP_READ_INFO settings to string
3403
3404        @param msg: string to include in output
3405        @param settings: bitstring returned by MGMT_OP_READ_INFO
3406        @return : List of strings indicating different settings
3407        """
3408        strs = []
3409        if settings & bluetooth_socket.MGMT_SETTING_POWERED:
3410            strs.append("POWERED")
3411        if settings & bluetooth_socket.MGMT_SETTING_CONNECTABLE:
3412            strs.append("CONNECTABLE")
3413        if settings & bluetooth_socket.MGMT_SETTING_FAST_CONNECTABLE:
3414            strs.append("FAST-CONNECTABLE")
3415        if settings & bluetooth_socket.MGMT_SETTING_DISCOVERABLE:
3416            strs.append("DISCOVERABLE")
3417        if settings & bluetooth_socket.MGMT_SETTING_PAIRABLE:
3418            strs.append("PAIRABLE")
3419        if settings & bluetooth_socket.MGMT_SETTING_LINK_SECURITY:
3420            strs.append("LINK-SECURITY")
3421        if settings & bluetooth_socket.MGMT_SETTING_SSP:
3422            strs.append("SSP")
3423        if settings & bluetooth_socket.MGMT_SETTING_BREDR:
3424            strs.append("BR/EDR")
3425        if settings & bluetooth_socket.MGMT_SETTING_HS:
3426            strs.append("HS")
3427        if settings & bluetooth_socket.MGMT_SETTING_LE:
3428            strs.append("LE")
3429        logging.debug('%s : %s', msg, " ".join(strs))
3430        return strs
3431
3432    def log_flags(self, msg, flags):
3433        """Function to convert HCI state configuration to a string
3434
3435        @param msg: string to include in output
3436        @param settings: bitstring returned by get_dev_info
3437        @return : List of strings indicating different flags
3438        """
3439        strs = []
3440        if flags & bluetooth_socket.HCI_UP:
3441            strs.append("UP")
3442        else:
3443            strs.append("DOWN")
3444        if flags & bluetooth_socket.HCI_INIT:
3445            strs.append("INIT")
3446        if flags & bluetooth_socket.HCI_RUNNING:
3447            strs.append("RUNNING")
3448        if flags & bluetooth_socket.HCI_PSCAN:
3449            strs.append("PSCAN")
3450        if flags & bluetooth_socket.HCI_ISCAN:
3451            strs.append("ISCAN")
3452        if flags & bluetooth_socket.HCI_AUTH:
3453            strs.append("AUTH")
3454        if flags & bluetooth_socket.HCI_ENCRYPT:
3455            strs.append("ENCRYPT")
3456        if flags & bluetooth_socket.HCI_INQUIRY:
3457            strs.append("INQUIRY")
3458        if flags & bluetooth_socket.HCI_RAW:
3459            strs.append("RAW")
3460        logging.debug('%s [HCI]: %s', msg, " ".join(strs))
3461        return strs
3462
3463
3464    @test_retry_and_log(False)
3465    def test_service_resolved(self, address):
3466        """Test that the services under device address can be resolved
3467
3468        @param address: MAC address of a device
3469
3470        @returns: True if the ServicesResolved property is changed before
3471                 timeout, False otherwise.
3472
3473        """
3474        is_resolved_func = self.bluetooth_facade.device_services_resolved
3475        return self._wait_for_condition(lambda : is_resolved_func(address),\
3476                                        method_name())
3477
3478
3479    @test_retry_and_log(False)
3480    def test_gatt_browse(self, address):
3481        """Test that the GATT client can get the attributes correctly
3482
3483        @param address: MAC address of a device
3484
3485        @returns: True if the attribute map received by GATT client is the same
3486                  as expected. False otherwise.
3487
3488        """
3489
3490        gatt_client_facade = GATT_ClientFacade(self.bluetooth_facade)
3491        actual_app = gatt_client_facade.browse(address)
3492        expected_app = GATT_HIDApplication()
3493        diff = GATT_Application.diff(actual_app, expected_app)
3494
3495        self.result = {
3496            'actural_result': actual_app,
3497            'expected_result': expected_app
3498        }
3499
3500        gatt_attribute_hierarchy = ['Device', 'Service', 'Characteristic',
3501                                    'Descriptor']
3502        # Remove any difference in object path
3503        for parent, child in zip(gatt_attribute_hierarchy,
3504                                 gatt_attribute_hierarchy[1:]):
3505            pattern = re.compile('^%s .* is different in %s' % (child, parent))
3506            for diff_str in diff[::]:
3507                if pattern.search(diff_str):
3508                    diff.remove(diff_str)
3509
3510        if len(diff) != 0:
3511            logging.error('Application Diff: %s', diff)
3512            return False
3513        return True
3514
3515
3516    def _record_input_events(self, device, gesture, address=None):
3517        """Record the input events.
3518
3519        @param device: the bluetooth HID device.
3520        @param gesture: the gesture method to perform.
3521
3522        @returns: the input events received on the DUT.
3523
3524        """
3525        self.input_facade.initialize_input_recorder(device.name, uniq=address)
3526        self.input_facade.start_input_recorder(device.name)
3527        time.sleep(self.HID_REPORT_SLEEP_SECS)
3528        gesture()
3529        time.sleep(self.HID_REPORT_SLEEP_SECS)
3530        self.input_facade.stop_input_recorder(device.name)
3531        time.sleep(self.HID_REPORT_SLEEP_SECS)
3532        event_values = self.input_facade.get_input_events(device.name)
3533        events = [Event(*ev) for ev in event_values]
3534        return events
3535
3536
3537    # -------------------------------------------------------------------
3538    # Bluetooth mouse related tests
3539    # -------------------------------------------------------------------
3540
3541
3542    def _test_mouse_click(self, device, button):
3543        """Test that the mouse click events could be received correctly.
3544
3545        @param device: the meta device containing a bluetooth HID device
3546        @param button: which button to test, 'LEFT' or 'RIGHT'
3547
3548        @returns: True if the report received by the host matches the
3549                  expected one. False otherwise.
3550
3551        """
3552        if button == 'LEFT':
3553            gesture = device.LeftClick
3554        elif button == 'RIGHT':
3555            gesture = device.RightClick
3556        else:
3557            raise error.TestError('Button (%s) is not valid.' % button)
3558
3559        actual_events = self._record_input_events(device,
3560                                                  gesture,
3561                                                  address=device.address)
3562
3563        linux_input_button = {'LEFT': BTN_LEFT, 'RIGHT': BTN_RIGHT}
3564        expected_events = [
3565                # Button down
3566                recorder.MSC_SCAN_BTN_EVENT[button],
3567                Event(EV_KEY, linux_input_button[button], 1),
3568                recorder.SYN_EVENT,
3569                # Button up
3570                recorder.MSC_SCAN_BTN_EVENT[button],
3571                Event(EV_KEY, linux_input_button[button], 0),
3572                recorder.SYN_EVENT]
3573
3574        self.results = {
3575                'actual_events': list(map(str, actual_events)),
3576                'expected_events': list(map(str, expected_events))}
3577        return actual_events == expected_events
3578
3579
3580    @test_retry_and_log
3581    def test_mouse_left_click(self, device):
3582        """Test that the mouse left click events could be received correctly.
3583
3584        @param device: the meta device containing a bluetooth HID device
3585
3586        @returns: True if the report received by the host matches the
3587                  expected one. False otherwise.
3588
3589        """
3590        return self._test_mouse_click(device, 'LEFT')
3591
3592
3593    @test_retry_and_log
3594    def test_mouse_right_click(self, device):
3595        """Test that the mouse right click events could be received correctly.
3596
3597        @param device: the meta device containing a bluetooth HID device
3598
3599        @returns: True if the report received by the host matches the
3600                  expected one. False otherwise.
3601
3602        """
3603        return self._test_mouse_click(device, 'RIGHT')
3604
3605
3606    def _test_mouse_move(self, device, delta_x=0, delta_y=0):
3607        """Test that the mouse move events could be received correctly.
3608
3609        @param device: the meta device containing a bluetooth HID device
3610        @param delta_x: the distance to move cursor in x axis
3611        @param delta_y: the distance to move cursor in y axis
3612
3613        @returns: True if the report received by the host matches the
3614                  expected one. False otherwise.
3615
3616        """
3617        gesture = lambda: device.Move(delta_x, delta_y)
3618        actual_events = self._record_input_events(device,
3619                                                  gesture,
3620                                                  address=device.address)
3621
3622        events_x = [Event(EV_REL, REL_X, delta_x)] if delta_x else []
3623        events_y = [Event(EV_REL, REL_Y, delta_y)] if delta_y else []
3624        expected_events = events_x + events_y + [recorder.SYN_EVENT]
3625
3626        self.results = {
3627                'actual_events': list(map(str, actual_events)),
3628                'expected_events': list(map(str, expected_events))}
3629        return actual_events == expected_events
3630
3631
3632    @test_retry_and_log
3633    def test_mouse_move_in_x(self, device, delta_x):
3634        """Test that the mouse move events in x could be received correctly.
3635
3636        @param device: the meta device containing a bluetooth HID device
3637        @param delta_x: the distance to move cursor in x axis
3638
3639        @returns: True if the report received by the host matches the
3640                  expected one. False otherwise.
3641
3642        """
3643        return self._test_mouse_move(device, delta_x=delta_x)
3644
3645
3646    @test_retry_and_log
3647    def test_mouse_move_in_y(self, device, delta_y):
3648        """Test that the mouse move events in y could be received correctly.
3649
3650        @param device: the meta device containing a bluetooth HID device
3651        @param delta_y: the distance to move cursor in y axis
3652
3653        @returns: True if the report received by the host matches the
3654                  expected one. False otherwise.
3655
3656        """
3657        return self._test_mouse_move(device, delta_y=delta_y)
3658
3659
3660    @test_retry_and_log
3661    def test_mouse_move_in_xy(self, device, delta_x, delta_y):
3662        """Test that the mouse move events could be received correctly.
3663
3664        @param device: the meta device containing a bluetooth HID device
3665        @param delta_x: the distance to move cursor in x axis
3666        @param delta_y: the distance to move cursor in y axis
3667
3668        @returns: True if the report received by the host matches the
3669                  expected one. False otherwise.
3670
3671        """
3672        return self._test_mouse_move(device, delta_x=delta_x, delta_y=delta_y)
3673
3674
3675    def _test_mouse_scroll(self, device, units):
3676        """Test that the mouse wheel events could be received correctly.
3677
3678        @param device: the meta device containing a bluetooth HID device
3679        @param units: the units to scroll in y axis
3680
3681        @returns: True if the report received by the host matches the
3682                  expected one. False otherwise.
3683
3684        """
3685        gesture = lambda: device.Scroll(units)
3686        recorded_events = self._record_input_events(device,
3687                                                    gesture,
3688                                                    address=device.address)
3689
3690        # Since high-speed scrolling events are inserted after they are passed
3691        # through bluetooth module, we ignore these events since they are
3692        # irrelevant for us
3693        scroll_events = [ev for ev in recorded_events
3694                            if ev.code != REL_WHEEL_HI_RES]
3695
3696        expected_events = [Event(EV_REL, REL_WHEEL, units), recorder.SYN_EVENT]
3697        self.results = {
3698                'scroll_events': list(map(str, scroll_events)),
3699                'expected_events': list(map(str, expected_events))}
3700        return scroll_events == expected_events
3701
3702
3703    @test_retry_and_log
3704    def test_mouse_scroll_down(self, device, delta_y):
3705        """Test that the mouse wheel events could be received correctly.
3706
3707        @param device: the meta device containing a bluetooth HID device
3708        @param delta_y: the units to scroll down in y axis;
3709                        should be a postive value
3710
3711        @returns: True if the report received by the host matches the
3712                  expected one. False otherwise.
3713
3714        """
3715        if delta_y > 0:
3716            return self._test_mouse_scroll(device, delta_y)
3717        else:
3718            raise error.TestError('delta_y (%d) should be a positive value',
3719                                  delta_y)
3720
3721
3722    @test_retry_and_log
3723    def test_mouse_scroll_up(self, device, delta_y):
3724        """Test that the mouse wheel events could be received correctly.
3725
3726        @param device: the meta device containing a bluetooth HID device
3727        @param delta_y: the units to scroll up in y axis;
3728                        should be a postive value
3729
3730        @returns: True if the report received by the host matches the
3731                  expected one. False otherwise.
3732
3733        """
3734        if delta_y > 0:
3735            return self._test_mouse_scroll(device, -delta_y)
3736        else:
3737            raise error.TestError('delta_y (%d) should be a positive value',
3738                                  delta_y)
3739
3740
3741    @test_retry_and_log
3742    def test_mouse_click_and_drag(self, device, delta_x, delta_y):
3743        """Test that the mouse click-and-drag events could be received
3744        correctly.
3745
3746        @param device: the meta device containing a bluetooth HID device
3747        @param delta_x: the distance to drag in x axis
3748        @param delta_y: the distance to drag in y axis
3749
3750        @returns: True if the report received by the host matches the
3751                  expected one. False otherwise.
3752
3753        """
3754        gesture = lambda: device.ClickAndDrag(delta_x, delta_y)
3755        actual_events = self._record_input_events(device,
3756                                                  gesture,
3757                                                  address=device.address)
3758
3759        button = 'LEFT'
3760        expected_events = (
3761                [# Button down
3762                 recorder.MSC_SCAN_BTN_EVENT[button],
3763                 Event(EV_KEY, BTN_LEFT, 1),
3764                 recorder.SYN_EVENT] +
3765                # cursor movement in x and y
3766                ([Event(EV_REL, REL_X, delta_x)] if delta_x else []) +
3767                ([Event(EV_REL, REL_Y, delta_y)] if delta_y else []) +
3768                [recorder.SYN_EVENT] +
3769                # Button up
3770                [recorder.MSC_SCAN_BTN_EVENT[button],
3771                 Event(EV_KEY, BTN_LEFT, 0),
3772                 recorder.SYN_EVENT])
3773
3774        self.results = {
3775                'actual_events': list(map(str, actual_events)),
3776                'expected_events': list(map(str, expected_events))}
3777        return actual_events == expected_events
3778
3779
3780    # -------------------------------------------------------------------
3781    # Bluetooth keyboard related tests
3782    # -------------------------------------------------------------------
3783
3784    # TODO may be deprecated as stated in b:140515628
3785    @test_retry_and_log
3786    def test_keyboard_input_from_string(self, device, string_to_send):
3787        """Test that the keyboard's key events could be received correctly.
3788
3789        @param device: the meta device containing a bluetooth HID device
3790        @param string_to_send: the set of keys that will be pressed one-by-one
3791
3792        @returns: True if the report received by the host matches the
3793                  expected one. False otherwise.
3794
3795        """
3796
3797        gesture = lambda: device.KeyboardSendString(string_to_send)
3798
3799        actual_events = self._record_input_events(device,
3800                                                  gesture,
3801                                                  address=device.address)
3802
3803        resulting_string = bluetooth_test_utils.reconstruct_string(
3804                           actual_events)
3805
3806        return string_to_send == resulting_string
3807
3808
3809    @test_retry_and_log
3810    def test_keyboard_input_from_trace(self, device, trace_name):
3811        """ Tests that keyboard events can be transmitted and received correctly
3812
3813        @param device: the meta device containing a bluetooth HID device
3814        @param trace_name: string name for keyboard activity trace to be used
3815                           in the test i.e. "simple_text"
3816
3817        @returns: true if the recorded output matches the expected output
3818                  false otherwise
3819        """
3820        length_correct = True
3821        content_correct = True
3822
3823        # Read data from trace I/O files
3824        input_trace = bluetooth_test_utils.parse_trace_file(os.path.join(
3825                      TRACE_LOCATION, '{}_input.txt'.format(trace_name)))
3826        output_trace = bluetooth_test_utils.parse_trace_file(os.path.join(
3827                      TRACE_LOCATION, '{}_output.txt'.format(trace_name)))
3828
3829        if not input_trace or not output_trace:
3830            logging.error('Failure in using trace')
3831            return False
3832
3833        # Disregard timing data for now
3834        input_scan_codes = [tup[1] for tup in input_trace]
3835        predicted_events = [Event(*tup[1]) for tup in output_trace]
3836
3837        # Create and run this trace as a gesture
3838        gesture = lambda: device.KeyboardSendTrace(input_scan_codes)
3839        rec_events = self._record_input_events(device,
3840                                               gesture,
3841                                               address=device.address)
3842
3843        # Filter out any input events that were not from the keyboard
3844        rec_key_events = [ev for ev in rec_events if ev.type == EV_KEY]
3845
3846        # Fail if we didn't record the correct number of events
3847        if len(rec_key_events) != len(input_scan_codes):
3848            logging.info('Expected {} events, received {}'.format(
3849                    len(input_scan_codes), len(rec_key_events)))
3850            length_correct = False
3851
3852        for idx, predicted in enumerate(predicted_events):
3853            recorded = rec_key_events[idx]
3854
3855            if not predicted == recorded:
3856                content_correct = False
3857                break
3858
3859        self.results = {
3860            'received_events': len(rec_key_events) > 0,
3861            'length_correct': length_correct,
3862            'content_correct': content_correct,
3863        }
3864
3865        return all(self.results)
3866
3867
3868    def is_newer_kernel_version(self, version, minimum_version):
3869        """ Check if given kernel version is newer than unsupported version."""
3870
3871        return utils.compare_versions(version, minimum_version) >= 0
3872
3873
3874    def is_supported_kernel_version(self, kernel_version, minimum_version,
3875                                    msg=None):
3876        """ Check if kernel version is greater than minimum version.
3877
3878            Check if given kernel version is greater than or equal to minimum
3879            version. Raise TEST_NA if given kernel version is lower than the
3880            minimum version.
3881
3882            Note: Kernel version may have suffixes, so ensure that minimum
3883            version should be the smallest version that is permissible.
3884            Ex: If minimum version is 3.8.11 then 3.8.11-<random> will
3885            pass the check.
3886
3887            @param kernel_version: kernel version to be checked as a string
3888            @param: minimum_version: minimum kernel version requried
3889
3890            @returns: None
3891
3892            @raises: TEST_NA if kernel version is not greater than the minimum
3893                     version
3894        """
3895
3896        logging.debug('kernel version is {} minimum version'
3897                      'is {}'.format(kernel_version,minimum_version))
3898
3899        if msg is None:
3900            msg = 'Test not supported on this kernel version'
3901
3902        if not self.is_newer_kernel_version(kernel_version, minimum_version):
3903            logging.info('Kernel version check failed: %s', msg)
3904            raise error.TestNAError(msg)
3905
3906        logging.debug('Kernel version check passed')
3907
3908
3909    # -------------------------------------------------------------------
3910    # Bluetooth AVRCP related test
3911    # -------------------------------------------------------------------
3912
3913
3914    @test_retry_and_log
3915    def test_avrcp_event(self, device, generator, avrcp_event):
3916        """Tests that AVRCP events can be transmitted and received correctly
3917
3918        @param device: the meta device containing a Bluetooth AVRCP capable
3919                       audio device.
3920        @param generator: the peer device generator/function which trigger
3921                          the AVRCP event.
3922        @param avrcp_event: the AVRCP event to test.
3923
3924        @returns: true if the recorded output matches the expected output
3925                  false otherwise
3926        """
3927        logging.debug('AVRCP Event Test, Event: %s', avrcp_event)
3928        linux_input_button = {'play': KEY_PLAYCD, 'pause': KEY_PAUSECD,
3929                              'stop': KEY_STOPCD, 'next': KEY_NEXTSONG,
3930                              'previous': KEY_PREVIOUSSONG}
3931        expected_event = [
3932                # Button down
3933                Event(EV_KEY, linux_input_button[avrcp_event], 1),
3934                recorder.SYN_EVENT,
3935                # Button up
3936                Event(EV_KEY, linux_input_button[avrcp_event], 0),
3937                recorder.SYN_EVENT]
3938
3939        gesture = lambda: generator(avrcp_event)
3940        actual_event = self._record_input_events(device, gesture)
3941
3942        return actual_event == expected_event
3943
3944
3945    # -------------------------------------------------------------------
3946    # Servod related tests
3947    # -------------------------------------------------------------------
3948
3949    @test_retry_and_log
3950    def test_power_consumption(self, device, max_power_mw):
3951        """Test the average power consumption."""
3952        power_mw = device.servod.MeasurePowerConsumption()
3953        self.results = {'power_mw': power_mw}
3954
3955        if (power_mw is None):
3956            logging.error('Failed to measure power consumption')
3957            return False
3958
3959        power_mw = float(power_mw)
3960        logging.info('power consumption (mw): %f (max allowed: %f)',
3961                     power_mw, max_power_mw)
3962
3963        return power_mw <= max_power_mw
3964
3965
3966    @test_retry_and_log
3967    def test_start_notify(self, object_path, cccd_value):
3968        """Test that a notification can be started on a characteristic
3969
3970        @param object_path: the object path of the characteristic.
3971        @param cccd_value: Possible CCCD values include
3972               0x00 - inferred from the remote characteristic's properties
3973               0x01 - notification
3974               0x02 - indication
3975
3976        @returns: The test results.
3977
3978        """
3979        if object_path is None:
3980            logging.error('Invalid object path')
3981            return False
3982
3983        start_notify = self.bluetooth_facade.start_notify(
3984            object_path, cccd_value)
3985        is_notifying = self._wait_for_condition(
3986            lambda: self.bluetooth_facade.is_notifying(
3987                object_path), method_name())
3988
3989        self.results = {
3990            'start_notify': start_notify,
3991            'is_notifying': is_notifying}
3992
3993        return all(self.results.values())
3994
3995
3996    @test_retry_and_log
3997    def test_stop_notify(self, object_path):
3998        """Test that a notification can be stopped on a characteristic
3999
4000        @param object_path: the object path of the characteristic.
4001
4002        @returns: The test results.
4003
4004        """
4005        if object_path is None:
4006            logging.error('Invalid object path')
4007            return False
4008
4009        stop_notify = self.bluetooth_facade.stop_notify(object_path)
4010        is_not_notifying = self._wait_for_condition(
4011            lambda: not self.bluetooth_facade.is_notifying(
4012                object_path), method_name())
4013
4014        self.results = {
4015            'stop_notify': stop_notify,
4016            'is_not_notifying': is_not_notifying}
4017
4018        return all(self.results.values())
4019
4020
4021    @test_retry_and_log(False)
4022    def test_set_discovery_filter(self, filter):
4023        """Test set discovery filter"""
4024
4025        return self.bluetooth_facade.set_discovery_filter(filter)
4026
4027
4028    @test_retry_and_log(False)
4029    def test_set_le_connection_parameters(self, address, parameters):
4030        """Test set LE connection parameters"""
4031
4032        return self.bluetooth_facade.set_le_connection_parameters(
4033            address, parameters)
4034
4035
4036    @test_retry_and_log(False)
4037    def test_get_connection_info(self, address):
4038        """Test that connection info to device is retrievable."""
4039
4040        return (self.bluetooth_facade.get_connection_info(address)
4041                is not None)
4042
4043
4044    @test_retry_and_log(False, messages_stop=False)
4045    def test_suspend_and_wait_for_sleep(self, suspend, sleep_timeout):
4046        """ Suspend the device and wait until it is sleeping.
4047
4048        @param suspend: Sub-process that does the actual suspend call.
4049        @param sleep_timeout time limit in seconds to allow the host sleep.
4050
4051        @return True if host is asleep within a short timeout, False otherwise.
4052        """
4053        suspend.start()
4054        try:
4055            self.host.test_wait_for_sleep(sleep_timeout)
4056        except Exception as e:
4057            suspend.join()
4058            self.results = {'exception': str(e)}
4059            return False
4060
4061        return True
4062
4063
4064    @test_retry_and_log(False, messages_start=False)
    def test_wait_for_resume(self,
                             boot_id,
                             suspend,
                             resume_timeout,
                             test_start_time,
                             resume_slack=RESUME_DELTA,
                             fail_on_timeout=False,
                             fail_early_wake=True):
        """ Wait for device to resume from suspend.

        The time spent in suspend is taken from the powerd logs when the
        bluetooth facade can find them, and from the time measured around
        the host's test_wait_for_resume() call otherwise. The individual
        measurements are stored in self.results.

        @param boot_id: Current boot id
        @param suspend: Sub-process that does actual suspend call.
        @param resume_timeout: Expect device to resume in given timeout.
        @param test_start_time: When was this test started? (device time)
        @param resume_slack: Allow some slack on resume timeout.
        @param fail_on_timeout: Fails if timeout is reached
        @param fail_early_wake: Fails if timeout isn't reached

        @return True if the resume checks passed and the suspend sub-process
                exited with code 0, False otherwise.

        @raises: error.TestNAError if the last suspend found in the powerd
                 logs ended before test_start_time, or if suspend failed and
                 bluetooth did not cause the last resume.
                 error.TestFail is re-raised if its message contains
                 'client rebooted'.
        """
        success = False
        results = {}

        def _check_timeout(delta):
            """Decide whether the time spent suspended is acceptable.

            @param delta: timedelta spent in suspend.

            @return: not fail_on_timeout if delta exceeds resume_timeout
                     (which already includes resume_slack when this runs),
                     not fail_early_wake otherwise.
            """
            if delta > timedelta(seconds=resume_timeout):
                return not fail_on_timeout
            else:
                return not fail_early_wake

        def _check_suspend_attempt_or_raise(test_start, wake_at):
            """Make sure suspend attempt was recent or raise TestNA.

            If we're looking at a previous suspend attempt, it means the test
            didn't trigger a suspend properly (i.e. no powerd call)

            @param test_start: When we started the test.
            @param wake_at: When powerd suspend resumed.

            @raises: error.TestNAError if found suspend occurred before we
                     started the test.
            @return: True if the suspend attempt is recent.
            """
            # If the last suspend attempt was before we started the test, it's
            # probably not a recent attempt.
            if wake_at < test_start:
                raise error.TestNAError(
                        'No recent suspend attempt found. '
                        'Started test at {} but last suspend ended at {}'.
                        format(test_start, wake_at))

            return True

        def _check_retcode_or_raise(retcode):
            """Make sure powerd return was successful.

            @param retcode: Return code of powerd_suspend.

            @raises: error.TestNAError if failed suspend due to non-BT
            @return: False if BT woke us, True otherwise
            """
            if retcode:
                if self.bluetooth_facade.bt_caused_last_resume():
                    return False
                else:
                    raise error.TestNAError(
                            'Failed suspend due to non-BT wake')

            return True

        # Sometimes it takes longer to resume from suspend; give some leeway
        # NOTE: _check_timeout reads resume_timeout through its closure, so it
        # sees the value with the slack added.
        resume_timeout = resume_timeout + resume_slack
        results['resume timeout'] = resume_timeout
        try:
            start = datetime.now()

            # Wait for resume needs to wait longer in case device rebooted.
            # Otherwise, the test will fail with errno 111 (connection refused)
            self.host.test_wait_for_resume(
                boot_id, resume_timeout=self.RESUME_INTERNAL_TIMEOUT_SECS)

            results['device accessible on resume'] = True

            # As of now, a timeout in test_wait_for_resume doesn't raise. Start
            # by first measuring the delta until network is back up to the dut.
            network_delta = datetime.now() - start

            # Use powerd logs to see how much time we actually spent in suspend
            # If the network went down during suspend, we will have spent less
            # time in suspend than expected. If we can't find info via powerd,
            # we can use measured time instead.
            info = self.bluetooth_facade.find_last_suspend_via_powerd_logs()
            if info:
                (start_suspend_at, end_suspend_at, retcode) = info
                actual_delta = end_suspend_at - start_suspend_at
                results['powerd time to resume'] = actual_delta.total_seconds()
                results['powerd retcode'] = retcode

                # Resume is successful if suspend occurred correctly and woke up
                # within the timeout. One significant caveat is that we only
                # fail here if BT blocked suspend, not if we woke spuriously.
                # This is by design (we depend on the timeout to check for
                # spurious wakeup).
                # The checks short-circuit: a later check only runs if every
                # earlier one returned True.
                success = _check_suspend_attempt_or_raise(
                        test_start_time,
                        end_suspend_at) and _check_retcode_or_raise(
                                retcode) and _check_timeout(actual_delta)
            else:
                results['time to resume'] = network_delta.total_seconds()
                success = _check_timeout(network_delta)
        except error.TestFail as e:
            results['device accessible on resume'] = False
            success = False
            logging.error('wait_for_resume: %s', e)

            # If the resume failed due to a reboot, raise the testFail and exit
            # early from the test
            if 'client rebooted' in str(e):
                raise
        finally:
            # Always reap the suspend sub-process, even when an error is
            # propagating.
            suspend.join()

        results['success'] = success
        results['suspend exit code'] = suspend.exitcode
        self.results = results

        return all([success, suspend.exitcode == 0])
4190
4191
4192    def suspend_async(self, suspend_time, expect_bt_wake=False):
4193        """ Suspend asynchronously and return process for joining
4194
4195        @param suspend_time: how long to stay in suspend
4196        @param expect_bt_wake: Whether we expect bluetooth to wake us from
4197            suspend. If true, we expect this resume will occur early
4198
4199        @returns multiprocessing.Process object with suspend task
4200        """
4201
4202        def _action_suspend():
4203            try:
4204                self.bluetooth_facade.do_suspend(suspend_time, expect_bt_wake)
4205            except socket.error as e:
4206                # Socket errors may occur after suspend if the underlying
4207                # connection is lost during suspend (happens if usb-ethernet
4208                # disconnects and reconnects on resume). Catch all these errors
4209                # and swallow them.
4210                logging.warning(
4211                        'Socket error on suspend. Swallowing error: %s',
4212                        str(e))
4213            return 0
4214
4215        proc = multiprocessing.Process(target=_action_suspend)
4216        proc.daemon = True
4217        return proc
4218
4219
4220    def device_connect_async(self,
4221                             device_type,
4222                             device,
4223                             adapter_address,
4224                             delay_wake=1,
4225                             should_wake=True):
4226        """ Connects peer device asynchronously with DUT.
4227
4228        This function uses a thread instead of a subprocess so that the test
4229        result is stored for the test. Otherwise, the test connection was
4230        sometimes failing but the test itself was passing.
4231
4232        @param device_type: The device type (used to check if it's LE)
4233        @param device: the meta device with the peer device
4234        @param adapter_address: the address of the adapter
4235        @param delay_wake: delay wakeup by this many seconds
4236        @param should_wake: Should this cause a wakeup?
4237
4238        @returns threading.Thread object with device connect task
4239        """
4240
4241        def _action_device_connect():
4242            time.sleep(delay_wake)
4243            if 'BLE' in device_type:
4244                # LE reconnects by advertising (dut controller will create LE
4245                # connection, not the peer device)
4246                self.test_device_set_discoverable(device, True)
4247            else:
4248                # Classic requires peer to initiate a connection to wake up the
4249                # dut
4250                connect_func = self.test_connection_by_device_only
4251                if should_wake:
4252                    connect_func(device, adapter_address)
4253                else:
4254                    # If we're not expecting wake, this connect attempt will
4255                    # probably fail.
4256                    self.ignore_failure(connect_func, device, adapter_address)
4257
4258        thread = threading.Thread(target=_action_device_connect)
4259        return thread
4260
4261
4262    @test_retry_and_log(False)
4263    def test_hid_device_created(self, device_address):
4264        """ Tests that the hid device is created before using it for tests.
4265
4266        @param device_address: Address of peripheral device
4267        """
4268        device_found = self.bluetooth_facade.wait_for_hid_device(
4269                device_address)
4270        self.results = {
4271                'device_found': device_found
4272        }
4273        return all(self.results.values())
4274
4275
4276    @test_retry_and_log
4277    def test_battery_reporting(self, device):
4278        """ Tests that battery reporting through GATT can be received
4279
4280        @param device: the meta device containing a Bluetooth device
4281
4282        @returns: true if battery reporting is received
4283        """
4284
4285        percentage = self.bluetooth_facade.get_battery_property(
4286                device.address, 'Percentage')
4287
4288        return percentage > 0
4289
4290    def _apply_new_adapter_alias(self, alias):
4291        """ Sets new system alias and applies discoverable setting
4292
4293        @param alias: string alias to be applied to Adapter->Alias property
4294        """
4295
4296        # Set Adapter's Alias property
4297        self.bluetooth_facade.set_adapter_alias(alias)
4298
4299        # Set discoverable setting on
4300        self.bluetooth_facade.set_discoverable(True)
4301
4302    @test_retry_and_log(False)
4303    def test_set_adapter_alias(self, alias):
4304        """ Validates that a new adapter alias is applied correctly
4305
4306        @param alias: string alias to be applied to Adapter->Alias property
4307
4308        @returns: True if the applied alias is properly applied in btmon trace
4309        """
4310
4311        orig_alias = self.get_adapter_properties()['Alias']
4312        self.bluetooth_le_facade = self.bluetooth_facade
4313
4314        # 1. Capture btmon logs around alias set operation
4315        self._get_btmon_log(lambda: self._apply_new_adapter_alias(alias))
4316
4317        # 2. Verify that name appears in btmon trace with the following format:
4318        # "Name (complete): Chromebook_BA0E" as appears in EIR data set
4319        expected_alias_str = 'Name (complete): ' + alias
4320        alias_found = self.bluetooth_facade.btmon_find(expected_alias_str)
4321
4322        # 3. Re-apply previous bluez alias as other tests expect default
4323        self.bluetooth_facade.set_adapter_alias(orig_alias)
4324
4325        self.results = {'alias_found': alias_found}
4326        return all(self.results.values())
4327
4328    # -------------------------------------------------------------------
4329    # Autotest methods
4330    # -------------------------------------------------------------------
4331
4332
4333    def initialize(self):
4334        """Initialize bluetooth adapter tests."""
4335        # Run through every tests and collect failed tests in self.fails.
4336        self.fails = []
4337
4338        # If a test depends on multiple conditions, write the results of
4339        # the conditions in self.results so that it is easy to know
4340        # what conditions failed by looking at the log.
4341        self.results = None
4342
4343        # Some tests may instantiate a peripheral device for testing.
4344        self.devices = dict()
4345        self.shared_peers = []
4346        for device_type in SUPPORTED_DEVICE_TYPES:
4347            self.devices[device_type] = list()
4348
4349        # The count of registered advertisements.
4350        self.count_advertisements = 0
4351
4352
4353    def update_btpeer(self):
4354        """ Check and update the chameleond bundle on Bluetooth peer
4355        Latest chameleond bundle and git commit is stored in the google cloud
4356        This function compares the git commit of the Bluetooth peers and update
4357        the peer if the commit does not match
4358
4359        @returns True: If all peer are updated to (or currently) in latest
4360                       commit. False if any update fails
4361
4362        """
4363        def _update_btpeer():
4364            status = {}
4365            for peer in self.host.btpeer_list:
4366                status[peer] = {}
4367                status[peer]['update_needed'] = \
4368                    bluetooth_peer_update.is_update_needed(peer, commit)
4369
4370            logging.debug(status)
4371            if not any([v['update_needed'] for v in status.values()]):
4372                logging.info('No peer needed update')
4373                return True
4374            logging.debug('Atleast one peer needs update')
4375
4376            if not bluetooth_peer_update.download_installation_files(self.host,
4377                                                                     commit):
4378                logging.error('Unable to download installation files ')
4379                return False
4380
4381            # TODO(b:160782273) Make this parallel
4382            for peer in self.host.btpeer_list:
4383                if status[peer]['update_needed']:
4384                    status[peer]['updated'], status[peer]['reason'] = \
4385                        bluetooth_peer_update.update_peer(peer, commit)
4386
4387            for peer, v in status.items():
4388                if not v['update_needed']:
4389                    logging.debug('peer %s did not need update', str(peer.host))
4390                elif not v['updated']:
4391                    logging.error('update peer %s failed %s', str(peer.host),
4392                                  v['reason'])
4393                else:
4394                    logging.debug('peer %s updated successfully',
4395                                  str(peer.host))
4396
4397            return all([v['updated'] for v in status.values()
4398                        if v['update_needed']])
4399
4400        try:
4401            commit = None
4402            (_, commit) = bluetooth_peer_update.get_latest_commit()
4403            if commit is None:
4404                logging.error('Unable to get current commit')
4405                return False
4406
4407            return _update_btpeer()
4408        except Exception as e:
4409            logging.error('Exception %s in update_btpeer', str(e))
4410            return False
4411        finally:
4412            if not bluetooth_peer_update.cleanup(self.host, commit):
4413                logging.error('Update peer cleanup failed')
4414
4415
4416    def get_chipset_name(self):
4417        """ Get the name of BT/WiFi chipset on this host
4418
4419        @returns chipset name if successful else ''
4420        """
4421        (vid,pid) = self.bluetooth_facade.get_wlan_vid_pid()
4422        logging.debug('Bluetooth module vid pid is %s %s', vid, pid)
4423        if vid is None or pid is None:
4424            # Controllers that aren't WLAN+BT combo chips does not expose
4425            # Vendor ID/Product ID. Use alternate method.
4426            # This will return one of ['WCN3991', ''] or a string containing
4427            # the name of chipset read from DUT
4428            return self.bluetooth_facade.get_bt_module_name()
4429        for name, l in CHIPSET_TO_VIDPID.items():
4430            if (vid, pid) in l:
4431                return name
4432        return ''
4433
4434
4435    def verify_device_rssi(self, address_list):
4436        """ Test device rssi is over required threshold.
4437
4438        @param address_list: List of peer devices to verify address for
4439
4440        @raises error.TestNA if any device isn't found or RSSI is too low
4441        """
4442        try:
4443            self.test_start_discovery()
4444            for device_address in address_list:
4445                # The RSSI property is only maintained while discovery is
4446                # enabled.  Stopping discovery removes the property. Thus, look
4447                # up the RSSI without modifying discovery state.
4448                found = self.test_discover_device(device_address,
4449                                                  start_discovery=False,
4450                                                  stop_discovery=False)
4451                rssi = self.bluetooth_facade.get_device_property(
4452                        device_address, 'RSSI')
4453
4454                if not found:
4455                    logging.info('Failing with TEST_NA as peer %s was not'
4456                                  ' discovered', device_address)
4457                    raise error.TestNAError(
4458                            'Peer {} not discovered'.format(device_address))
4459
4460                if not rssi or rssi < self.MIN_RSSI:
4461                    logging.info('Failing with TEST_NA since RSSI (%s) is low ',
4462                                  rssi)
4463                    raise error.TestNAError(
4464                            'Peer {} RSSI is too low: {}'.format(
4465                                    device_address, rssi))
4466
4467                logging.info('Peer {} RSSI {}'.format(device_address, rssi))
4468        finally:
4469            self.test_stop_discovery()
4470
4471
4472    def verify_controller_capability(self, required_roles=[],
4473                                     test_type=''):
4474        """Raise an exception if required role support isn't present
4475
4476        @param required_roles: List of test role requirements in
4477                               ["central", "peripheral", "central-peripheral"]
4478
4479        @raises: error.TestFail if device does not meet requirements
4480                                AND test_type is 'AVL'
4481                 error.TestNA if device does not meet requirements
4482                                and test_type is not 'AVL'
4483        """
4484
4485        adapter_props = self.get_adapter_properties()
4486
4487        supported_roles = adapter_props.get('Roles', [])
4488
4489        for req in required_roles:
4490            if req not in supported_roles:
4491                # We don't meet requirements, throw error
4492                msg = 'Role requirement {} not in supported modes {}'.format(
4493                      req, supported_roles)
4494
4495                if test_type == 'AVL':
4496                    raise error.TestFail(msg)
4497
4498                logging.info('Failing with TEST_NA due to %s', msg)
4499                raise error.TestNAError(msg)
4500
4501
4502    def set_fail_fast(self, args_dict, default=False):
4503        """Set whether the test should fail fast if running into any problem
4504
4505        By default it should not fail fast so that a batch test can continue
4506        running the rest after a failure in one test
4507
4508        :param args_dict: the arguments passed int from the command line
4509        :param default: the default value when the flag is missing from the
4510                        args_dict
4511
4512        """
4513        flag_name = 'fail_fast'
4514        if args_dict and flag_name in args_dict:
4515            self.fail_fast = bool(args_dict[flag_name].lower() == 'true')
4516        else:
4517            self.fail_fast = default
4518
4519
4520    def assert_discover_and_pair(self, device):
4521        """ Discovers and pairs given device. Automatically connects too.
4522
4523        If any of the test expressions fail, it will raise an error so only call
4524        this function as a setup for a test.
4525        """
4526        self.assert_on_fail(self.test_device_set_discoverable(device, True))
4527        self.assert_on_fail(self.test_discover_device(device.address))
4528        self.assert_on_fail(
4529                self.test_pairing(device.address, device.pin, trusted=True))
4530
4531    def run_once(self, *args, **kwargs):
4532        """This method should be implemented by children classes.
4533
4534        Typically, the run_once() method would look like:
4535
4536        factory = remote_facade_factory.RemoteFacadeFactory(host)
4537        self.bluetooth_facade = factory.create_bluetooth_facade()
4538
4539        self.test_bluetoothd_running()
4540        # ...
4541        # invoke more self.test_xxx() tests.
4542        # ...
4543
4544        if self.fails:
4545            raise error.TestFail(self.fails)
4546
4547        """
4548        raise NotImplementedError
4549
4550
4551    def cleanup(self, test_state='END'):
4552        """Clean up bluetooth adapter tests.
4553
4554        @param test_state: string describing the requested clear is for
4555                           a new test(NEW), the middle of the test(MID),
4556                           or the end of the test(END).
4557        """
4558
4559        if test_state == 'END':
4560            # Disable all the bluetooth debug logs
4561            self.enable_disable_debug_log(enable=False)
4562
4563            # Re-enable cellular services
4564            self.enable_disable_cellular(enable=True)
4565
4566            # Re-enable ui
4567            self.enable_disable_ui(enable=True)
4568
4569            if hasattr(self, 'host'):
4570                # Stop btmon process
4571                self.host.run('pkill btmon || true')
4572
4573                #Stop tcpdump usbmon process
4574                self.host.run('pkill tcpdump || true')
4575
4576
4577        # Close the device properly if a device is instantiated.
4578        # Note: do not write something like the following statements
4579        #           if self.devices[device_type]:
4580        #       or
4581        #           if bool(self.devices[device_type]):
4582        #       Otherwise, it would try to invoke bluetooth_mouse.__nonzero__()
4583        #       which just does not exist.
4584        for device_name, device_list  in self.devices.items():
4585            for device in device_list:
4586                if device is not None:
4587                    device.Close()
4588
4589                    # Power cycle BT device if we're in the middle of a test
4590                    if test_state == 'MID':
4591                        device.PowerCycle()
4592
4593        self.devices = dict()
4594        for device_type in SUPPORTED_DEVICE_TYPES:
4595            self.devices[device_type] = list()
4596