#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import logging

from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.sl4a_sl4a.lib.oob_data import OobData


class Security:
    """Helper that drives pairing/bonding on one device through its SL4A handle.

    Constructing an instance starts the SL4A pairing helper on the device;
    close() removes all bonds, stops the helper and drops the device reference.
    """

    # Events sent from SL4A
    SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS = "GeneratedOobData"
    SL4A_EVENT_GENERATE_OOB_DATA_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

    # Matches tBT_TRANSPORT
    # Used Strings because ints were causing gRPC problems
    TRANSPORT_AUTO = "0"
    TRANSPORT_BREDR = "1"
    TRANSPORT_LE = "2"

    __default_timeout = 10  # seconds
    __default_bonding_timeout = 60  # seconds
    __device = None

    def __init__(self, device):
        """Store the device and start the SL4A pairing helper on it.

        Args:
            device: object exposing `sl4a` (RPC client) and `ed` (event
                dispatcher) attributes, as used by the methods below.
        """
        self.__device = device
        self.__device.sl4a.bluetoothStartPairingHelper(True)

    def generate_oob_data(self, transport, wait_for_oob_data_callback=True):
        """Ask the device to generate local OOB data.

        Args:
            transport: transport value forwarded to
                bluetoothGenerateLocalOobData (see the TRANSPORT_* constants).
            wait_for_oob_data_callback: when exactly False, return right after
                issuing the request without waiting for any event.

        Returns:
            A tuple formatted as <statuscode, OobData>:
              * (0, OobData) when the success event arrives within the default
                timeout;
              * (errorcode, None) when only the error event is available;
              * (0, None) when wait_for_oob_data_callback is False. Note that
                in this case a status of 0 does NOT come with OobData.

        If neither a success nor an error event is available, an always-failing
        truth assertion is triggered.
        """
        logging.info("Generating local OOB data")
        self.__device.sl4a.bluetoothGenerateLocalOobData(transport)

        # Identity check kept on purpose: only a literal False skips the wait.
        if wait_for_oob_data_callback is False:
            return 0, None

        # Check for oob data generation success
        try:
            generate_success_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS,
                                                                self.__default_timeout)
        except queue.Empty:
            logging.error("Failed to generate OOB data!")
            # Check if generating oob data failed without blocking
            try:
                generate_failure_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_ERROR, 0)
            except queue.Empty:
                logging.error("Failed to generate OOB Data without error code")
                # Deliberately failing assertion: no success and no error event.
                assertThat(True).isFalse()

            errorcode = generate_failure_event["data"]["Error"]
            logging.info("Generating local oob data failed with error code %d", errorcode)
            return errorcode, None

        event_data = generate_success_event["data"]
        logging.info("OOB ADDR with Type: %s", event_data["address_with_type"])
        return 0, OobData(event_data["address_with_type"], event_data["confirmation"], event_data["randomizer"])

    def ensure_device_bonded(self):
        """Wait for the "Bonded" event and assert that it reports a bonded state.

        Waits up to the default bonding timeout. Asserts that an event was
        received and that its data["bonded_state"] equals True.
        """
        bond_state = None
        try:
            bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_BONDED, self.__default_bonding_timeout)
        except queue.Empty:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)

    def create_bond_out_of_band(self,
                                oob_data,
                                bt_device_object_address=None,
                                bt_device_object_address_type=-1,
                                wait_for_device_bonded=True):
        """Start an LE out-of-band bond using the given OOB data.

        Args:
            oob_data: OOB data object; must not be None. Its SL4A address,
                address type, confirmation and randomizer are sent to the device.
            bt_device_object_address: address for the BT device object. When
                None, both the address and the address type default to the
                ones carried by oob_data.
            bt_device_object_address_type: address type for the BT device
                object; only used when bt_device_object_address is given.
            wait_for_device_bonded: when truthy, block until the bonded event
                is confirmed via ensure_device_bonded().
        """
        assertThat(oob_data).isNotNone()
        oob_data_address = oob_data.to_sl4a_address()
        oob_data_address_type = oob_data.to_sl4a_address_type()

        # If a BT Device object address isn't specified, default to the oob data
        # address and type
        if bt_device_object_address is None:
            bt_device_object_address = oob_data_address
            bt_device_object_address_type = oob_data_address_type

        logging.info("Bonding OOB with device addr=%s, device addr type=%s, oob addr=%s, oob addr type=%s",
                     bt_device_object_address, bt_device_object_address_type, oob_data_address, oob_data_address_type)
        bond_start = self.__device.sl4a.bluetoothCreateLeBondOutOfBand(
            oob_data_address, oob_data_address_type, oob_data.confirmation, oob_data.randomizer,
            bt_device_object_address, bt_device_object_address_type)
        assertThat(bond_start).isTrue()

        if wait_for_device_bonded:
            self.ensure_device_bonded()

    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE, wait_for_device_bonded=True):
        """Start bonding with `address` over the requested transport.

        Args:
            address: remote device address; must not be None.
            transport: TRANSPORT_LE uses bluetoothLeBond; any other value uses
                bluetoothBond.
            wait_for_device_bonded: when truthy (the default), block until the
                bonded event is confirmed via ensure_device_bonded().
        """
        assertThat(address).isNotNone()
        if transport == self.TRANSPORT_LE:
            self.__device.sl4a.bluetoothLeBond(address)
        else:
            self.__device.sl4a.bluetoothBond(address)

        # Honor the flag (it used to be ignored), consistent with
        # create_bond_out_of_band.
        if wait_for_device_bonded:
            self.ensure_device_bonded()

    def remove_all_bonded_devices(self):
        """Remove the bond for every device returned by bluetoothGetBondedDevices."""
        bonded_devices = self.__device.sl4a.bluetoothGetBondedDevices()
        for device in bonded_devices:
            logging.info(device)
            self.remove_bond(device["address"])

    def remove_bond(self, address):
        """Unbond `address` and assert that the "Unbonded" event confirms it.

        If bluetoothUnbond returns a falsy value, only an informational message
        is logged and no event is awaited.
        """
        if not self.__device.sl4a.bluetoothUnbond(address):
            logging.info("remove_bond: Bluetooth Device with address: %s does not exist", address)
            return

        bond_state = None
        try:
            bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.__default_timeout)
        except queue.Empty:
            logging.error("Failed to get bond event!")
        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)

    def close(self):
        """Remove all bonds, stop the pairing helper and release the device.

        The pairing helper is stopped and the device reference is cleared even
        if removing bonds raises; the original exception still propagates.
        Calling close() again after the device was released is a no-op.
        """
        if self.__device is None:
            return
        try:
            self.remove_all_bonded_devices()
        finally:
            try:
                self.__device.sl4a.bluetoothStartPairingHelper(False)
            finally:
                self.__device = None