1# Lint as: python2, python3 2# Copyright 2020 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""A class provides most of the test logic for Bluetooth Better Together""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11 12import logging 13import base64 14import json 15 16import common 17from autotest_lib.server.cros.bluetooth.bluetooth_adapter_quick_tests import ( 18 BluetoothAdapterQuickTests) 19from autotest_lib.server.cros.bluetooth.bluetooth_adapter_pairing_tests import ( 20 BluetoothAdapterPairingTests) 21from autotest_lib.server.cros.bluetooth.bluetooth_adapter_tests import ( 22 test_retry_and_log) 23from six.moves import range 24 25class BluetoothAdapterBetterTogether(BluetoothAdapterQuickTests, 26 BluetoothAdapterPairingTests): 27 """A Batch of Bluetooth LE tests for Better Together. This test is written 28 as a batch of tests in order to reduce test time, since auto-test ramp up 29 time is costly. The batch is using BluetoothAdapterQuickTests wrapper 30 methods to start and end a test and a batch of tests. 31 32 This class can be called to run the entire test batch or to run a 33 specific test only 34 """ 35 36 BETTER_TOGETHER_SERVICE_UUID = 'b3b7e28e-a000-3e17-bd86-6e97b9e28c11' 37 CLIENT_RX_CHARACTERISTIC_UUID = '00000100-0004-1000-8000-001a11000102' 38 CLIENT_TX_CHARACTERISTIC_UUID = '00000100-0004-1000-8000-001a11000101' 39 CCCD_VALUE_INDICATION = 0x02 40 TEST_ITERATION = 3 41 42 # The following messages were captured during a smart unlock process. These 43 # are the messages exchanged between the phone and the chromebook to 44 # authorize each other in order to unlock the chromebook. 45 CONNECTION_OPEN_MESSAGE = b'\x80\x00\x01\x00\x01\x00\x00' 46 MESSAGE_1 = b'\x1c\x03\x00\xc7\x7b\x22\x66\x65\x61\x74\x75\x72' \ 47 b'\x65\x22\x3a\x22\x61\x75\x74\x68\x22\x2c\x22\x70' \ 48 b'\x61\x79\x6c\x6f\x61\x64\x22\x3a\x22\x43\x6c\x6b' \ 49 b'\x4b\x56\x41\x67\x42\x45\x41\x45\x79\x54\x67\x70' \ 50 b'\x4b\x43\x41\x45\x53\x52\x67\x6f\x68\x41\x49\x37' \ 51 b'\x6e\x78\x63\x34\x6b\x44\x4d\x68\x6d\x68\x6e\x4d' \ 52 b'\x75\x69\x7a\x79\x62\x47\x54\x5f\x31\x4e\x6c\x52' \ 53 b'\x4c\x6c\x6d\x32\x59\x34\x72\x35\x78\x6f\x6e\x69' \ 54 b'\x47\x51\x35\x49\x6a\x45\x69\x45\x41\x75\x45\x64' \ 55 b'\x43\x4c\x37\x36\x44\x70\x30\x70\x6a\x6a\x75\x74' \ 56 b'\x56\x6a\x47\x35\x38\x62\x55\x49\x57\x42\x57\x49' \ 57 b'\x41\x4f\x5a\x39\x38\x46\x43\x49\x4f\x79\x58\x68' \ 58 b'\x2d\x6b\x66\x6f\x51\x41\x52\x49\x42\x72\x68\x49' \ 59 b'\x67\x7a\x35\x62\x49\x37\x53\x6f\x59\x47\x56\x36' \ 60 b'\x6b\x4e\x4e\x74\x6d\x73\x59\x53\x6a\x75\x68\x4d' \ 61 b'\x68\x61\x66\x51\x6c\x4d\x44\x55\x68\x78\x42\x36' \ 62 b'\x44\x50\x52\x79\x4c\x5a\x62\x30\x3d\x22\x7d' 63 64 MESSAGE_2 = b'\x2c\x03\x00\xfb\x7b\x22\x66\x65\x61\x74\x75\x72' \ 65 b'\x65\x22\x3a\x22\x61\x75\x74\x68\x22\x2c\x22\x70' \ 66 b'\x61\x79\x6c\x6f\x61\x64\x22\x3a\x22\x43\x6f\x41' \ 67 b'\x42\x43\x68\x77\x49\x41\x52\x41\x43\x4b\x68\x43' \ 68 b'\x34\x69\x38\x45\x47\x73\x76\x71\x6e\x78\x5f\x66' \ 69 b'\x66\x4c\x33\x59\x42\x61\x6b\x35\x59\x4d\x67\x51' \ 70 b'\x49\x44\x52\x41\x42\x45\x6d\x42\x41\x71\x5f\x47' \ 71 b'\x62\x72\x49\x42\x6c\x42\x42\x76\x73\x6a\x73\x32' \ 72 b'\x67\x47\x56\x59\x70\x76\x73\x50\x6f\x64\x57\x6e' \ 73 b'\x70\x50\x42\x6e\x5a\x71\x41\x61\x46\x6b\x30\x59' \ 74 b'\x54\x48\x76\x79\x6b\x55\x76\x6b\x4a\x79\x6c\x35' \ 75 b'\x56\x54\x7a\x48\x53\x35\x43\x6e\x41\x65\x56\x4d' \ 76 b'\x79\x38\x49\x76\x44\x5f\x67\x65\x6d\x4e\x65\x42' \ 77 b'\x61\x76\x58\x70\x70\x52\x62\x4b\x43\x42\x34\x5f' \ 78 b'\x35\x35\x79\x4a\x50\x4e\x6b\x5f\x76\x45\x66\x53' \ 79 b'\x38\x57\x5a\x7a\x6d\x58\x64\x5a\x4d\x68\x72\x74' \ 80 b'\x31\x61\x6d\x67\x46\x7a\x6e\x35\x4b\x65\x61\x2d' \ 81 b'\x77\x5f\x58\x55\x53\x49\x47\x54\x6e\x55\x33\x6d' \ 82 b'\x68\x45\x36\x67\x63\x5a\x4c\x4b\x49\x52\x79\x4d' \ 83 b'\x61\x39\x68\x78\x41\x4f\x30\x4b\x6f\x7a\x6d\x68' \ 84 b'\x43\x71\x6d\x4d\x42\x54\x4a\x6e\x57\x4e\x6f\x52' \ 85 b'\x62\x22\x7d' 86 87 MESSAGE_3 = b'\x3c\x03\x00\xae\x7b\x22\x66\x65\x61\x74\x75\x72' \ 88 b'\x65\x22\x3a\x22\x65\x61\x73\x79\x5f\x75\x6e\x6c' \ 89 b'\x6f\x63\x6b\x22\x2c\x22\x70\x61\x79\x6c\x6f\x61' \ 90 b'\x64\x22\x3a\x22\x43\x6b\x41\x4b\x48\x41\x67\x42' \ 91 b'\x45\x41\x49\x71\x45\x44\x30\x72\x52\x69\x54\x77' \ 92 b'\x32\x41\x6e\x6e\x5f\x5f\x5f\x59\x33\x6c\x68\x55' \ 93 b'\x53\x65\x45\x79\x42\x41\x67\x4e\x45\x41\x45\x53' \ 94 b'\x49\x46\x59\x61\x64\x36\x33\x43\x75\x52\x34\x67' \ 95 b'\x37\x77\x4d\x41\x4b\x61\x65\x76\x2d\x72\x7a\x38' \ 96 b'\x66\x64\x6f\x47\x46\x2d\x67\x44\x4c\x6a\x37\x50' \ 97 b'\x47\x62\x43\x6f\x57\x54\x66\x6f\x45\x69\x43\x38' \ 98 b'\x4a\x56\x62\x4c\x48\x75\x49\x35\x42\x76\x38\x43' \ 99 b'\x4c\x74\x73\x53\x51\x35\x50\x4c\x6e\x72\x5a\x41' \ 100 b'\x70\x68\x6b\x75\x4c\x78\x2d\x56\x59\x64\x6c\x32' \ 101 b'\x53\x4d\x71\x5f\x36\x41\x3d\x3d\x22\x7d' 102 103 MESSAGE_4 = b'\x4c\x03\x00\xc2\x7b\x22\x66\x65\x61\x74\x75\x72' \ 104 b'\x65\x22\x3a\x22\x65\x61\x73\x79\x5f\x75\x6e\x6c' \ 105 b'\x6f\x63\x6b\x22\x2c\x22\x70\x61\x79\x6c\x6f\x61' \ 106 b'\x64\x22\x3a\x22\x43\x6c\x41\x4b\x48\x41\x67\x42' \ 107 b'\x45\x41\x49\x71\x45\x4d\x61\x4e\x6c\x75\x43\x56' \ 108 b'\x63\x72\x59\x37\x68\x70\x34\x68\x46\x74\x69\x6f' \ 109 b'\x45\x4d\x6b\x79\x42\x41\x67\x4e\x45\x41\x45\x53' \ 110 b'\x4d\x47\x6c\x68\x33\x55\x68\x44\x77\x58\x4b\x70' \ 111 b'\x58\x46\x5f\x5a\x6e\x4f\x61\x30\x36\x4a\x48\x4b' \ 112 b'\x69\x6b\x56\x68\x51\x62\x32\x50\x4d\x36\x62\x62' \ 113 b'\x76\x78\x51\x6a\x47\x50\x47\x73\x66\x79\x42\x5a' \ 114 b'\x74\x52\x67\x39\x5f\x65\x36\x6f\x42\x5a\x79\x5f' \ 115 b'\x74\x35\x42\x45\x74\x78\x49\x67\x51\x75\x37\x42' \ 116 b'\x62\x6e\x75\x4e\x57\x64\x67\x6d\x42\x36\x4d\x47' \ 117 b'\x75\x68\x54\x79\x34\x33\x43\x37\x32\x2d\x58\x44' \ 118 b'\x58\x35\x4b\x4f\x6a\x67\x6d\x56\x74\x48\x58\x41' \ 119 b'\x68\x4e\x67\x3d\x22\x7d' 120 121 CONNECTION_CLOSE_MESSAGE = b'\xd2\x00\x00' 122 123 def test_smart_unlock(self, address): 124 """Simulate the Smart Unlock flow, here are the steps that involve 125 1. Set the discovery filter to match LE devices only. 126 2. Start the discovery. 127 3. Stop the discovery after the device was found. 128 4. Set the LE connection parameters to reduce the min and max 129 connection intervals to 6. 130 5. Connect the device. 131 6. Set the Trusted property of the device to true. 132 7. Verify all the services were resolved. 133 8. Start notification on the RX characteristic of the 134 Proximity Service. 135 9. Exchange some messages with the peer device to authorize it. 136 10. Stop the notification. 137 11. Disconnect the device. 138 """ 139 140 filter = {'Transport':'le'} 141 parameters = {'MinimumConnectionInterval':6, 142 'MaximumConnectionInterval':6} 143 144 # We don't use the control file for iteration since it will involve the 145 # device setup steps which don't reflect the real user scenario. 146 for i in range(self.TEST_ITERATION): 147 logging.debug("Test iteration %d", i) 148 self.test_set_discovery_filter(filter) 149 self.test_discover_device(address) 150 151 self.test_set_le_connection_parameters(address, parameters) 152 self.test_connection_by_adapter(address) 153 154 self.test_set_trusted(address) 155 self.test_service_resolved(address) 156 self.test_find_object_path(address) 157 158 self.test_start_notify(self.rx_object_path, 159 self.CCCD_VALUE_INDICATION) 160 self.test_messages_exchange( 161 self.rx_object_path, self.tx_object_path, address) 162 self.test_stop_notify(self.rx_object_path) 163 self.test_disconnection_by_adapter(address) 164 165 self.test_remove_device_object(address) 166 167 return True 168 169 170 @test_retry_and_log(False) 171 def test_remove_device_object(self, address): 172 """Test the device object can be removed from the adapter""" 173 return self.bluetooth_facade.remove_device_object(address) 174 175 176 @test_retry_and_log(False) 177 def test_find_object_path(self, address): 178 """Test the object path can be found for a device""" 179 self.tx_object_path = None 180 self.rx_object_path = None 181 attr_map = self.bluetooth_facade.get_gatt_attributes_map(address) 182 attr_map_json = json.loads(attr_map) 183 servs_json = attr_map_json['services'] 184 for uuid_s in servs_json: 185 if uuid_s == self.BETTER_TOGETHER_SERVICE_UUID: 186 chrcs_json = servs_json[uuid_s]['characteristics'] 187 for uuid_c in chrcs_json: 188 if uuid_c == self.CLIENT_TX_CHARACTERISTIC_UUID: 189 self.tx_object_path = chrcs_json[uuid_c]['path'] 190 if uuid_c == self.CLIENT_RX_CHARACTERISTIC_UUID: 191 self.rx_object_path = chrcs_json[uuid_c]['path'] 192 193 return self.tx_object_path and self.rx_object_path 194 195 196 @test_retry_and_log(False) 197 def test_messages_exchange( 198 self, rx_object_path, tx_object_path, address): 199 """Test message exchange with the peer, Better Together performs the 200 messages exchange for authorizing the peer device before unlocking 201 the Chromebook. 202 """ 203 if rx_object_path is None or tx_object_path is None: 204 logging.error('Invalid object path') 205 return False 206 207 self.test_message_exchange( 208 rx_object_path, 209 tx_object_path, 210 self.CONNECTION_OPEN_MESSAGE, 211 'connection open message') 212 213 self.test_message_exchange( 214 rx_object_path, 215 tx_object_path, 216 self.MESSAGE_1, 217 'message 1') 218 219 self.test_message_exchange( 220 rx_object_path, 221 tx_object_path, 222 self.MESSAGE_2, 223 'message 2') 224 225 # Better Together gets connection info multiple times to ensure 226 # the device is close enough by checking the RSSI, here we simulate it. 227 for i in range(9): 228 self.test_get_connection_info(address) 229 230 self.test_message_exchange( 231 rx_object_path, 232 tx_object_path, 233 self.MESSAGE_3, 234 'message 3') 235 236 self.test_message_exchange( 237 rx_object_path, 238 tx_object_path, 239 self.MESSAGE_4, 240 'message 4') 241 242 for i in range(2): 243 self.test_get_connection_info(address) 244 245 self.test_message_exchange( 246 rx_object_path, 247 tx_object_path, 248 self.CONNECTION_CLOSE_MESSAGE, 249 'connection close message') 250 251 return True 252 253 254 @test_retry_and_log(False) 255 def test_message_exchange( 256 self, rx_object_path, tx_object_path, message, message_type): 257 """Test message exchange between DUT and the peer device""" 258 ret_msg = self.bluetooth_facade.exchange_messages( 259 tx_object_path, rx_object_path, message) 260 261 if not ret_msg: 262 logging.error("Failed to write message: %s", message_type) 263 return False 264 265 return base64.standard_b64decode(ret_msg) == message 266