#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from bluetooth_packets_python3 import hci_packets
from cert.captures import HciCaptures
from cert.closable import safeClose
from cert.event_stream import EventStream
from cert.matchers import HciMatchers
from cert.py_hci import PyHci
from cert.py_security import PySecurity
from cert.truth import assertThat
from datetime import datetime
from google.protobuf import empty_pb2 as empty_proto
from l2cap.classic import facade_pb2 as l2cap_facade
from security.facade_pb2 import IoCapabilities
from security.facade_pb2 import AuthenticationRequirements
from security.facade_pb2 import OobDataPresent


class CertSecurity(PySecurity):
    """
        Contain all of the certification stack logic for sending and receiving
        HCI commands following the Classic Pairing flows.
    """
    # Facade (protobuf) IO capability value -> HCI packet IO capability value.
    _io_cap_lookup = {
        IoCapabilities.DISPLAY_ONLY: hci_packets.IoCapability.DISPLAY_ONLY,
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: hci_packets.IoCapability.DISPLAY_YES_NO,
        IoCapabilities.KEYBOARD_ONLY: hci_packets.IoCapability.KEYBOARD_ONLY,
        IoCapabilities.NO_INPUT_NO_OUTPUT: hci_packets.IoCapability.NO_INPUT_NO_OUTPUT,
    }

    # Facade (protobuf) authentication requirement -> HCI packet equivalent.
    _auth_req_lookup = {
        AuthenticationRequirements.NO_BONDING:
        hci_packets.AuthenticationRequirements.NO_BONDING,
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION:
        hci_packets.AuthenticationRequirements.NO_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.DEDICATED_BONDING:
        hci_packets.AuthenticationRequirements.DEDICATED_BONDING,
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION:
        hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.GENERAL_BONDING:
        hci_packets.AuthenticationRequirements.GENERAL_BONDING,
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION:
        hci_packets.AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION,
    }

    # Facade (protobuf) OOB-data-present value -> HCI packet equivalent.
    _oob_present_lookup = {
        OobDataPresent.NOT_PRESENT: hci_packets.OobDataPresent.NOT_PRESENT,
        OobDataPresent.P192_PRESENT: hci_packets.OobDataPresent.P_192_PRESENT,
        OobDataPresent.P256_PRESENT: hci_packets.OobDataPresent.P_256_PRESENT,
        OobDataPresent.P192_AND_256_PRESENT: hci_packets.OobDataPresent.P_192_AND_256_PRESENT,
    }

    # Class-level defaults; __init__ and the setters below override them per instance.
    _hci_event_stream = None
    _io_caps = hci_packets.IoCapability.DISPLAY_ONLY
    _auth_reqs = hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
    # Never set to True in this class (see the TODO in enable_secure_connections).
    _secure_connections_enabled = False

    _hci = None

    MAX_PIN_LENGTH = 16
    MIN_PIN_LENGTH = 1

    # Number of zero entries used for each OOB value that was not requested.
    _OOB_VALUE_LENGTH = 16

    def _enqueue_hci_command(self, command, expect_complete):
        """
            Send an HCI command through the cert HCI wrapper

            :param command: HCI command builder to send
            :param expect_complete: currently ignored; the command is sent the
                                    same way regardless of its value
        """
        self._hci.send_command(command)

    def __init__(self, device):
        """
            Don't call super b/c the gRPC stream setup will crash test
        """
        logging.info("Cert: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._hci = PyHci(device)
        # Subscribe to the events that the pairing flows below wait on.
        self._hci.register_for_events(
            hci_packets.EventCode.ENCRYPTION_CHANGE, hci_packets.EventCode.CHANGE_CONNECTION_LINK_KEY_COMPLETE,
            hci_packets.EventCode.CENTRAL_LINK_KEY_COMPLETE, hci_packets.EventCode.RETURN_LINK_KEYS,
            hci_packets.EventCode.PIN_CODE_REQUEST, hci_packets.EventCode.LINK_KEY_REQUEST,
            hci_packets.EventCode.LINK_KEY_NOTIFICATION, hci_packets.EventCode.ENCRYPTION_KEY_REFRESH_COMPLETE,
            hci_packets.EventCode.IO_CAPABILITY_REQUEST, hci_packets.EventCode.IO_CAPABILITY_RESPONSE,
            hci_packets.EventCode.REMOTE_OOB_DATA_REQUEST, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE,
            hci_packets.EventCode.USER_PASSKEY_NOTIFICATION, hci_packets.EventCode.KEYPRESS_NOTIFICATION,
            hci_packets.EventCode.USER_CONFIRMATION_REQUEST, hci_packets.EventCode.USER_PASSKEY_REQUEST,
            hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
        self._hci_event_stream = self._hci.get_event_stream()

    def create_bond(self, address, type):
        """
            Creates a bond from the cert perspective

            Currently this only logs; no HCI traffic is generated.
        """
        logging.info("Cert: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address)))
        # TODO(optedoblivion): Trigger connection to Send AuthenticationRequested

    def remove_bond(self, address, type):
        """
            We store the link key locally in the test and pretend
            So to remove_bond we need to Remove the "stored" data
        """
        pass

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the cert

            Unknown values fall back to DISPLAY_ONLY.
        """
        # NOTE(review): _io_capabilities_name_lookup is not defined in this class;
        # presumably it is inherited from PySecurity -- confirm.
        logging.info("Cert: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get(
            io_capabilities, "ERROR"))
        self._io_caps = self._io_cap_lookup.get(io_capabilities, hci_packets.IoCapability.DISPLAY_ONLY)

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack

            Unknown values fall back to GENERAL_BONDING.
        """
        # NOTE(review): _auth_reqs_name_lookup is not defined in this class;
        # presumably it is inherited from PySecurity -- confirm.
        logging.info("Cert: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get(
            auth_reqs, "ERROR"))
        self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci_packets.AuthenticationRequirements.GENERAL_BONDING)

    @classmethod
    def _zeroed_oob_value(cls):
        """
            Build a fresh all-zero placeholder for an OOB value that was not requested

            A new list is returned on every call so callers never share state.
        """
        return [0] * cls._OOB_VALUE_LENGTH

    def _read_local_oob_data(self):
        """
            Send ReadLocalOobData and wait for its completion on the event stream

            :return: ReadLocalOobDataCompleteView built from the captured event
        """
        complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
        self._enqueue_hci_command(hci_packets.ReadLocalOobDataBuilder(), True)
        logging.info("Cert: Waiting for OOB response from controller")
        assertThat(self._hci_event_stream).emits(complete_capture)
        return hci_packets.ReadLocalOobDataCompleteView(complete_capture.get())

    def _read_local_oob_extended_data(self):
        """
            Send ReadLocalOobExtendedData and wait for its completion on the event stream

            :return: ReadLocalOobExtendedDataCompleteView built from the captured event
        """
        complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
        self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
        logging.info("Cert: Waiting for OOB response from controller")
        assertThat(self._hci_event_stream).emits(complete_capture)
        return hci_packets.ReadLocalOobExtendedDataCompleteView(complete_capture.get())

    def get_oob_data_from_controller(self, pb_oob_data_type):
        """
            Get the Out-of-band data for SSP pairing

            :param pb_oob_data_type: Type of data needed
            :return: a tuple of lists (192c,192r,256c,256r) with increasing security;
                     values that were not requested are lists of 16 zeros
            :raises KeyError: if pb_oob_data_type is not a known OobDataPresent value
        """
        oob_data_type = self._oob_present_lookup[pb_oob_data_type]
        zeroed = self._zeroed_oob_value

        if oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT:
            logging.warning("No data present, no need to call get_oob_data")
            return (zeroed(), zeroed(), zeroed(), zeroed())

        logging.info("Cert: Requesting OOB data")
        if oob_data_type == hci_packets.OobDataPresent.P_192_PRESENT:
            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
            if self._secure_connections_enabled:
                logging.info("Cert: Requesting P192 Data; secure connections")
                complete = self._read_local_oob_extended_data()
                return (list(complete.GetC192()), list(complete.GetR192()), zeroed(), zeroed())
            # else we use ReadLocalDataRequest
            logging.info("Cert: Requesting P192 Data; no secure connections")
            complete = self._read_local_oob_data()
            return (list(complete.GetC()), list(complete.GetR()), zeroed(), zeroed())

        # Must be secure connection compatible to use these
        if oob_data_type == hci_packets.OobDataPresent.P_256_PRESENT:
            logging.info("Cert: Requesting P256 Extended Data; secure connections")
            complete = self._read_local_oob_extended_data()
            return (zeroed(), zeroed(), list(complete.GetC256()), list(complete.GetR256()))

        # Both
        logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
        complete = self._read_local_oob_extended_data()
        return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
                list(complete.GetR256()))

    def input_passkey(self, address, passkey):
        """
            Pretend to answer the pairing dialog as a user

            Waits for USER_PASSKEY_REQUEST, sends a fixed sequence of keypress
            notifications, then replies with the passkey.

            :param address: peer address as UTF-8 encoded bytes
            :param passkey: integer passkey to reply with
        """
        logging.info("Cert: Waiting for PASSKEY request")
        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST))
        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
        peer = address.decode('utf-8')
        keypress = hci_packets.KeypressNotificationType
        # Simulated typing session, sent in exactly this order.
        keypress_sequence = (
            keypress.ENTRY_STARTED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.CLEARED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ERASED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.ENTRY_COMPLETED,
        )
        for notification_type in keypress_sequence:
            self._enqueue_hci_command(hci_packets.SendKeypressNotificationBuilder(peer, notification_type), True)
        self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True)

    def input_pin(self, address, pin):
        """
            Pretend to answer the pairing dialog as a user

            :param address: peer address as UTF-8 encoded bytes
            :param pin: PIN code as bytes, MIN_PIN_LENGTH..MAX_PIN_LENGTH long
            :raises Exception: if the PIN length is out of range
        """

        if len(pin) > self.MAX_PIN_LENGTH or len(pin) < self.MIN_PIN_LENGTH:
            raise Exception("Pin code must be within range")

        logging.info("Cert: Waiting for PIN request")
        assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest())
        logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address))
        peer = address.decode('utf-8')
        # Pad with zeros up to MAX_PIN_LENGTH; the real length is passed separately.
        pin_list = list(pin)
        pin_list += [0] * (self.MAX_PIN_LENGTH - len(pin_list))
        self._enqueue_hci_command(hci_packets.PinCodeRequestReplyBuilder(peer, len(pin), pin_list), True)

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
            Pretend to answer the pairing dialog as a user

            Currently this only logs the uid and response.
        """
        logging.info("Cert: Send user input callback uid:%d; response: %s" % (uid, b))
        # TODO(optedoblivion): Make callback and set it to the module

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing
        """
        logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
        self._enqueue_hci_command(hci_packets.WriteSimplePairingModeBuilder(hci_packets.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SIMPLE_PAIRING_MODE))

    def enable_secure_connections(self):
        """
            This is called when you want to enable secure connections support

            Note that _secure_connections_enabled is intentionally left unchanged
            (see the TODO below).
        """
        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
        self._enqueue_hci_command(
            hci_packets.WriteSecureConnectionsHostSupportBuilder(hci_packets.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
        #self._secure_connections_enabled = True

    def send_io_caps(self, address):
        """
            Wait for IO_CAPABILITY_REQUEST and reply with the configured capabilities

            The reply always reports OOB data as NOT_PRESENT.

            :param address: peer address as UTF-8 encoded bytes
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)

    def accept_pairing(self, dut_address, reply_boolean):
        """
            Here we handle the pairing events at the HCI level

            :param dut_address: DUT address as UTF-8 encoded bytes
            :param reply_boolean: True to confirm the pairing, False to reject it
        """
        logging.info("Cert: Waiting for LINK_KEY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest())
        logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY")
        peer = dut_address.decode('utf8')
        self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(peer), True)
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
        if reply_boolean:
            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_REPLY")
            self._enqueue_hci_command(hci_packets.UserConfirmationRequestReplyBuilder(peer), True)
        else:
            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY")
            self._enqueue_hci_command(hci_packets.UserConfirmationRequestNegativeReplyBuilder(peer), True)
        # Both answers are followed by SIMPLE_PAIRING_COMPLETE.
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())
        if reply_boolean:
            # Only an accepted pairing is expected to produce a link key.
            logging.info("Cert: Waiting for LINK_KEY_NOTIFICATION")
            assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyNotification())

    def accept_oob_pairing(self, dut_address):
        """
            Handle the cert side of an OOB pairing and assert that it succeeds

            :param dut_address: DUT address as UTF-8 encoded bytes
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
        ssp_complete = ssp_complete_capture.get()
        logging.info(ssp_complete.GetStatus())
        assertThat(ssp_complete.GetStatus()).isEqualTo(hci_packets.ErrorCode.SUCCESS)

    def on_user_input(self, dut_address, reply_boolean, expected_ui_event):
        """
            Cert doesn't need the test to respond to the ui event
            Cert responds in accept pairing
        """
        pass

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.
        """
        pass

    def enforce_security_policy(self, address, type, policy):
        """
            Pass for now
        """
        pass

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            Cert side needs to pass
        """
        pass

    def wait_for_disconnect_event(self):
        """
            Wait for DISCONNECT_COMPLETE on the cert HCI event stream
        """
        logging.info("Cert: Waiting for DISCONNECT_COMPLETE")
        assertThat(self._hci_event_stream).emits(HciMatchers.DisconnectionComplete())

    def close(self):
        """
            Close the underlying HCI wrapper
        """
        safeClose(self._hci)