#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from blueberry.tests.gd.cert.captures import HciCaptures
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.event_stream import EventStream
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert.py_hci import PyHci
from blueberry.tests.gd.cert.py_security import PySecurity
from blueberry.tests.gd.cert.truth import assertThat
from google.protobuf import empty_pb2 as empty_proto
from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade
from blueberry.facade.security.facade_pb2 import IoCapabilities
from blueberry.facade.security.facade_pb2 import AuthenticationRequirements
from blueberry.facade.security.facade_pb2 import OobDataPresent
from blueberry.utils import bluetooth
import hci_packets as hci


class CertSecurity(PySecurity):
    """
    Contain all of the certification stack logic for sending and receiving
    HCI commands following the Classic Pairing flows.
    """

    # Facade (protobuf) IO capability value -> HCI packet enum value.
    _io_cap_lookup = {
        IoCapabilities.DISPLAY_ONLY: hci.IoCapability.DISPLAY_ONLY,
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: hci.IoCapability.DISPLAY_YES_NO,
        IoCapabilities.KEYBOARD_ONLY: hci.IoCapability.KEYBOARD_ONLY,
        IoCapabilities.NO_INPUT_NO_OUTPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT,
    }

    # Facade (protobuf) authentication requirement -> HCI packet enum value.
    _auth_req_lookup = {
        AuthenticationRequirements.NO_BONDING:
            hci.AuthenticationRequirements.NO_BONDING,
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION:
            hci.AuthenticationRequirements.NO_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.DEDICATED_BONDING:
            hci.AuthenticationRequirements.DEDICATED_BONDING,
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION:
            hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.GENERAL_BONDING:
            hci.AuthenticationRequirements.GENERAL_BONDING,
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION:
            hci.AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION,
    }

    # Facade (protobuf) OOB-data-present value -> HCI packet enum value.
    _oob_present_lookup = {
        OobDataPresent.NOT_PRESENT: hci.OobDataPresent.NOT_PRESENT,
        OobDataPresent.P192_PRESENT: hci.OobDataPresent.P_192_PRESENT,
        OobDataPresent.P256_PRESENT: hci.OobDataPresent.P_256_PRESENT,
        OobDataPresent.P192_AND_256_PRESENT: hci.OobDataPresent.P_192_AND_256_PRESENT,
    }

    _hci_event_stream = None
    _io_caps = hci.IoCapability.DISPLAY_ONLY
    _auth_reqs = hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
    _secure_connections_enabled = False

    _hci = None

    MAX_PIN_LENGTH = 16
    MIN_PIN_LENGTH = 1

    # Number of zeros in the placeholder list returned for each OOB value
    # that was not requested from the controller.
    _OOB_VALUE_LENGTH = 16

    def _enqueue_hci_command(self, command, expect_complete):
        """
        Send an HCI command through the cert HCI facade.

        :param command: HCI command packet to send
        :param expect_complete: kept so existing call sites keep working;
            the command is sent the same way for either value
        """
        self._hci.send_command(command)

    def __init__(self, device):
        """
        Don't call super b/c the gRPC stream setup will crash test
        """
        logging.info("Cert: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._hci = PyHci(device)
        self._hci.register_for_events(
            hci.EventCode.ENCRYPTION_CHANGE,
            hci.EventCode.CHANGE_CONNECTION_LINK_KEY_COMPLETE,
            hci.EventCode.CENTRAL_LINK_KEY_COMPLETE,
            hci.EventCode.RETURN_LINK_KEYS,
            hci.EventCode.PIN_CODE_REQUEST,
            hci.EventCode.LINK_KEY_REQUEST,
            hci.EventCode.LINK_KEY_NOTIFICATION,
            hci.EventCode.ENCRYPTION_KEY_REFRESH_COMPLETE,
            hci.EventCode.IO_CAPABILITY_REQUEST,
            hci.EventCode.IO_CAPABILITY_RESPONSE,
            hci.EventCode.REMOTE_OOB_DATA_REQUEST,
            hci.EventCode.SIMPLE_PAIRING_COMPLETE,
            hci.EventCode.USER_PASSKEY_NOTIFICATION,
            hci.EventCode.KEYPRESS_NOTIFICATION,
            hci.EventCode.USER_CONFIRMATION_REQUEST,
            hci.EventCode.USER_PASSKEY_REQUEST,
        )
        self._hci_event_stream = self._hci.get_event_stream()

    def create_bond(self, address, type):
        """
        Creates a bond from the cert perspective

        Currently this only logs the request; see the TODO below.
        """
        logging.info("Cert: Creating bond to '%s' from '%s'", address, self._device.address)
        # TODO(optedoblivion): Trigger connection to Send AuthenticationRequested

    def remove_bond(self, address, type):
        """
        We store the link key locally in the test and pretend
        So to remove_bond we need to Remove the "stored" data
        """
        pass

    def set_io_capabilities(self, io_capabilities):
        """
        Set the IO Capabilities used for the cert

        :param io_capabilities: facade IoCapabilities value; values missing
            from the lookup table fall back to DISPLAY_ONLY
        """
        # NOTE(review): _io_capabilities_name_lookup is not defined in this
        # file; presumably inherited from PySecurity -- confirm.
        logging.info("Cert: setting IO Capabilities data to '%s'",
                     self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
        self._io_caps = self._io_cap_lookup.get(io_capabilities, hci.IoCapability.DISPLAY_ONLY)

    def set_authentication_requirements(self, auth_reqs):
        """
        Establish authentication requirements for the stack

        :param auth_reqs: facade AuthenticationRequirements value; values
            missing from the lookup table fall back to GENERAL_BONDING
        """
        # NOTE(review): _auth_reqs_name_lookup is not defined in this file;
        # presumably inherited from PySecurity -- confirm.
        logging.info("Cert: setting Authentication Requirements data to '%s'",
                     self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
        self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci.AuthenticationRequirements.GENERAL_BONDING)

    def _zeroed_oob_value(self):
        """Return a fresh all-zero placeholder list for an OOB value that was not requested."""
        return [0] * self._OOB_VALUE_LENGTH

    def _read_local_oob_extended_data(self):
        """Send ReadLocalOobExtendedData and return the captured completion event."""
        complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
        self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
        logging.info("Cert: Waiting for OOB response from controller")
        assertThat(self._hci_event_stream).emits(complete_capture)
        return complete_capture.get()

    def get_oob_data_from_controller(self, pb_oob_data_type):
        """
        Get the Out-of-band data for SSP pairing

        :param pb_oob_data_type: Type of data needed
        :return: a tuple of four lists (192c,192r,256c,256r) with increasing
            security; a list is all 0s when pb_oob_data_type does not request
            that value
        :raises KeyError: if pb_oob_data_type is not in the OOB lookup table
        """
        oob_data_type = self._oob_present_lookup[pb_oob_data_type]

        if oob_data_type == hci.OobDataPresent.NOT_PRESENT:
            logging.warning("No data present, no need to call get_oob_data")
            return (self._zeroed_oob_value(), self._zeroed_oob_value(), self._zeroed_oob_value(),
                    self._zeroed_oob_value())

        logging.info("Cert: Requesting OOB data")
        if oob_data_type == hci.OobDataPresent.P_192_PRESENT:
            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
            if self._secure_connections_enabled:
                logging.info("Cert: Requesting P192 Data; secure connections")
                complete = self._read_local_oob_extended_data()
                return (list(complete.c192), list(complete.r192), self._zeroed_oob_value(),
                        self._zeroed_oob_value())
            # else we use ReadLocalDataRequest
            logging.info("Cert: Requesting P192 Data; no secure connections")
            complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
            self._enqueue_hci_command(hci.ReadLocalOobData(), True)
            logging.info("Cert: Waiting for OOB response from controller")
            assertThat(self._hci_event_stream).emits(complete_capture)
            complete = complete_capture.get()
            return (list(complete.c), list(complete.r), self._zeroed_oob_value(), self._zeroed_oob_value())

        # Must be secure connection compatible to use these
        if oob_data_type == hci.OobDataPresent.P_256_PRESENT:
            logging.info("Cert: Requesting P256 Extended Data; secure connections")
            complete = self._read_local_oob_extended_data()
            return (self._zeroed_oob_value(), self._zeroed_oob_value(), list(complete.c256),
                    list(complete.r256))

        # Both
        logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
        complete = self._read_local_oob_extended_data()
        return (list(complete.c192), list(complete.r192), list(complete.c256), list(complete.r256))

    def input_passkey(self, address, passkey):
        """
        Pretend to answer the pairing dialog as a user

        Waits for USER_PASSKEY_REQUEST, sends a fixed sequence of keypress
        notifications, then replies with the passkey.

        :param address: peer address, passed to bluetooth.Address
        :param passkey: numeric passkey sent in the UserPasskeyRequestReply
        """
        logging.info("Cert: Waiting for PASSKEY request")
        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST))
        logging.info("Cert: Send user input passkey %d for %s", passkey, address)
        peer = bluetooth.Address(address)

        keypress = hci.KeypressNotificationType
        # Fixed simulated typing sequence; the order is part of the test behavior.
        simulated_keypresses = (
            keypress.ENTRY_STARTED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.CLEARED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ERASED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.ENTRY_COMPLETED,
        )
        for notification_type in simulated_keypresses:
            self._enqueue_hci_command(
                hci.SendKeypressNotification(bd_addr=peer, notification_type=notification_type), True)
        self._enqueue_hci_command(hci.UserPasskeyRequestReply(bd_addr=peer, numeric_value=passkey), True)

    def input_pin(self, address, pin):
        """
        Pretend to answer the pairing dialog as a user

        :param address: peer address; decoded as UTF-8 before use
        :param pin: PIN code; decoded for logging and zero-padded to
            MAX_PIN_LENGTH entries before being sent
        :raises ValueError: if the PIN length is outside
            [MIN_PIN_LENGTH, MAX_PIN_LENGTH]
        """
        if not self.MIN_PIN_LENGTH <= len(pin) <= self.MAX_PIN_LENGTH:
            # ValueError subclasses Exception, so callers catching Exception still work.
            raise ValueError("Pin code must be within range")

        logging.info("Cert: Waiting for PIN request")
        assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest())
        logging.info("Cert: Send user input PIN %s for %s", pin.decode(), address)
        peer = address.decode('utf-8')
        # Pad with zeros up to the maximum length; the real length is sent separately.
        pin_list = list(pin) + [0] * (self.MAX_PIN_LENGTH - len(pin))
        self._enqueue_hci_command(
            hci.PinCodeRequestReply(bd_addr=bluetooth.Address(peer), pin_code_length=len(pin), pin_code=pin_list),
            True)

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
        Pretend to answer the pairing dialog as a user
        """
        logging.info("Cert: Send user input callback uid:%d; response: %s", uid, b)
        # TODO(optedoblivion): Make callback and set it to the module

    def enable_secure_simple_pairing(self):
        """
        This is called when you want to enable SSP for testing
        """
        logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
        self._enqueue_hci_command(hci.WriteSimplePairingMode(simple_pairing_mode=hci.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete(hci.OpCode.WRITE_SIMPLE_PAIRING_MODE))

    def enable_secure_connections(self):
        """
        This is called when you want to enable secure connections support
        """
        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
        self._enqueue_hci_command(
            hci.WriteSecureConnectionsHostSupport(secure_connections_host_support=hci.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
        #self._secure_connections_enabled = True

    def send_io_caps(self, address):
        """
        Wait for IO_CAPABILITY_REQUEST and reply with the configured IO
        capabilities and authentication requirements (OOB data not present).
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci.IoCapabilityRequestReply(bd_addr=bluetooth.Address(address),
                                         io_capability=self._io_caps,
                                         oob_present=oob_data_present,
                                         authentication_requirements=self._auth_reqs), True)

    def accept_pairing(self, dut_address, reply_boolean, expect_to_fail, on_responder_reply):
        """
        Here we handle the pairing events at the HCI level

        :param dut_address: address of the device under test
        :param reply_boolean: True to confirm the user confirmation request,
            False to send the negative reply
        :param expect_to_fail: when True, LINK_KEY_NOTIFICATION is not awaited
            after a positive reply
        :param on_responder_reply: callable invoked right after the reply is sent
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
        logging.info("Cert: Sending Simulated User Response '%s'", reply_boolean)

        if reply_boolean:
            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_REPLY")
            reply_packet = hci.UserConfirmationRequestReply
        else:
            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY")
            reply_packet = hci.UserConfirmationRequestNegativeReply
        self._enqueue_hci_command(reply_packet(bd_addr=bluetooth.Address(dut_address)), True)
        on_responder_reply()

        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())

        # A link key is only awaited after a positive reply that is expected to succeed.
        if reply_boolean and not expect_to_fail:
            logging.info("Cert: Waiting for LINK_KEY_NOTIFICATION")
            assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyNotification())

    def accept_oob_pairing(self, dut_address):
        """
        Answer the IO capability exchange, then assert that Simple Pairing
        completes with status SUCCESS.
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
        ssp_complete = ssp_complete_capture.get()
        logging.info(ssp_complete.status)
        assertThat(ssp_complete.status).isEqualTo(hci.ErrorCode.SUCCESS)

    def on_user_input(self, dut_address, reply_boolean, expected_ui_event):
        """
        Cert doesn't need the test to respond to the ui event
        Cert responds in accept pairing
        """
        pass

    def wait_for_bond_event(self, expected_bond_event):
        """
        A bond event will be triggered once the bond process
        is complete.  For the DUT we need to wait for it,
        for Cert it isn't needed.
        """
        pass

    def enforce_security_policy(self, address, type, policy):
        """
        Pass for now
        """
        pass

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
        Cert side needs to pass
        """
        pass

    def wait_for_disconnect_event(self):
        """
        Cert side needs to pass
        """
        pass
        # FIXME: Gabeldorsche facade don't allow us to register for an DISCONNECT_COMPLETE event
        # logging.info("Cert: Waiting for DISCONNECT_COMPLETE")
        # assertThat(self._hci_event_stream).emits(HciMatchers.DisconnectionComplete())

    def close(self):
        """Close the underlying cert HCI wrapper."""
        safeClose(self._hci)