1# Lint as: python2, python3
2# Copyright 2019 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Server side bluetooth tests on adapter pairing and connecting to a bluetooth
8HID device.
9"""
10
11from __future__ import absolute_import
12from __future__ import division
13from __future__ import print_function
14
15import logging
16import time
17
18import common
19from autotest_lib.server.cros.bluetooth import bluetooth_adapter_tests
20from six.moves import range
21
22
class BluetoothAdapterPairingTests(
        bluetooth_adapter_tests.BluetoothAdapterTests):
    """Server side bluetooth adapter pairing and connecting to bluetooth device

    This test tries to verify that the adapter of the DUT could
    pair and connect to a bluetooth HID device correctly.

    In particular, the following test flows are defined in this class. Look
    at the docstrings of the flows for more details.
    - pairing_test: discover, pair, connect, disconnect and forget a device.
    - connect_disconnect_loop: repeatedly connect/disconnect from the adapter
      side and time the connection establishment.
    - auto_reconnect_loop: repeatedly restart the adapter or the peer and
      time how long the paired device takes to be connected again.

    Refer to BluetoothAdapterTests for all subtests performed in this test.

    """

    # Delay, in seconds, inserted between several of the pairing steps.
    # TODO(josephsih): Reduce the sleep intervals to speed up the tests.
    PAIR_TEST_SLEEP_SECS = 5

    def pairing_test(self, device, check_connected_method=lambda device: True,
                     pairing_twice=False, suspend_resume=False, reboot=False):
        """Running Bluetooth adapter tests about pairing to a device.

        @param device: the peer device object. This method reads its
                address, pin, name, class_of_service, class_of_device and
                can_init_connection attributes.
        @param check_connected_method: callable invoked as
                check_connected_method(device) to check that the profile
                is connected. The default always returns True.
        @param pairing_twice: if True, discover, pair and remove the device
                a second time after it has been forgotten.
        @param suspend_resume: if True, call self.suspend_resume() and
                check the pairing/connection again afterwards.
        @param reboot: currently has no effect on the steps that are run,
                because the reboot flow is commented out below. A warning
                is logged when it is requested.

        """
        # Reset the adapter to forget previously paired devices if any.
        self.test_reset_on_adapter()

        # The adapter must be set to the pairable state.
        self.test_pairable()

        # Test if the adapter could discover the target device.
        time.sleep(self.PAIR_TEST_SLEEP_SECS)
        self.test_discover_device(device.address)

        # Test if the discovered device class of service is correct.
        self.test_device_class_of_service(device.address,
                                          device.class_of_service)

        # Test if the discovered device class of device is correct.
        self.test_device_class_of_device(device.address,
                                         device.class_of_device)

        # Verify that the adapter could pair with the device.
        # Also set the device trusted when pairing is done.
        # Device will be connected at the end of pairing.
        self.test_pairing(device.address, device.pin, trusted=True)

        # Test if the discovered device name is correct.
        # Sometimes, it takes quite a long time after discovering
        # the device (more than 60 seconds) to resolve the device name.
        # Hence, it is safer to test the device name after pairing and
        # connection is done.
        self.test_device_name(device.address, device.name)

        # Run hid test to make sure profile is connected
        check_connected_method(device)

        # Test if the device is still connected after suspend/resume.
        if suspend_resume:
            self.suspend_resume()

            time.sleep(self.PAIR_TEST_SLEEP_SECS)
            self.test_device_is_paired(device.address)

            # Check if the peripheral is connected after suspend/resume.
            # The check goes through self.ignore_failure() (defined outside
            # this class; presumably it does not record a failed check --
            # confirm against the base class). A falsy result triggers a
            # reconnection initiated by the device.
            if not self.ignore_failure(check_connected_method, device):
                logging.info("device not connected after suspend_resume")
                self.test_connection_by_device(device)
            else:
                logging.info("device remains connected after suspend_resume")

            time.sleep(self.PAIR_TEST_SLEEP_SECS)
            check_connected_method(device)

            time.sleep(self.PAIR_TEST_SLEEP_SECS)
            self.test_device_name(device.address, device.name)

        # The reboot flow is disabled; its steps are kept below, commented
        # out, for reference. Warn so that a caller passing reboot=True is
        # not misled into believing that the reboot path was exercised.
        if reboot:
            logging.warning('reboot was requested, but the reboot flow of '
                            'pairing_test is disabled; skipping it')

        # Test if the device is still connected after reboot.
        # if reboot:
        #     self.host.reboot()

        #     time.sleep(self.PAIR_TEST_SLEEP_SECS)
        #     self.test_device_is_paired(device.address)

        #     # After a reboot, we need to wake the peripheral
        #     # as it is not connected.
        #     time.sleep(self.PAIR_TEST_SLEEP_SECS)
        #     self.test_connection_by_adapter(device.address)

        #     time.sleep(self.PAIR_TEST_SLEEP_SECS)
        #     self.test_device_is_connected(device.address)

        #     time.sleep(self.PAIR_TEST_SLEEP_SECS)
        #     self.test_device_name(device.address, device.name)

        # Verify that the adapter could disconnect the device.
        self.test_disconnection_by_adapter(device.address)

        time.sleep(self.PAIR_TEST_SLEEP_SECS)
        if device.can_init_connection:
            # Verify that the device could initiate the connection.
            self.test_connection_by_device(device)

            # With raspberry pi peer, it takes a moment before the device is
            # registered as an input device. Without delay, the input recorder
            # doesn't find the device
            time.sleep(1)
            check_connected_method(device)
        else:
            # Reconnect so that we can test disconnection from the kit
            self.test_connection_by_adapter(device.address)

        # TODO(alent): Needs a new capability, but this is a good proxy
        if device.can_init_connection:
            # Verify that the device could initiate the disconnection.
            self.test_disconnection_by_device(device)
        else:
            # Disconnect from the adapter side instead of from the device.
            self.test_disconnection_by_adapter(device.address)

        # Verify that the adapter could remove the paired device.
        self.test_remove_pairing(device.address)

        # Check if the device could be re-paired after being forgotten.
        if pairing_twice:
            # Test if the adapter could discover the target device again.
            time.sleep(self.PAIR_TEST_SLEEP_SECS)
            self.test_discover_device(device.address)

            # Verify that the adapter could pair with the device again.
            # Also set the device trusted when pairing is done.
            time.sleep(self.PAIR_TEST_SLEEP_SECS)
            self.test_pairing(device.address, device.pin, trusted=True)

            # Verify that the adapter could remove the paired device again.
            time.sleep(self.PAIR_TEST_SLEEP_SECS)
            self.test_remove_pairing(device.address)


    def connect_disconnect_loop(self, device, loops):
        """Perform a connect disconnect loop test

        The device is discovered, paired and disconnected once. Then, for
        each iteration, the adapter connects to the device (this step is
        timed) and disconnects from it again. The loop stops at the first
        iteration after which self.fails is non-empty.

        @param device: the peer device object; its address and pin
                attributes are read.
        @param loops: the number of connect/disconnect iterations. With no
                iteration (e.g. loops is 0) no average is logged.

        """
        # First pair and disconnect, to emulate real life scenario
        self.test_discover_device(device.address)
        # self.bluetooth_facade.is_discovering() doesn't work as expected:
        # crbug:905374
        # self.test_stop_discovery()
        time.sleep(self.PAIR_TEST_SLEEP_SECS)
        self.test_pairing(device.address, device.pin, trusted=True)

        # Verify device is now connected
        self.test_device_is_connected(device.address)
        self.test_hid_device_created(device.address)

        # Disconnect the device
        self.test_disconnection_by_adapter(device.address)

        total_duration_by_adapter = 0
        loop_cnt = 0
        for i in range(loops):

            # Verify device didn't connect automatically
            time.sleep(2)
            self.test_device_is_not_connected(device.address)

            # Only the connection attempt itself is timed.
            start_time = time.time()
            self.test_connection_by_adapter(device.address)
            time_diff = time.time() - start_time

            # Verify device is now connected
            self.test_device_is_connected(device.address)
            self.test_hid_device_created(device.address)

            self.test_disconnection_by_adapter(device.address)

            # Stop at the first iteration that left something in self.fails.
            if self.fails:
                break

            loop_cnt += 1
            total_duration_by_adapter += time_diff
            logging.info('%d: Connection establishment duration %f sec',
                         i, time_diff)

        # Guard on loop_cnt: without any timed iteration there is nothing to
        # average, and dividing would raise ZeroDivisionError.
        if loop_cnt and not self.fails:
            logging.info('Average duration (by adapter) %f sec',
                         total_duration_by_adapter/loop_cnt)


    def auto_reconnect_loop(self,
                            device,
                            loops,
                            check_connected_method=lambda device: True,
                            restart_adapter=False):
        """Running a loop to verify the paired peer can auto reconnect

        The device is discovered, paired and connected once. Then, for each
        iteration, either the adapter is powered off and on again or the
        peers are restarted, and the time until the device is reported
        connected, its HID device is created and check_connected_method()
        has returned is measured. The loop stops at the first iteration
        after which self.fails is non-empty.

        @param device: the peer device object; its address and pin
                attributes are read.
        @param loops: the number of restart/reconnect iterations. With no
                iteration (e.g. loops is 0) no average is logged.
        @param check_connected_method: callable invoked as
                check_connected_method(device) after each reconnection.
                The default always returns True.
        @param restart_adapter: if True, power-cycle the adapter in each
                iteration; otherwise call self.restart_peers().

        """
        # Let the adapter pair, and connect to the target device first
        self.test_discover_device(device.address)
        self.test_pairing(device.address, device.pin, trusted=True)

        # Connect from the adapter and verify the HID device is created
        self.test_connection_by_adapter(device.address)
        self.test_hid_device_created(device.address)

        total_reconnection_duration = 0
        loop_cnt = 0
        for i in range(loops):
            # Restart either the adapter or the peer
            if restart_adapter:
                self.test_power_off_adapter()
                self.test_power_on_adapter()
            else:
                # Restart and clear peer HID device
                self.restart_peers()

            # The measurement starts once the restart calls have returned.
            start_time = time.time()

            # Verify that the device is reconnected. Wait for the input device
            # to become available before checking the profile connection.
            self.test_device_is_connected(device.address)
            self.test_hid_device_created(device.address)

            check_connected_method(device)
            time_diff = time.time() - start_time

            # Stop at the first iteration that left something in self.fails.
            if self.fails:
                break

            total_reconnection_duration += time_diff
            loop_cnt += 1
            logging.info('%d: Reconnection duration %f sec', i, time_diff)

        # Guard on loop_cnt: without any timed iteration there is nothing to
        # average, and dividing would raise ZeroDivisionError.
        if loop_cnt and not self.fails:
            logging.info('Average Reconnection duration %f sec',
                         total_reconnection_duration/loop_cnt)
257