#!/usr/bin/env python3
#
# Copyright 2022 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import queue
import logging

from google.protobuf import empty_pb2 as empty_proto

import hci_packets as hci

from blueberry.tests.gd_sl4a.lib import gd_sl4a_base_test
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_phys
from blueberry.tests.gd.cert.matchers import SecurityMatchers
from blueberry.tests.gd.cert.py_le_security import PyLeSecurity
from blueberry.tests.gd.cert.truth import assertThat

from blueberry.facade import common_pb2 as common
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade.security.facade_pb2 import AdvertisingCallbackMsgType
from blueberry.facade.security.facade_pb2 import BondMsgType
from blueberry.facade.security.facade_pb2 import LeAuthRequirementsMessage
from blueberry.facade.security.facade_pb2 import LeIoCapabilityMessage
from blueberry.facade.security.facade_pb2 import LeOobDataPresentMessage
from blueberry.facade.security.facade_pb2 import UiCallbackMsg
from blueberry.facade.security.facade_pb2 import UiCallbackType
from blueberry.facade.security.facade_pb2 import UiMsgType

from mobly import test_runner

LeIoCapabilities = LeIoCapabilityMessage.LeIoCapabilities
LeOobDataFlag = LeOobDataPresentMessage.LeOobDataFlag

DISPLAY_ONLY = LeIoCapabilityMessage(capabilities=LeIoCapabilities.DISPLAY_ONLY)

OOB_NOT_PRESENT = LeOobDataPresentMessage(data_present=LeOobDataFlag.NOT_PRESENT)


class OobData:
    """Plain holder for the three fields of an SL4A "GeneratedOobData" event."""

    # Class-level defaults are kept so that attribute access on the class
    # itself keeps working for any existing caller.
    address = None
    confirmation = None
    randomizer = None

    def __init__(self, address, confirmation, randomizer):
        self.address = address
        self.confirmation = confirmation
        self.randomizer = randomizer

    def __repr__(self):
        # Instances are passed straight to logging.info() by the tests below;
        # without this the log only shows an opaque object address.
        return "%s(address=%r, confirmation=%r, randomizer=%r)" % (type(self).__name__, self.address,
                                                                   self.confirmation, self.randomizer)


class OobPairingSl4aTest(gd_sl4a_base_test.GdSl4aBaseTestClass):
    """OOB data generation and OOB bonding tests between the SL4A DUT and the cert device."""

    # Events sent from SL4A
    SL4A_EVENT_GENERATED = "GeneratedOobData"
    SL4A_EVENT_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

    # Matches tBT_TRANSPORT
    # Used Strings because ints were causing gRPC problems
    TRANSPORT_AUTO = "0"
    TRANSPORT_BREDR = "1"
    TRANSPORT_LE = "2"

    def setup_class(self):
        super().setup_class(cert_module='SECURITY')
        self.default_timeout = 5  # seconds

    def setup_test(self):
        super().setup_test()
        self.cert_security = PyLeSecurity(self.cert)

    def teardown_test(self):
        self.cert_security.close()
        super().teardown_test()

    def __get_test_irk(self):
        """Return the fixed 16-byte IRK (0x01..0x10) used by these tests."""
        return bytes([
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10
        ])

    def _generate_sl4a_oob_data(self, transport):
        """Ask the DUT to generate local OOB data and wait for the resulting event.

        Args:
            transport: one of the TRANSPORT_* string constants of this class.

        Returns:
            An OobData built from the event payload, or None if no
            SL4A_EVENT_GENERATED event arrives within self.default_timeout.
        """
        logging.info("Fetching OOB data...")
        self.dut.sl4a.bluetoothGenerateLocalOobData(transport)
        try:
            event_info = self.dut.ed.pop_event(self.SL4A_EVENT_GENERATED, self.default_timeout)
        except queue.Empty:
            logging.error("Failed to generate OOB data!")
            return None
        logging.info("Data received!")
        data = event_info["data"]
        logging.info(data)
        return OobData(data["address_with_type"], data["confirmation"], data["randomizer"])

    def _generate_cert_oob_data(self, transport):
        """Fetch LE OOB data from the cert device.

        Returns:
            The GetLeOutOfBandData response with the trailing byte removed
            from confirmation_value and random_value, or None when transport
            is not TRANSPORT_LE.
        """
        if transport != self.TRANSPORT_LE:
            return None
        oob_data = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
        # GetLeOutOfBandData adds null terminator to string in C code
        # (length 17) before passing back to python via gRPC where it is
        # converted back to bytes. Remove the null terminator for handling
        # in python test, since length is known to be 16 for
        # confirmation_value and random_value
        oob_data.confirmation_value = oob_data.confirmation_value[:-1]
        oob_data.random_value = oob_data.random_value[:-1]
        return oob_data

    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
        """Set the cert LE initiator address policy and return the address used.

        Args:
            irk: value passed as rotation_irk in the PrivacyPolicy.

        Returns:
            The random address string, upper-cased.
        """
        # Random static address below, no random resolvable address at this point
        random_address_bytes = "DD:34:02:05:5C:EE".encode()
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS),
            rotation_irk=irk)
        self.cert.security.SetLeInitiatorAddressPolicy(private_policy)
        # Bluetooth MAC address must be upper case
        return random_address_bytes.decode('utf-8').upper()

    def __advertise_rpa_random_policy(self, legacy_pdus, irk):
        """Configure the cert address policy and create an advertiser on the cert.

        Args:
            legacy_pdus: value for ExtendedAdvertisingConfig.legacy_pdus; also
                selects the "Legacy"/"Extended" wording in the log output.
            irk: forwarded to the privacy-policy helper.

        Returns:
            Tuple of (address string, ExtendedCreateAdvertiser response).
        """
        device_name = 'Im_The_CERT!'
        logging.info("Getting random address")
        address = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
        logging.info("Done %s", address)

        # Setup cert side to advertise
        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
                               data=list(bytes(device_name, encoding='utf8')))
        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
        config = le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=512,
            interval_max=768,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=legacy_pdus,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        pdu_kind = "Legacy" if legacy_pdus else "Extended"
        logging.info("Creating %s PDU advertiser...", pdu_kind)
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("%s PDU advertiser created.", pdu_kind)
        return (address, create_response)

    def _advertise_rpa_random_legacy_pdu(self, irk):
        return self.__advertise_rpa_random_policy(True, irk)

    def _advertise_rpa_random_extended_pdu(self, irk):
        return self.__advertise_rpa_random_policy(False, irk)

    def _stop_advertising(self, advertiser_id):
        """Remove the advertiser with the given id from the cert device."""
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

    def _wait_for_own_address(self):
        """Wait for an OWN_ADDRESS_READ advertising callback and return its address."""
        own_address = common.BluetoothAddress()

        def get_address(event):
            nonlocal own_address
            if event.message_type == AdvertisingCallbackMsgType.OWN_ADDRESS_READ:
                own_address = event.address.address
                return True
            return False

        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address)
        return own_address

    def _wait_for_advertising_set_started(self):
        """Wait for an ADVERTISING_SET_STARTED callback.

        Returns:
            True if the matched event carried advertising_started == 1,
            otherwise False. The matcher accepts the event either way.
        """
        advertising_started = False

        def get_advertising_set_started(event):
            nonlocal advertising_started
            if event.message_type == AdvertisingCallbackMsgType.ADVERTISING_SET_STARTED:
                if event.advertising_started == 1:
                    advertising_started = True
                return True
            return False

        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started)
        return advertising_started

    def _wait_for_yes_no_dialog(self):
        """Wait for a DISPLAY_PAIRING_PROMPT UI event and return its peer field."""
        address_with_type = common.BluetoothAddressWithType()

        def get_address_with_type(event):
            nonlocal address_with_type
            if event.message_type == UiMsgType.DISPLAY_PAIRING_PROMPT:
                address_with_type = event.peer
                return True
            return False

        assertThat(self.cert_security.get_ui_stream()).emits(get_address_with_type)
        return address_with_type

    def test_sl4a_classic_generate_oob_data(self):
        oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_BREDR)
        logging.info("OOB data received")
        logging.info(oob_data)
        assertThat(oob_data).isNotNone()

    def test_sl4a_classic_generate_oob_data_twice(self):
        self.test_sl4a_classic_generate_oob_data()
        self.test_sl4a_classic_generate_oob_data()

    def test_sl4a_ble_generate_oob_data(self):
        oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()

    def test_cert_ble_generate_oob_data(self):
        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()

    def _bond_sl4a_cert_oob(self):
        """Bond the DUT to the cert device using cert-generated LE OOB data.

        Returns:
            Tuple of (random address string set on the cert, advertising
            address read back from the cert advertiser).
        """
        self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
        self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT)
        self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1))

        random_address, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())

        # NOTE(review): the boolean returned here is ignored, so bonding
        # proceeds even if the event reported advertising_started != 1 --
        # confirm whether that is intended.
        self._wait_for_advertising_set_started()

        get_own_address_request = le_advertising_facade.GetOwnAddressRequest(
            advertiser_id=create_response.advertiser_id)
        self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request)
        advertising_address = self._wait_for_own_address()

        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
        assertThat(oob_data).isNotNone()

        self.dut.sl4a.bluetoothCreateBondOutOfBand(
            advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(),
            oob_data.random_value.hex())

        address_with_type = self._wait_for_yes_no_dialog()
        self.cert.security.SendUiCallback(
            UiCallbackMsg(message_type=UiCallbackType.PAIRING_PROMPT,
                          boolean=True,
                          unique_id=1,
                          address=address_with_type))

        assertThat(self.cert_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED))

        # Pre-initialise so that a timeout surfaces as a clean assertion
        # failure below rather than an UnboundLocalError.
        bond_state = None
        try:
            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_BONDED, self.default_timeout)
        except queue.Empty:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)
        self._stop_advertising(create_response.advertiser_id)
        return (random_address, advertising_address)

    def test_sl4a_create_bond_out_of_band(self):
        self._bond_sl4a_cert_oob()

    def test_generate_oob_after_pairing(self):
        self._bond_sl4a_cert_oob()
        assertThat(self._generate_sl4a_oob_data(self.TRANSPORT_LE)).isNotNone()

    def test_remove_bond(self):
        _, advertising_address = self._bond_sl4a_cert_oob()
        self.dut.sl4a.bluetoothUnbond(advertising_address.decode().upper())
        bond_state = None
        try:
            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.default_timeout)
        except queue.Empty:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)


if __name__ == '__main__':
    test_runner.main()