1#!/usr/bin/env python3
2#
3#   Copyright 2022 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import logging
19
20from blueberry.tests.gd.cert.closable import Closable
21from blueberry.tests.gd.cert.truth import assertThat
22from blueberry.tests.sl4a_sl4a.lib.oob_data import OobData
23
24
class Security:
    """Drives pairing and bonding on a single device through its SL4A facade.

    Constructing an instance enables the SL4A pairing helper on the device;
    close() removes all bonds and disables the helper again. Failures are
    reported through assertThat(), so they surface as test assertion failures.
    """

    # Events sent from SL4A
    SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS = "GeneratedOobData"
    SL4A_EVENT_GENERATE_OOB_DATA_ERROR = "ErrorOobData"
    SL4A_EVENT_BONDED = "Bonded"
    SL4A_EVENT_UNBONDED = "Unbonded"

    # Matches tBT_TRANSPORT
    # Used Strings because ints were causing gRPC problems
    TRANSPORT_AUTO = "0"
    TRANSPORT_BREDR = "1"
    TRANSPORT_LE = "2"

    __default_timeout = 10  # seconds
    __default_bonding_timeout = 60  # seconds
    __device = None

    def __init__(self, device):
        """Store the device and turn on the SL4A pairing helper.

        Args:
            device: object exposing the `sl4a` facade and the `ed` event
                dispatcher used by every method of this class.
        """
        self.__device = device
        self.__device.sl4a.bluetoothStartPairingHelper(True)

    def generate_oob_data(self, transport, wait_for_oob_data_callback=True):
        """Ask the device to generate local OOB data.

        Args:
            transport: transport value forwarded to SL4A (see the TRANSPORT_*
                constants).
            wait_for_oob_data_callback: pass False to return immediately
                without waiting for the generation event.

        Returns:
            A tuple <statuscode, OobData>. The OobData is populated if the
            statuscode is 0 (SUCCESS), else it is None. When not waiting for
            the callback the result is always (0, None).
        """
        logging.info("Generating local OOB data")
        self.__device.sl4a.bluetoothGenerateLocalOobData(transport)

        # Identity check kept on purpose: only a literal False skips the wait.
        if wait_for_oob_data_callback is False:
            return 0, None

        # Check for oob data generation success
        try:
            generate_success_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_SUCCESS,
                                                                self.__default_timeout)
        except queue.Empty:
            logging.error("Failed to generate OOB data!")
            # Check if generating oob data failed without blocking
            try:
                generate_failure_event = self.__device.ed.pop_event(self.SL4A_EVENT_GENERATE_OOB_DATA_ERROR, 0)
            except queue.Empty:
                logging.error("Failed to generate OOB Data without error code")
                # Neither a success nor an error event arrived: force an
                # assertion failure so the test stops here.
                assertThat(True).isFalse()

            errorcode = generate_failure_event["data"]["Error"]
            # %s rather than %d so a non-integer error value cannot break the log call.
            logging.info("Generating local oob data failed with error code %s", errorcode)
            return errorcode, None

        oob_event_data = generate_success_event["data"]
        logging.info("OOB ADDR with Type: %s", oob_event_data["address_with_type"])
        return 0, OobData(oob_event_data["address_with_type"], oob_event_data["confirmation"],
                          oob_event_data["randomizer"])

    def ensure_device_bonded(self):
        """Wait for the bonded event and assert that it reports a bonded state.

        Waits up to the default bonding timeout; a missing event is logged and
        then fails the isNotNone assertion.
        """
        bond_state = None
        try:
            bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_BONDED, self.__default_bonding_timeout)
        except queue.Empty:
            logging.error("Failed to get bond event!")

        assertThat(bond_state).isNotNone()
        logging.info("Bonded: %s", bond_state["data"]["bonded_state"])
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(True)

    def create_bond_out_of_band(self,
                                oob_data,
                                bt_device_object_address=None,
                                bt_device_object_address_type=-1,
                                wait_for_device_bonded=True):
        """Start an LE out-of-band bond using previously generated OOB data.

        Args:
            oob_data: OOB data of the peer; must not be None.
            bt_device_object_address: address for the BT Device object. When
                None, both the address and the address type default to the
                ones carried by oob_data.
            bt_device_object_address_type: address type for the BT Device
                object; ignored when bt_device_object_address is None.
            wait_for_device_bonded: when truthy, block until the bonded event
                has been received and verified.
        """
        assertThat(oob_data).isNotNone()
        oob_data_address = oob_data.to_sl4a_address()
        oob_data_address_type = oob_data.to_sl4a_address_type()

        # If a BT Device object address isn't specified, default to the oob data
        # address and type
        if bt_device_object_address is None:
            bt_device_object_address = oob_data_address
            bt_device_object_address_type = oob_data_address_type

        logging.info("Bonding OOB with device addr=%s, device addr type=%s, oob addr=%s, oob addr type=%s",
                     bt_device_object_address, bt_device_object_address_type, oob_data_address, oob_data_address_type)
        bond_start = self.__device.sl4a.bluetoothCreateLeBondOutOfBand(
            oob_data_address, oob_data_address_type, oob_data.confirmation, oob_data.randomizer,
            bt_device_object_address, bt_device_object_address_type)
        assertThat(bond_start).isTrue()

        if wait_for_device_bonded:
            self.ensure_device_bonded()

    def create_bond_numeric_comparison(self, address, transport=TRANSPORT_LE, wait_for_device_bonded=True):
        """Start bonding with the device at the given address.

        Args:
            address: address of the peer; must not be None.
            transport: TRANSPORT_LE bonds over LE; any other value uses the
                generic bond call.
            wait_for_device_bonded: when truthy, block until the bonded event
                has been received and verified.
        """
        assertThat(address).isNotNone()
        if transport == self.TRANSPORT_LE:
            self.__device.sl4a.bluetoothLeBond(address)
        else:
            self.__device.sl4a.bluetoothBond(address)

        # Honour the flag, consistent with create_bond_out_of_band.
        if wait_for_device_bonded:
            self.ensure_device_bonded()

    def remove_all_bonded_devices(self):
        """Remove the bond of every device SL4A reports as bonded."""
        bonded_devices = self.__device.sl4a.bluetoothGetBondedDevices()
        for device in bonded_devices:
            logging.info(device)
            self.remove_bond(device["address"])

    def remove_bond(self, address):
        """Unbond the given address and verify the resulting unbonded event.

        If SL4A reports that the unbond request was not started, this only
        logs and returns.
        """
        if not self.__device.sl4a.bluetoothUnbond(address):
            logging.info("remove_bond: Bluetooth Device with address: %s does not exist", address)
            return

        bond_state = None
        try:
            bond_state = self.__device.ed.pop_event(self.SL4A_EVENT_UNBONDED, self.__default_timeout)
        except queue.Empty:
            logging.error("Failed to get unbond event!")
        assertThat(bond_state).isNotNone()
        assertThat(bond_state["data"]["bonded_state"]).isEqualTo(False)

    def close(self):
        """Remove all bonds, disable the pairing helper and drop the device.

        Safe to call more than once: calls after the first are no-ops. If
        removing bonds fails, the pairing helper is still disabled and the
        device reference still released before the error propagates.
        """
        if self.__device is None:
            return
        try:
            self.remove_all_bonded_devices()
        finally:
            try:
                self.__device.sl4a.bluetoothStartPairingHelper(False)
            finally:
                self.__device = None
143