1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Security grpc interface.""" 15 16import asyncio 17import logging 18from typing import AsyncGenerator 19from typing import AsyncIterator 20 21from floss.pandora.floss import adapter_client 22from floss.pandora.floss import floss_enums 23from floss.pandora.floss import utils 24from floss.pandora.server import bluetooth as bluetooth_module 25from google.protobuf import empty_pb2 26from google.protobuf import wrappers_pb2 27import grpc 28from pandora import security_grpc_aio 29from pandora import security_pb2 30 31 32class SecurityService(security_grpc_aio.SecurityServicer): 33 """Service to trigger Bluetooth Host security pairing procedures. 34 35 This class implements the Pandora bluetooth test interfaces, 36 where the meta class definition is automatically generated by the protobuf. 37 The interface definition can be found in: 38 https://cs.android.com/android/platform/superproject/+/main:external 39 /pandora/bt-test-interfaces/pandora/security.proto 40 """ 41 42 def __init__(self, bluetooth: bluetooth_module.Bluetooth): 43 self.bluetooth = bluetooth 44 self.manually_confirm = False 45 self.on_pairing_count = 0 46 47 class PairingObserver(adapter_client.BluetoothCallbacks): 48 """Observer to observe pairing events.""" 49 50 def __init__(self, client: adapter_client, security: security_grpc_aio.SecurityServicer): 51 self.client = client 52 self.security = security 53 54 @utils.glib_callback() 55 def on_ssp_request(self, remote_device, class_of_device, variant, passkey): 56 if self.security.manually_confirm: 57 return 58 59 logging.info("Security: on_ssp_request variant: %s passkey: %s", variant, passkey) 60 address, _ = remote_device 61 62 if variant in (floss_enums.PairingVariant.CONSENT, floss_enums.PairingVariant.PASSKEY_CONFIRMATION): 63 self.client.set_pairing_confirmation(address, 64 True, 65 method_callback=self.on_set_pairing_confirmation) 66 67 @utils.glib_callback() 68 def on_set_pairing_confirmation(self, err, result): 69 if err or not result: 70 logging.info('Security: on_set_pairing_confirmation failed. err: %s result: %s', err, result) 71 72 observer = PairingObserver(self.bluetooth.adapter_client, self) 73 name = utils.create_observer_name(observer) 74 self.bluetooth.adapter_client.register_callback_observer(name, observer) 75 self.pairing_observer = observer 76 77 async def wait_le_security_level(self, level, address): 78 79 class BondingObserver(adapter_client.BluetoothCallbacks): 80 """Observer to observe the bond state.""" 81 82 def __init__(self, task): 83 self.task = task 84 85 @utils.glib_callback() 86 def on_bond_state_changed(self, status, address, state): 87 if address != self.task['address']: 88 return 89 90 future = self.task['wait_bond'] 91 if status != floss_enums.BtStatus.SUCCESS: 92 future.get_loop().call_soon_threadsafe(future.set_result, (False, f'Status: {status}')) 93 return 94 95 if state == floss_enums.BondState.BONDED: 96 future.get_loop().call_soon_threadsafe(future.set_result, (True, None)) 97 elif state == floss_enums.BondState.NOT_BONDED: 98 future.get_loop().call_soon_threadsafe(future.set_result, 99 (False, f'Status: {status}, State: {state}')) 100 101 if level == security_pb2.LE_LEVEL1: 102 return True 103 if level == security_pb2.LE_LEVEL4: 104 logging.error('wait_le_security_level: Low-energy level 4 not supported.') 105 return False 106 107 if self.bluetooth.is_bonded(address): 108 is_bonded = True 109 else: 110 try: 111 wait_bond = asyncio.get_running_loop().create_future() 112 observer = BondingObserver({'wait_bond': wait_bond, 'address': address}) 113 name = utils.create_observer_name(observer) 114 self.bluetooth.adapter_client.register_callback_observer(name, observer) 115 is_bonded, reason = await wait_bond 116 if not is_bonded: 117 logging.error('Failed to bond to the address: %s, reason: %s', address, reason) 118 finally: 119 self.bluetooth.adapter_client.unregister_callback_observer(name, observer) 120 121 is_encrypted = self.bluetooth.is_encrypted(address) 122 if level == security_pb2.LE_LEVEL2: 123 return is_encrypted 124 if level == security_pb2.LE_LEVEL3: 125 return is_encrypted and is_bonded 126 127 logging.error('wait_le_security_level: Invalid security level %s.', level) 128 return False 129 130 async def wait_classic_security_level(self, level, address): 131 132 class BondingObserver(adapter_client.BluetoothCallbacks): 133 """Observer to observe the bond state""" 134 135 def __init__(self, task): 136 self.task = task 137 138 @utils.glib_callback() 139 def on_bond_state_changed(self, status, address, state): 140 if address != self.task['address']: 141 return 142 143 future = self.task['wait_bond'] 144 if status != floss_enums.BtStatus.SUCCESS: 145 future.get_loop().call_soon_threadsafe(future.set_result, (False, f'Status: {status}')) 146 return 147 148 if state == floss_enums.BondState.BONDED: 149 future.get_loop().call_soon_threadsafe(future.set_result, (True, None)) 150 elif state == floss_enums.BondState.NOT_BONDED: 151 future.get_loop().call_soon_threadsafe(future.set_result, 152 (False, f'Status: {status}, State: {state}')) 153 154 if level == security_pb2.LEVEL0: 155 return True 156 if level == security_pb2.LEVEL3: 157 logging.error('wait_classic_security_level: Classic level 3 not supported') 158 return False 159 160 if self.bluetooth.is_bonded(address): 161 is_bonded = True 162 else: 163 try: 164 wait_bond = asyncio.get_running_loop().create_future() 165 observer = BondingObserver({'wait_bond': wait_bond, 'address': address}) 166 name = utils.create_observer_name(observer) 167 self.bluetooth.adapter_client.register_callback_observer(name, observer) 168 is_bonded, reason = await wait_bond 169 if not is_bonded: 170 logging.error('Failed to bond to the address: %s, reason: %s', address, reason) 171 finally: 172 self.bluetooth.adapter_client.unregister_callback_observer(name, observer) 173 174 is_encrypted = self.bluetooth.is_encrypted(address) 175 if level == security_pb2.LEVEL1: 176 return not is_encrypted or is_bonded 177 if level == security_pb2.LEVEL2: 178 return is_encrypted and is_bonded 179 return False 180 181 async def OnPairing(self, request: AsyncIterator[security_pb2.PairingEventAnswer], 182 context: grpc.ServicerContext) -> AsyncGenerator[security_pb2.PairingEvent, None]: 183 logging.info('OnPairing') 184 on_pairing_id = self.on_pairing_count 185 self.on_pairing_count = self.on_pairing_count + 1 186 187 class PairingObserver(adapter_client.BluetoothCallbacks): 188 """Observer to observe all pairing events.""" 189 190 def __init__(self, loop: asyncio.AbstractEventLoop, task): 191 self.loop = loop 192 self.task = task 193 194 @utils.glib_callback() 195 def on_ssp_request(self, remote_device, class_of_device, variant, passkey): 196 address, name = remote_device 197 198 result = (address, name, variant, passkey) 199 asyncio.run_coroutine_threadsafe(self.task['pairing_events'].put(result), self.loop) 200 201 @utils.glib_callback() 202 def on_pin_request(self, remote_device, cod, min_16_digit): 203 address, name = remote_device 204 205 if min_16_digit: 206 variant = floss_enums.PairingVariant.PIN_16_DIGITS_ENTRY 207 else: 208 variant = floss_enums.PairingVariant.PIN_ENTRY 209 result = (address, name, variant, min_16_digit) 210 asyncio.run_coroutine_threadsafe(self.task['pairing_events'].put(result), self.loop) 211 212 @utils.glib_callback() 213 def on_pin_display(self, remote_device, pincode): 214 address, name = remote_device 215 216 variant = floss_enums.PairingVariant.PIN_NOTIFICATION 217 result = (address, name, variant, pincode) 218 asyncio.run_coroutine_threadsafe(self.task['pairing_events'].put(result), self.loop) 219 220 pairing_answers = request 221 222 async def streaming_answers(self): 223 while True: 224 nonlocal pairing_answers 225 nonlocal on_pairing_id 226 227 logging.info('OnPairing[%s]: Wait for pairing answer...', on_pairing_id) 228 pairing_answer = await utils.anext(pairing_answers) 229 230 answer = pairing_answer.WhichOneof('answer') 231 address = utils.address_from(pairing_answer.event.address) 232 logging.info('OnPairing[%s]: Pairing answer: %s address: %s', on_pairing_id, answer, address) 233 234 if answer == 'confirm': 235 self.bluetooth.set_pairing_confirmation(address, True) 236 elif answer == 'passkey': 237 self.bluetooth.set_pin(address, True, list(str(answer.passkey).zfill(6).encode())) 238 elif answer == 'pin': 239 self.bluetooth.set_pin(address, True, list(answer.pin)) 240 241 observers = [] 242 try: 243 self.manually_confirm = True 244 245 pairing_events = asyncio.Queue() 246 observer = PairingObserver(asyncio.get_running_loop(), {'pairing_events': pairing_events}) 247 name = utils.create_observer_name(observer) 248 self.bluetooth.adapter_client.register_callback_observer(name, observer) 249 observers.append((name, observer)) 250 251 streaming_answers_task = asyncio.create_task(streaming_answers(self)) 252 253 while True: 254 logging.info('OnPairing[%s]: Wait for pairing events...', on_pairing_id) 255 address, name, variant, *variables = await pairing_events.get() 256 logging.info('OnPairing[%s]: Pairing event: address: %s, name: %s, variant: %s, variables: %s', 257 on_pairing_id, address, name, variant, variables) 258 259 event = security_pb2.PairingEvent() 260 event.address = utils.address_to(address) 261 262 # SSP 263 if variant == floss_enums.PairingVariant.PASSKEY_CONFIRMATION: 264 [passkey] = variables 265 event.numeric_comparison = passkey 266 elif variant == floss_enums.PairingVariant.PASSKEY_ENTRY: 267 event.passkey_entry_request.CopyFrom(empty_pb2.Empty()) 268 elif variant == floss_enums.PairingVariant.CONSENT: 269 event.just_works.CopyFrom(empty_pb2.Empty()) 270 elif variant == floss_enums.PairingVariant.PASSKEY_NOTIFICATION: 271 [passkey] = variables 272 event.passkey_entry_notification = passkey 273 # Legacy 274 elif variant == floss_enums.PairingVariant.PIN_ENTRY: 275 transport = self.bluetooth.get_remote_type(address) 276 277 if transport == floss_enums.BtTransport.BREDR: 278 event.pin_code_request.CopyFrom(empty_pb2.Empty()) 279 elif transport == floss_enums.BtTransport.LE: 280 event.passkey_entry_request.CopyFrom(empty_pb2.Empty()) 281 else: 282 logging.error('Cannot determine pairing variant from unknown transport.') 283 continue 284 elif variant == floss_enums.PairingVariant.PIN_16_DIGITS_ENTRY: 285 event.pin_code_request.CopyFrom(empty_pb2.Empty()) 286 elif variant == floss_enums.PairingVarint.PIN_NOTIFICATION: 287 transport = self.bluetooth.get_remote_type(address) 288 [pincode] = variables 289 290 if transport == floss_enums.BtTransport.BREDR: 291 event.pin_code_notification = pincode.encode() 292 elif transport == floss_enums.BtTransport.LE: 293 event.passkey_entry_notification = int(pincode) 294 else: 295 logging.error('Cannot determine pairing variant from unknown transport.') 296 continue 297 else: 298 logging.error('Unknown pairing variant: %s', variant) 299 continue 300 301 yield event 302 finally: 303 streaming_answers_task.cancel() 304 for name, observer in observers: 305 self.bluetooth.adapter_client.unregister_callback_observer(name, observer) 306 307 pairing_events = None 308 pairing_answers = None 309 310 async def Secure(self, request: security_pb2.SecureRequest, 311 context: grpc.ServicerContext) -> security_pb2.SecureResponse: 312 connection = utils.connection_from(request.connection) 313 address = connection.address 314 transport = connection.transport 315 316 if transport == floss_enums.BtTransport.LE: 317 if not request.HasField('le'): 318 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Request le field must be set.') 319 if request.le == security_pb2.LE_LEVEL1: 320 security_level_reached = True 321 elif request.le == security_pb2.LE_LEVEL4: 322 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Low-energy security level 4 is not supported.') 323 else: 324 if not self.bluetooth.is_bonded(address): 325 self.bluetooth.create_bond(address, transport) 326 security_level_reached = await self.wait_le_security_level(request.le, address) 327 elif transport == floss_enums.BtTransport.BREDR: 328 if not request.HasField('classic'): 329 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Request classic field must be set.') 330 if request.classic == security_pb2.LEVEL0: 331 security_level_reached = True 332 elif request.classic >= security_pb2.LEVEL3: 333 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 334 'Classic security level up to 3 is not supported.') 335 else: 336 if not self.bluetooth.is_bonded(address): 337 self.bluetooth.create_bond(address, transport) 338 security_level_reached = await self.wait_classic_security_level(request.classic, address) 339 else: 340 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid bluetooth transport type: {transport}.') 341 342 secure_response = security_pb2.SecureResponse() 343 if security_level_reached: 344 secure_response.success.CopyFrom(empty_pb2.Empty()) 345 else: 346 secure_response.not_reached.CopyFrom(empty_pb2.Empty()) 347 return secure_response 348 349 async def WaitSecurity(self, request: security_pb2.WaitSecurityRequest, 350 context: grpc.ServicerContext) -> security_pb2.WaitSecurityResponse: 351 address = utils.connection_from(request.connection).address 352 transport = floss_enums.BtTransport.BREDR if request.HasField('classic') else floss_enums.BtTransport.LE 353 354 if transport == floss_enums.BtTransport.LE: 355 security_level_reached = await self.wait_le_security_level(request.le, address) 356 elif transport == floss_enums.BtTransport.BREDR: 357 security_level_reached = await self.wait_classic_security_level(request.classic, address) 358 else: 359 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid bluetooth transport type: {transport}.') 360 361 wait_security_response = security_pb2.WaitSecurityResponse() 362 if security_level_reached: 363 wait_security_response.success.CopyFrom(empty_pb2.Empty()) 364 else: 365 wait_security_response.pairing_failure.CopyFrom(empty_pb2.Empty()) 366 return wait_security_response 367 368 369class SecurityStorageService(security_grpc_aio.SecurityStorageServicer): 370 """Service to trigger Bluetooth Host security persistent storage procedures. 371 372 This class implements the Pandora bluetooth test interfaces, 373 where the meta class definition is automatically generated by the protobuf. 374 The interface definition can be found in: 375 https://cs.android.com/android/platform/superproject/+/main:external 376 /pandora/bt-test-interfaces/pandora/security.proto 377 """ 378 379 def __init__(self, bluetooth: bluetooth_module.Bluetooth): 380 self.bluetooth = bluetooth 381 382 async def IsBonded(self, request: security_pb2.IsBondedRequest, 383 context: grpc.ServicerContext) -> wrappers_pb2.BoolValue: 384 385 address = utils.address_from(request.address) 386 is_bonded = self.bluetooth.is_bonded(address) 387 return wrappers_pb2.BoolValue(value=is_bonded) 388 389 async def DeleteBond(self, request: security_pb2.DeleteBondRequest, 390 context: grpc.ServicerContext) -> empty_pb2.Empty: 391 392 class BondingObserver(adapter_client.BluetoothCallbacks): 393 """Observer to observe the bond state""" 394 395 def __init__(self, task): 396 self.task = task 397 398 @utils.glib_callback() 399 def on_bond_state_changed(self, status, address, state): 400 if address != self.task['address']: 401 return 402 403 future = self.task['remove_bond'] 404 if status != 0: 405 future.get_loop().call_soon_threadsafe(future.set_result, 406 (False, f'{address} failed to remove bond. Status: {status},' 407 f' State: {state}')) 408 return 409 410 if state == floss_enums.BondState.NOT_BONDED: 411 future.get_loop().call_soon_threadsafe(future.set_result, (True, None)) 412 else: 413 future.get_loop().call_soon_threadsafe( 414 future.set_result, (False, f'{address} failed on remove_bond, got bond state {state},' 415 f' want {floss_enums.BondState.NOT_BONDED}')) 416 417 address = utils.address_from(request.address) 418 if not self.bluetooth.is_bonded(address): 419 return empty_pb2.Empty() 420 try: 421 remove_bond = asyncio.get_running_loop().create_future() 422 observer = BondingObserver({'remove_bond': remove_bond, 'address': address}) 423 name = utils.create_observer_name(observer) 424 self.bluetooth.adapter_client.register_callback_observer(name, observer) 425 self.bluetooth.remove_bond(address) 426 success, reason = await remove_bond 427 if not success: 428 await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 429 f'Failed to remove bond of address: {address}. Reason: {reason}.') 430 finally: 431 self.bluetooth.adapter_client.unregister_callback_observer(name, observer) 432 return empty_pb2.Empty() 433