1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import logging
19
20from google.protobuf import empty_pb2 as empty_proto
21
22import hci_packets as hci
23
24from blueberry.tests.gd_sl4a.lib import gd_sl4a_base_test
25from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_phys
26from blueberry.tests.gd.cert.matchers import SecurityMatchers
27from blueberry.tests.gd.cert.py_le_security import PyLeSecurity
28from blueberry.tests.gd.cert.truth import assertThat
29
30from blueberry.facade import common_pb2 as common
31from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
32from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
33from blueberry.facade.security.facade_pb2 import AdvertisingCallbackMsgType
34from blueberry.facade.security.facade_pb2 import BondMsgType
35from blueberry.facade.security.facade_pb2 import LeAuthRequirementsMessage
36from blueberry.facade.security.facade_pb2 import LeIoCapabilityMessage
37from blueberry.facade.security.facade_pb2 import LeOobDataPresentMessage
38from blueberry.facade.security.facade_pb2 import UiCallbackMsg
39from blueberry.facade.security.facade_pb2 import UiCallbackType
40from blueberry.facade.security.facade_pb2 import UiMsgType
41
42from mobly import test_runner
43
44LeIoCapabilities = LeIoCapabilityMessage.LeIoCapabilities
45LeOobDataFlag = LeOobDataPresentMessage.LeOobDataFlag
46
47DISPLAY_ONLY = LeIoCapabilityMessage(capabilities=LeIoCapabilities.DISPLAY_ONLY)
48
49OOB_NOT_PRESENT = LeOobDataPresentMessage(data_present=LeOobDataFlag.NOT_PRESENT)
50
51
52class OobData:
53
54    address = None
55    confirmation = None
56    randomizer = None
57
58    def __init__(self, address, confirmation, randomizer):
59        self.address = address
60        self.confirmation = confirmation
61        self.randomizer = randomizer
62
63
64class OobPairingSl4aTest(gd_sl4a_base_test.GdSl4aBaseTestClass):
65    # Events sent from SL4A
66    SL4A_EVENT_GENERATED = "GeneratedOobData"
67    SL4A_EVENT_ERROR = "ErrorOobData"
68    SL4A_EVENT_BONDED = "Bonded"
69    SL4A_EVENT_UNBONDED = "Unbonded"
70
71    # Matches tBT_TRANSPORT
72    # Used Strings because ints were causing gRPC problems
73    TRANSPORT_AUTO = "0"
74    TRANSPORT_BREDR = "1"
75    TRANSPORT_LE = "2"
76
77    def setup_class(self):
78        super().setup_class(cert_module='SECURITY')
79        self.default_timeout = 5  # seconds
80
81    def setup_test(self):
82        super().setup_test()
83        self.cert_security = PyLeSecurity(self.cert)
84
85    def teardown_test(self):
86        self.cert_security.close()
87        super().teardown_test()
88
89    def __get_test_irk(self):
90        return bytes(
91            bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]))
92
93    def _generate_sl4a_oob_data(self, transport):
94        logging.info("Fetching OOB data...")
95        self.dut.sl4a.bluetoothGenerateLocalOobData(transport)
96        try:
97            event_info = self.dut.ed.pop_event(self.SL4A_EVENT_GENERATED, self.default_timeout)
98        except queue.Empty as error:
99            logging.error("Failed to generate OOB data!")
100            return None
101        logging.info("Data received!")
102        logging.info(event_info["data"])
103        return OobData(event_info["data"]["address_with_type"], event_info["data"]["confirmation"],
104                       event_info["data"]["randomizer"])
105
106    def _generate_cert_oob_data(self, transport):
107        if transport == self.TRANSPORT_LE:
108            oob_data = self.cert.security.GetLeOutOfBandData(empty_proto.Empty())
109            # GetLeOutOfBandData adds null terminator to string in C code
110            # (length 17) before passing back to python via gRPC where it is
111            # converted back to bytes. Remove the null terminator for handling
112            # in python test, since length is known to be 16 for
113            # confirmation_value and random_value
114            oob_data.confirmation_value = oob_data.confirmation_value[:-1]
115            oob_data.random_value = oob_data.random_value[:-1]
116            return oob_data
117        return None
118
119    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
120        # Random static address below, no random resolvable address at this point
121        random_address_bytes = "DD:34:02:05:5C:EE".encode()
122        private_policy = le_initiator_address_facade.PrivacyPolicy(
123            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
124            address_with_type=common.BluetoothAddressWithType(
125                address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS),
126            rotation_irk=irk)
127        self.cert.security.SetLeInitiatorAddressPolicy(private_policy)
128        # Bluetooth MAC address must be upper case
129        return random_address_bytes.decode('utf-8').upper()
130
131    def __advertise_rpa_random_policy(self, legacy_pdus, irk):
132        DEVICE_NAME = 'Im_The_CERT!'
133        logging.info("Getting random address")
134        ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
135        logging.info("Done %s" % ADDRESS)
136
137        # Setup cert side to advertise
138        gap_name = hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
139                               data=list(bytes(DEVICE_NAME, encoding='utf8')))
140        gap_data = le_advertising_facade.GapDataMsg(data=gap_name.serialize())
141        config = le_advertising_facade.AdvertisingConfig(
142            advertisement=[gap_data],
143            interval_min=512,
144            interval_max=768,
145            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
146            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
147            channel_map=7,
148            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
149        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
150            include_tx_power=True,
151            connectable=True,
152            legacy_pdus=legacy_pdus,
153            advertising_config=config,
154            secondary_advertising_phy=ble_scan_settings_phys["1m"])
155        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
156        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
157        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
158        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
159        return (ADDRESS, create_response)
160
161    def _advertise_rpa_random_legacy_pdu(self, irk):
162        return self.__advertise_rpa_random_policy(True, irk)
163
164    def _advertise_rpa_random_extended_pdu(self, irk):
165        return self.__advertise_rpa_random_policy(False, irk)
166
167    def _stop_advertising(self, advertiser_id):
168        logging.info("Stop advertising")
169        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
170        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
171        logging.info("Stopped advertising")
172
173    def _wait_for_own_address(self):
174        own_address = common.BluetoothAddress()
175
176        def get_address(event):
177            if event.message_type == AdvertisingCallbackMsgType.OWN_ADDRESS_READ:
178                nonlocal own_address
179                own_address = event.address.address
180                return True
181            return False
182
183        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_address)
184        return own_address
185
186    def _wait_for_advertising_set_started(self):
187        advertising_started = False
188
189        def get_advertising_set_started(event):
190            if event.message_type == AdvertisingCallbackMsgType.ADVERTISING_SET_STARTED:
191                nonlocal advertising_started
192                if event.advertising_started == 1:
193                    advertising_started = True
194                return True
195            return False
196
197        assertThat(self.cert_security.get_advertising_callback_event_stream()).emits(get_advertising_set_started)
198        return advertising_started
199
200    def _wait_for_yes_no_dialog(self):
201        address_with_type = common.BluetoothAddressWithType()
202
203        def get_address_with_type(event):
204            if event.message_type == UiMsgType.DISPLAY_PAIRING_PROMPT:
205                nonlocal address_with_type
206                address_with_type = event.peer
207                return True
208            return False
209
210        assertThat(self.cert_security.get_ui_stream()).emits(get_address_with_type)
211        return address_with_type
212
213    def test_sl4a_classic_generate_oob_data(self):
214        oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_BREDR)
215        logging.info("OOB data received")
216        logging.info(oob_data)
217        assertThat(oob_data).isNotNone()
218
219    def test_sl4a_classic_generate_oob_data_twice(self):
220        self.test_sl4a_classic_generate_oob_data()
221        self.test_sl4a_classic_generate_oob_data()
222
223    def test_sl4a_ble_generate_oob_data(self):
224        oob_data = self._generate_sl4a_oob_data(self.TRANSPORT_LE)
225        assertThat(oob_data).isNotNone()
226
227    def test_cert_ble_generate_oob_data(self):
228        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
229        assertThat(oob_data).isNotNone()
230
231    def _bond_sl4a_cert_oob(self):
232        self.cert.security.SetLeIoCapability(DISPLAY_ONLY)
233        self.cert.security.SetLeOobDataPresent(OOB_NOT_PRESENT)
234        self.cert.security.SetLeAuthRequirements(LeAuthRequirementsMessage(bond=1, mitm=1, secure_connections=1))
235
236        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
237
238        self._wait_for_advertising_set_started()
239
240        get_own_address_request = le_advertising_facade.GetOwnAddressRequest(
241            advertiser_id=create_response.advertiser_id)
242        self.cert.hci_le_advertising_manager.GetOwnAddress(get_own_address_request)
243        advertising_address = self._wait_for_own_address()
244
245        oob_data = self._generate_cert_oob_data(self.TRANSPORT_LE)
246        assertThat(oob_data).isNotNone()
247
248        self.dut.sl4a.bluetoothCreateBondOutOfBand(
249            advertising_address.decode("utf-8").upper(), self.TRANSPORT_LE, oob_data.confirmation_value.hex(),
250            oob_data.random_value.hex())
251
252        address_with_type = self._wait_for_yes_no_dialog()
253        self.cert.security.SendUiCallback(
254            UiCallbackMsg(message_type=UiCallbackType.PAIRING_PROMPT,
255                          boolean=True,
256                          unique_id=1,
257                          address=address_with_type))
258
259        assertThat(self.cert_security.get_bond_stream()).emits(SecurityMatchers.BondMsg(BondMsgType.DEVICE_BONDED))
260
261        try:
262            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_BONDED, self.default_timeout)
263        except queue.Empty as error:
264            logging.error("Failed to get bond event!")
265
266        assertThat(bond_state).isNotNone()
267        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
268        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)
269        self._stop_advertising(create_response.advertiser_id)
270        return (RANDOM_ADDRESS, advertising_address)
271
272    def test_sl4a_create_bond_out_of_band(self):
273        self._bond_sl4a_cert_oob()
274
275    def test_generate_oob_after_pairing(self):
276        self._bond_sl4a_cert_oob()
277        assertThat(self._generate_sl4a_oob_data(self.TRANSPORT_LE)).isNotNone()
278
279    def test_remove_bond(self):
280        self.dut.sl4a.bluetoothUnbond(self._bond_sl4a_cert_oob()[1].decode().upper())
281        bond_state = None
282        try:
283            bond_state = self.dut.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.default_timeout)
284        except queue.Empty as error:
285            logging.error("Failed to get bond event!")
286
287        assertThat(bond_state).isNotNone()
288        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)
289
290
291if __name__ == '__main__':
292    test_runner.main()
293