1#!/usr/bin/env python3 2# 3# Copyright 2022 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import binascii 18import io 19import logging 20import os 21import queue 22 23from blueberry.facade import common_pb2 as common 24from blueberry.tests.gd.cert.context import get_current_context 25from blueberry.tests.gd.cert.truth import assertThat 26from blueberry.tests.gd_sl4a.lib.ble_lib import disable_bluetooth 27from blueberry.tests.gd_sl4a.lib.ble_lib import enable_bluetooth 28from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types 29from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test 30from blueberry.tests.sl4a_sl4a.lib.security import Security 31from blueberry.utils.bt_gatt_constants import GattCallbackString 32from blueberry.utils.bt_gatt_constants import GattTransport 33from mobly import test_runner 34 35 36class IrkRotationTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass): 37 38 def setup_class(self): 39 super().setup_class() 40 self.default_timeout = 10 # seconds 41 42 def setup_test(self): 43 assertThat(super().setup_test()).isTrue() 44 45 def teardown_test(self): 46 current_test_dir = get_current_context().get_full_output_path() 47 self.cert.adb.pull([ 48 "/data/misc/bluetooth/logs/btsnoop_hci.log", 49 os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log" % self.cert.serial) 50 ]) 51 self.cert.adb.pull([ 52 "/data/misc/bluetooth/logs/btsnoop_hci.log.last", 53 os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log.last" % self.cert.serial) 54 ]) 55 super().teardown_test() 56 self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled \'\'") 57 58 def _wait_for_event(self, expected_event_name, device): 59 try: 60 event_info = device.ed.pop_event(expected_event_name, self.default_timeout) 61 logging.info(event_info) 62 except queue.Empty as error: 63 logging.error("Failed to find event: %s", expected_event_name) 64 return False 65 return True 66 67 def __get_cert_public_address_and_irk_from_bt_config(self): 68 # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning 69 bt_config_file_path = os.path.join(get_current_context().get_full_output_path(), 70 "DUT_%s_bt_config.conf" % self.cert.serial) 71 try: 72 self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path]) 73 except AdbError as error: 74 logging.error("Failed to pull SL4A cert BT config") 75 return False 76 logging.debug("Reading SL4A cert BT config") 77 with io.open(bt_config_file_path) as f: 78 for line in f.readlines(): 79 stripped_line = line.strip() 80 if (stripped_line.startswith("Address")): 81 address_fields = stripped_line.split(' ') 82 # API currently requires public address to be capitalized 83 address = address_fields[2].upper() 84 logging.debug("Found cert address: %s" % address) 85 continue 86 if (stripped_line.startswith("LE_LOCAL_KEY_IRK")): 87 irk_fields = stripped_line.split(' ') 88 irk = irk_fields[2] 89 logging.debug("Found cert IRK: %s" % irk) 90 continue 91 92 return address, irk 93 94 def __test_le_reconnect_after_irk_rotation_cert_privacy_enabled(self): 95 self._test_le_reconnect_after_irk_rotation(True) 96 97 def test_le_reconnect_after_irk_rotation_cert_privacy_disabled(self): 98 self.cert.sl4a.bluetoothDisableBLE() 99 disable_bluetooth(self.cert.sl4a, self.cert.ed) 100 self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled false") 101 self.cert.adb.shell("setprop log.tag.bluetooth VERBOSE") 102 enable_bluetooth(self.cert.sl4a, self.cert.ed) 103 self.cert.sl4a.bluetoothDisableBLE() 104 self._test_le_reconnect_after_irk_rotation(False) 105 106 def _bond_remote_device(self, cert_privacy_enabled, cert_public_address): 107 if cert_privacy_enabled: 108 self.cert_advertiser_.advertise_public_extended_pdu() 109 else: 110 self.cert_advertiser_.advertise_public_extended_pdu(common.PUBLIC_DEVICE_ADDRESS) 111 112 advertising_device_name = self.cert_advertiser_.get_local_advertising_name() 113 connect_address = self.dut_scanner_.scan_for_name(advertising_device_name) 114 115 # Bond 116 logging.info("Bonding with %s", connect_address) 117 self.dut_security_.create_bond_numeric_comparison(connect_address) 118 self.dut_scanner_.stop_scanning() 119 self.cert_advertiser_.stop_advertising() 120 121 return connect_address 122 123 def _test_le_reconnect_after_irk_rotation(self, cert_privacy_enabled): 124 125 cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config() 126 self._bond_remote_device(cert_privacy_enabled, cert_public_address) 127 128 # Remove all bonded devices to rotate the IRK 129 logging.info("Unbonding all devices") 130 self.dut_security_.remove_all_bonded_devices() 131 self.cert_security_.remove_all_bonded_devices() 132 133 # Bond again 134 logging.info("Rebonding remote device") 135 connect_address = self._bond_remote_device(cert_privacy_enabled, cert_public_address) 136 137 # Connect GATT 138 logging.info("Connecting GATT to %s", connect_address) 139 gatt_callback = self.dut.sl4a.gattCreateGattCallback() 140 bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False, 141 GattTransport.TRANSPORT_LE, False, None) 142 assertThat(bluetooth_gatt).isNotNone() 143 expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) 144 assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue() 145 146 # Close GATT connection 147 logging.info("Closing GATT connection") 148 self.dut.sl4a.gattClientClose(bluetooth_gatt) 149 150 # Reconnect GATT 151 logging.info("Reconnecting GATT") 152 gatt_callback = self.dut.sl4a.gattCreateGattCallback() 153 bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False, 154 GattTransport.TRANSPORT_LE, False, None) 155 assertThat(bluetooth_gatt).isNotNone() 156 expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) 157 assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue() 158 159 # Disconnect GATT 160 logging.info("Disconnecting GATT") 161 self.dut.sl4a.gattClientDisconnect(gatt_callback) 162 expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) 163 assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue() 164 165 # Reconnect GATT 166 logging.info("Reconnecting GATT") 167 self.dut.sl4a.gattClientReconnect(gatt_callback) 168 expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) 169 assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue() 170 171 172if __name__ == '__main__': 173 test_runner.main() 174