#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging

from bluetooth_packets_python3 import hci_packets
from cert.closable import Closable
from cert.closable import safeClose
from cert.event_stream import EventStream
from cert.truth import assertThat
from facade import common_pb2 as common
from google.protobuf import empty_pb2 as empty_proto

from security.facade_pb2 import AuthenticationRequirements
from security.facade_pb2 import AuthenticationRequirementsMessage
from security.facade_pb2 import BondMsgType
from security.facade_pb2 import SecurityPolicyMessage
from security.facade_pb2 import IoCapabilities
from security.facade_pb2 import IoCapabilityMessage
from security.facade_pb2 import OobDataBondMessage
from security.facade_pb2 import OobDataMessage
from security.facade_pb2 import OobDataPresentMessage
from security.facade_pb2 import UiMsgType
from security.facade_pb2 import UiCallbackMsg
from security.facade_pb2 import UiCallbackType


class PySecurity(Closable):
    """
        Abstraction for security tasks and GRPC calls
    """

    # Human-readable names used only for log messages.
    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

    _auth_reqs_name_lookup = {
        AuthenticationRequirements.NO_BONDING: "NO_BONDING",
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: "NO_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.DEDICATED_BONDING: "DEDICATED_BONDING",
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: "DEDICATED_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.GENERAL_BONDING: "GENERAL_BONDING",
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: "GENERAL_BONDING_MITM_PROTECTION",
    }

    # Every stream that close() touches gets a class-level None default so that
    # close() does not raise AttributeError when __init__ fails part-way.
    # NOTE(review): this relies on safeClose accepting None, which the existing
    # None defaults suggest -- confirm against cert.closable.
    _ui_event_stream = None
    _bond_event_stream = None
    _enforce_security_policy_stream = None
    _disconnect_event_stream = None
    _oob_data_event_stream = None

    def __init__(self, device):
        """
            Wait for the device channel to be ready, then open one event
            stream per security facade event source.
        """
        logging.info("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._ui_event_stream = EventStream(self._device.security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(self._device.security.FetchBondEvents(empty_proto.Empty()))
        self._enforce_security_policy_stream = EventStream(
            self._device.security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty()))
        self._disconnect_event_stream = EventStream(self._device.security.FetchDisconnectEvents(empty_proto.Empty()))
        self._oob_data_event_stream = EventStream(
            self._device.security.FetchGetOutOfBandDataEvents(empty_proto.Empty()))

    @staticmethod
    def _make_address_with_type(address, type):
        """
            Build a fresh BluetoothAddressWithType message for the given
            address and address type.
        """
        return common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)

    def create_bond(self, address, type):
        """
            Triggers stack under test to create bond
        """
        logging.info("DUT: Creating bond to '%s' from '%s'", address, self._device.address)
        self._device.security.CreateBond(self._make_address_with_type(address, type))

    def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data):
        """
            Triggers stack under test to create bond using Out of Band method

            For both p192_oob_data and p256_oob_data, index 0 is sent as the
            confirmation value and index 1 as the random value.
        """
        logging.info("DUT: Creating OOB bond to '%s' from '%s'", address, self._device.address)

        p192_message = OobDataMessage(
            address=self._make_address_with_type(address, type),
            confirmation_value=bytes(p192_oob_data[0]),
            random_value=bytes(p192_oob_data[1]))
        p256_message = OobDataMessage(
            address=self._make_address_with_type(address, type),
            confirmation_value=bytes(p256_oob_data[0]),
            random_value=bytes(p256_oob_data[1]))

        self._device.security.CreateBondOutOfBand(
            OobDataBondMessage(
                address=self._make_address_with_type(address, type), p192_data=p192_message, p256_data=p256_message))

    def remove_bond(self, address, type):
        """
            Removes bond from stack under test
        """
        self._device.security.RemoveBond(self._make_address_with_type(address, type))

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the DUT
        """
        # Unknown values are logged as "ERROR" but still forwarded to the stack.
        logging.info("DUT: setting IO Capabilities data to '%s'",
                     self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
        self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities))

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack
        """
        # Unknown values are logged as "ERROR" but still forwarded to the stack.
        logging.info("DUT: setting Authentication Requirements data to '%s'",
                     self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
        self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs))

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
            Send a callback from the UI as if the user pressed a button on the dialog

            The address is always sent as a PUBLIC_DEVICE_ADDRESS.
        """
        logging.info("DUT: Sending user input response uid: %d; response: %s", uid, b)
        self._device.security.SendUiCallback(
            UiCallbackMsg(
                message_type=callback_type,
                boolean=b,
                unique_id=uid,
                pin=bytes(pin),
                address=self._make_address_with_type(address, common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)))

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing
            Since the stack under test already enables it by default
            we do not need to do anything here for the time being
        """
        pass

    def enable_secure_connections(self):
        """
            Intentionally a no-op on the DUT side.
        """
        pass

    def accept_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def accept_oob_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def wait_for_passkey(self, cert_address):
        """
            Wait for a DISPLAY_PASSKEY UI event and return its numeric value.

            cert_address is accepted but not used by this implementation.
        """
        passkey = -1

        def get_passkey(event):
            nonlocal passkey
            if event.message_type == UiMsgType.DISPLAY_PASSKEY:
                passkey = event.numeric_value
                return True
            return False

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_passkey)
        return passkey

    def input_pin(self, cert_address, pin):
        """
            Wait for the DISPLAY_PIN_ENTRY UI event and answer it with the given pin
        """
        logging.info("DUT: Inputting pin code: %s", pin)
        self.on_user_input(
            cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin)

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event, pin=None):
        """
            Respond to the UI event

            Waits for a UI event whose message type equals expected_ui_event and
            replies to it using that event's unique id. Returns immediately
            without waiting when expected_ui_event is None. An empty or omitted
            pin produces a YES_NO callback; otherwise a PIN callback is sent.
        """
        if expected_ui_event is None:
            return

        # None stands in for "no pin" so that no mutable default is shared between calls.
        if pin is None:
            pin = []

        ui_id = -1

        def get_unique_id(event):
            nonlocal ui_id
            if event.message_type == expected_ui_event:
                ui_id = event.unique_id
                return True
            return False

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        callback_type = UiCallbackType.PIN if len(pin) > 0 else UiCallbackType.YES_NO
        self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin)

    def get_address(self):
        """
            Return the address of the wrapped device
        """
        return self._device.address

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.
        """
        logging.info("DUT: Waiting for Bond Event: %s ", expected_bond_event)

        def is_expected_bond_event(event):
            if event.message_type == expected_bond_event:
                return True
            # Log non-matching events so a failed wait can be diagnosed.
            logging.info("DUT: Actual Bond Event: %s", event.message_type)
            return False

        assertThat(self._bond_event_stream).emits(is_expected_bond_event)

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            We expect a 'True' or 'False' from the enforce security call

            This interface will allow the caller to wait for a callback
            result from enforcing security policy over the facade.
        """
        logging.info("DUT: Waiting for enforce security event")

        def is_expected_result(event):
            if event.result == expected_enforce_security_event:
                return True
            # Log the non-matching result so a failed wait can be diagnosed.
            logging.info(event.result)
            return False

        assertThat(self._enforce_security_policy_stream).emits(is_expected_result)

    def wait_for_disconnect_event(self):
        """
            Wait for any disconnect event and log the address it carries.

            Every event is accepted; nothing is returned to the caller.
        """
        logging.info("DUT: Waiting for Disconnect Event")

        def log_and_accept(event):
            logging.info("event: %s", event.address)
            return True

        assertThat(self._disconnect_event_stream).emits(log_and_accept)

    def enforce_security_policy(self, address, type, policy):
        """
            Call to enforce classic security policy
        """
        self._device.security.EnforceSecurityPolicy(
            SecurityPolicyMessage(address=self._make_address_with_type(address, type), policy=policy))

    def get_oob_data_from_controller(self, oob_data_present):
        """
            Request Out of Band data from the stack and return it once the
            matching event arrives.

            Returns a list of four lists: the P-192 confirmation value, the
            P-192 random value, and two lists of sixteen zeros.
            oob_data_present is accepted but not used by this implementation.
        """
        self._device.security.GetOutOfBandData(empty_proto.Empty())
        oob_data = []

        def get_oob_data(event):
            nonlocal oob_data
            oob_data = [
                list(event.p192_data.confirmation_value),
                list(event.p192_data.random_value),
                # NOTE(review): presumably placeholders for the P-256 values -- confirm.
                [0] * 16,
                [0] * 16,
            ]
            return True

        assertThat(self._oob_data_event_stream).emits(get_oob_data)
        return oob_data

    def close(self):
        """
            Close every event stream opened by __init__
        """
        safeClose(self._ui_event_stream)
        safeClose(self._bond_event_stream)
        safeClose(self._enforce_security_policy_stream)
        safeClose(self._disconnect_event_stream)
        safeClose(self._oob_data_event_stream)