1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import binascii
18import io
19import logging
20import os
21import queue
22
23from blueberry.facade import common_pb2 as common
24from blueberry.tests.gd.cert.context import get_current_context
25from blueberry.tests.gd.cert.truth import assertThat
26from blueberry.tests.gd_sl4a.lib.ble_lib import disable_bluetooth
27from blueberry.tests.gd_sl4a.lib.ble_lib import enable_bluetooth
28from blueberry.tests.gd_sl4a.lib.bt_constants import ble_address_types
29from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
30from blueberry.tests.sl4a_sl4a.lib.security import Security
31from blueberry.utils.bt_gatt_constants import GattCallbackString
32from blueberry.utils.bt_gatt_constants import GattTransport
33from mobly import test_runner
34
35
36class IrkRotationTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
37
38    def setup_class(self):
39        super().setup_class()
40        self.default_timeout = 10  # seconds
41
42    def setup_test(self):
43        assertThat(super().setup_test()).isTrue()
44
45    def teardown_test(self):
46        current_test_dir = get_current_context().get_full_output_path()
47        self.cert.adb.pull([
48            "/data/misc/bluetooth/logs/btsnoop_hci.log",
49            os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log" % self.cert.serial)
50        ])
51        self.cert.adb.pull([
52            "/data/misc/bluetooth/logs/btsnoop_hci.log.last",
53            os.path.join(current_test_dir, "CERT_%s_btsnoop_hci.log.last" % self.cert.serial)
54        ])
55        super().teardown_test()
56        self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled \'\'")
57
58    def _wait_for_event(self, expected_event_name, device):
59        try:
60            event_info = device.ed.pop_event(expected_event_name, self.default_timeout)
61            logging.info(event_info)
62        except queue.Empty as error:
63            logging.error("Failed to find event: %s", expected_event_name)
64            return False
65        return True
66
67    def __get_cert_public_address_and_irk_from_bt_config(self):
68        # Pull IRK from SL4A cert side to pass in from SL4A DUT side when scanning
69        bt_config_file_path = os.path.join(get_current_context().get_full_output_path(),
70                                           "DUT_%s_bt_config.conf" % self.cert.serial)
71        try:
72            self.cert.adb.pull(["/data/misc/bluedroid/bt_config.conf", bt_config_file_path])
73        except AdbError as error:
74            logging.error("Failed to pull SL4A cert BT config")
75            return False
76        logging.debug("Reading SL4A cert BT config")
77        with io.open(bt_config_file_path) as f:
78            for line in f.readlines():
79                stripped_line = line.strip()
80                if (stripped_line.startswith("Address")):
81                    address_fields = stripped_line.split(' ')
82                    # API currently requires public address to be capitalized
83                    address = address_fields[2].upper()
84                    logging.debug("Found cert address: %s" % address)
85                    continue
86                if (stripped_line.startswith("LE_LOCAL_KEY_IRK")):
87                    irk_fields = stripped_line.split(' ')
88                    irk = irk_fields[2]
89                    logging.debug("Found cert IRK: %s" % irk)
90                    continue
91
92        return address, irk
93
94    def __test_le_reconnect_after_irk_rotation_cert_privacy_enabled(self):
95        self._test_le_reconnect_after_irk_rotation(True)
96
97    def test_le_reconnect_after_irk_rotation_cert_privacy_disabled(self):
98        self.cert.sl4a.bluetoothDisableBLE()
99        disable_bluetooth(self.cert.sl4a, self.cert.ed)
100        self.cert.adb.shell("setprop bluetooth.core.gap.le.privacy.enabled false")
101        self.cert.adb.shell("setprop log.tag.bluetooth VERBOSE")
102        enable_bluetooth(self.cert.sl4a, self.cert.ed)
103        self.cert.sl4a.bluetoothDisableBLE()
104        self._test_le_reconnect_after_irk_rotation(False)
105
106    def _bond_remote_device(self, cert_privacy_enabled, cert_public_address):
107        if cert_privacy_enabled:
108            self.cert_advertiser_.advertise_public_extended_pdu()
109        else:
110            self.cert_advertiser_.advertise_public_extended_pdu(common.PUBLIC_DEVICE_ADDRESS)
111
112        advertising_device_name = self.cert_advertiser_.get_local_advertising_name()
113        connect_address = self.dut_scanner_.scan_for_name(advertising_device_name)
114
115        # Bond
116        logging.info("Bonding with %s", connect_address)
117        self.dut_security_.create_bond_numeric_comparison(connect_address)
118        self.dut_scanner_.stop_scanning()
119        self.cert_advertiser_.stop_advertising()
120
121        return connect_address
122
123    def _test_le_reconnect_after_irk_rotation(self, cert_privacy_enabled):
124
125        cert_public_address, irk = self.__get_cert_public_address_and_irk_from_bt_config()
126        self._bond_remote_device(cert_privacy_enabled, cert_public_address)
127
128        # Remove all bonded devices to rotate the IRK
129        logging.info("Unbonding all devices")
130        self.dut_security_.remove_all_bonded_devices()
131        self.cert_security_.remove_all_bonded_devices()
132
133        # Bond again
134        logging.info("Rebonding remote device")
135        connect_address = self._bond_remote_device(cert_privacy_enabled, cert_public_address)
136
137        # Connect GATT
138        logging.info("Connecting GATT to %s", connect_address)
139        gatt_callback = self.dut.sl4a.gattCreateGattCallback()
140        bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False,
141                                                             GattTransport.TRANSPORT_LE, False, None)
142        assertThat(bluetooth_gatt).isNotNone()
143        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
144        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
145
146        # Close GATT connection
147        logging.info("Closing GATT connection")
148        self.dut.sl4a.gattClientClose(bluetooth_gatt)
149
150        # Reconnect GATT
151        logging.info("Reconnecting GATT")
152        gatt_callback = self.dut.sl4a.gattCreateGattCallback()
153        bluetooth_gatt = self.dut.sl4a.gattClientConnectGatt(gatt_callback, connect_address, False,
154                                                             GattTransport.TRANSPORT_LE, False, None)
155        assertThat(bluetooth_gatt).isNotNone()
156        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
157        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
158
159        # Disconnect GATT
160        logging.info("Disconnecting GATT")
161        self.dut.sl4a.gattClientDisconnect(gatt_callback)
162        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
163        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
164
165        # Reconnect GATT
166        logging.info("Reconnecting GATT")
167        self.dut.sl4a.gattClientReconnect(gatt_callback)
168        expected_event_name = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
169        assertThat(self._wait_for_event(expected_event_name, self.dut)).isTrue()
170
171
172if __name__ == '__main__':
173    test_runner.main()
174