1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from bluetooth_packets_python3 import hci_packets
20from cert.closable import Closable
21from cert.closable import safeClose
22from cert.event_stream import EventStream
23from cert.truth import assertThat
24from facade import common_pb2 as common
25from google.protobuf import empty_pb2 as empty_proto
26
27from security.facade_pb2 import AuthenticationRequirements
28from security.facade_pb2 import AuthenticationRequirementsMessage
29from security.facade_pb2 import BondMsgType
30from security.facade_pb2 import SecurityPolicyMessage
31from security.facade_pb2 import IoCapabilities
32from security.facade_pb2 import IoCapabilityMessage
33from security.facade_pb2 import OobDataBondMessage
34from security.facade_pb2 import OobDataMessage
35from security.facade_pb2 import OobDataPresentMessage
36from security.facade_pb2 import UiMsgType
37from security.facade_pb2 import UiCallbackMsg
38from security.facade_pb2 import UiCallbackType
39
40
class PySecurity(Closable):
    """
        Abstraction for security tasks and GRPC calls

        Wraps the security facade reachable through `device.security` and
        the event streams it serves (UI, bond, enforce security policy,
        disconnect and out of band data events).
    """

    # Lookup tables used only to print readable names in the log output
    _io_capabilities_name_lookup = {
        IoCapabilities.DISPLAY_ONLY: "DISPLAY_ONLY",
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: "DISPLAY_YES_NO_IO_CAP",
        IoCapabilities.KEYBOARD_ONLY: "KEYBOARD_ONLY",
        IoCapabilities.NO_INPUT_NO_OUTPUT: "NO_INPUT_NO_OUTPUT",
    }

    _auth_reqs_name_lookup = {
        AuthenticationRequirements.NO_BONDING: "NO_BONDING",
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION: "NO_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.DEDICATED_BONDING: "DEDICATED_BONDING",
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION: "DEDICATED_BONDING_MITM_PROTECTION",
        AuthenticationRequirements.GENERAL_BONDING: "GENERAL_BONDING",
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION: "GENERAL_BONDING_MITM_PROTECTION",
    }

    # Every stream that close() touches gets a class level default, so that
    # close() does not hit an AttributeError on an instance whose __init__
    # did not run to completion.
    # NOTE(review): assumes safeClose tolerates None - TODO confirm
    _ui_event_stream = None
    _bond_event_stream = None
    _enforce_security_policy_stream = None
    _disconnect_event_stream = None
    _oob_data_event_stream = None

    def __init__(self, device):
        """
            Wait for the device channel and open one EventStream per
            security facade event source

            device: object exposing `wait_channel_ready()`, `address` and
                    the `security` facade stub
        """
        logging.info("DUT: Init")
        self._device = device
        self._device.wait_channel_ready()
        security = self._device.security
        self._ui_event_stream = EventStream(security.FetchUiEvents(empty_proto.Empty()))
        self._bond_event_stream = EventStream(security.FetchBondEvents(empty_proto.Empty()))
        self._enforce_security_policy_stream = EventStream(
            security.FetchEnforceSecurityPolicyEvents(empty_proto.Empty()))
        self._disconnect_event_stream = EventStream(security.FetchDisconnectEvents(empty_proto.Empty()))
        self._oob_data_event_stream = EventStream(security.FetchGetOutOfBandDataEvents(empty_proto.Empty()))

    def create_bond(self, address, type):
        """
            Triggers stack under test to create bond

            address: address of the remote device to bond with
            type: address type placed in the BluetoothAddressWithType message
        """
        # Lazy logging arguments: formatting is done by the logging module
        logging.info("DUT: Creating bond to '%s' from '%s'", address, self._device.address)
        self._device.security.CreateBond(
            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))

    def create_bond_out_of_band(self, address, type, p192_oob_data, p256_oob_data):
        """
            Triggers stack under test to create bond using Out of Band method

            address: address of the remote device to bond with
            type: address type placed in the BluetoothAddressWithType messages
            p192_oob_data: sequence whose item 0 is the confirmation value
                           and item 1 the random value (iterables of ints)
            p256_oob_data: same layout as p192_oob_data
        """
        logging.info("DUT: Creating OOB bond to '%s' from '%s'", address, self._device.address)

        def remote_address():
            # A fresh message per use, as each field got its own instance before
            return common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type)

        def oob_message(oob_data):
            return OobDataMessage(
                address=remote_address(),
                confirmation_value=bytes(bytearray(oob_data[0])),
                random_value=bytes(bytearray(oob_data[1])))

        # Argument evaluation order (address, p192, p256) matches the
        # field order used before.
        self._device.security.CreateBondOutOfBand(
            OobDataBondMessage(
                address=remote_address(),
                p192_data=oob_message(p192_oob_data),
                p256_data=oob_message(p256_oob_data)))

    def remove_bond(self, address, type):
        """
            Removes bond from stack under test

            address: address of the bonded remote device
            type: address type placed in the BluetoothAddressWithType message
        """
        self._device.security.RemoveBond(
            common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type))

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the DUT

            io_capabilities: an IoCapabilities value; values missing from
                             the name lookup are logged as "ERROR" but are
                             still sent to the facade
        """
        logging.info("DUT: setting IO Capabilities data to '%s'",
                     self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
        self._device.security.SetIoCapability(IoCapabilityMessage(capability=io_capabilities))

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack

            auth_reqs: an AuthenticationRequirements value; values missing
                       from the name lookup are logged as "ERROR" but are
                       still sent to the facade
        """
        logging.info("DUT: setting Authentication Requirements data to '%s'",
                     self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
        self._device.security.SetAuthenticationRequirements(AuthenticationRequirementsMessage(requirement=auth_reqs))

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
            Send a callback from the UI as if the user pressed a button on the dialog

            address: address of the remote device, always sent as a
                     PUBLIC_DEVICE_ADDRESS
            callback_type: UiCallbackType value for the message
            b: boolean reply of the user
            uid: unique id of the UI event being answered
            pin: iterable of ints (or bytes) converted with bytes()
        """
        logging.info("DUT: Sending user input response uid: %d; response: %s", uid, b)
        self._device.security.SendUiCallback(
            UiCallbackMsg(
                message_type=callback_type,
                boolean=b,
                unique_id=uid,
                pin=bytes(pin),
                address=common.BluetoothAddressWithType(
                    address=common.BluetoothAddress(address=address),
                    type=common.BluetoothAddressTypeEnum.PUBLIC_DEVICE_ADDRESS)))

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing
            Since the stack under test already enables it by default
            we do not need to do anything here for the time being
        """
        pass

    def enable_secure_connections(self):
        """
            Intentionally a no-op in this class
        """
        pass

    def accept_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def accept_oob_pairing(self, cert_address, reply_boolean):
        """
            Here we pass, but in cert we perform pairing flow tasks.
            This was added here in order to be more dynamic, but the stack
            under test will handle the pairing flow.
        """
        pass

    def wait_for_passkey(self, cert_address):
        """
            Wait for a DISPLAY_PASSKEY UI event and return its numeric value

            cert_address: unused here, kept for the caller facing signature
            Returns the `numeric_value` of the matching event.
        """
        passkey = -1

        def get_passkey(event):
            nonlocal passkey
            if event.message_type != UiMsgType.DISPLAY_PASSKEY:
                return False
            passkey = event.numeric_value
            return True

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_passkey)
        return passkey

    def input_pin(self, cert_address, pin):
        """
            Respond to the DISPLAY_PIN_ENTRY UI event with the given pin

            cert_address: address of the remote device the reply is for
            pin: pin code, forwarded to on_user_input
        """
        logging.info("DUT: Inputting pin code: %s", pin)
        self.on_user_input(
            cert_address=cert_address, reply_boolean=True, expected_ui_event=UiMsgType.DISPLAY_PIN_ENTRY, pin=pin)

    def on_user_input(self, cert_address, reply_boolean, expected_ui_event, pin=None):
        """
            Respond to the UI event

            Waits until the UI event stream emits `expected_ui_event`, then
            answers it using the unique id of that event.

            cert_address: address of the remote device the reply is for
            reply_boolean: boolean reply of the user
            expected_ui_event: UiMsgType to wait for; None means there is
                               nothing to answer and the call returns
            pin: optional pin code; when empty or omitted a YES_NO callback
                 is sent, otherwise a PIN callback
        """
        if expected_ui_event is None:
            return

        # None replaces the former mutable `[]` default; both mean "no pin"
        if pin is None:
            pin = []

        ui_id = -1

        def get_unique_id(event):
            nonlocal ui_id
            if event.message_type != expected_ui_event:
                return False
            ui_id = event.unique_id
            return True

        logging.info("DUT: Waiting for expected UI event")
        assertThat(self._ui_event_stream).emits(get_unique_id)
        callback_type = UiCallbackType.PIN if pin else UiCallbackType.YES_NO
        self.__send_ui_callback(cert_address, callback_type, reply_boolean, ui_id, pin)

    def get_address(self):
        """
            Return the address of the wrapped device
        """
        return self._device.address

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.

            expected_bond_event: message type the bond event stream has to emit
        """
        logging.info("DUT: Waiting for Bond Event: %s ", expected_bond_event)

        def is_expected_bond_event(event):
            if event.message_type == expected_bond_event:
                return True
            # Log every non matching event to ease debugging of failures
            logging.info("DUT: Actual Bond Event: %s", event.message_type)
            return False

        assertThat(self._bond_event_stream).emits(is_expected_bond_event)

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            We expect a 'True' or 'False' from the enforce security call

            This interface will allow the caller to wait for a callback
            result from enforcing security policy over the facade.

            expected_enforce_security_event: value compared against the
                                             `result` field of each event
        """
        logging.info("DUT: Waiting for enforce security event")

        def is_expected_result(event):
            if event.result == expected_enforce_security_event:
                return True
            # Log the non matching result to ease debugging of failures
            logging.info(event.result)
            return False

        assertThat(self._enforce_security_policy_stream).emits(is_expected_result)

    def wait_for_disconnect_event(self):
        """
            Wait for any disconnect event and log the address it carries
        """
        logging.info("DUT: Waiting for Disconnect Event")

        def log_and_accept(event):
            # Any disconnect event matches
            logging.info("event: %s", event.address)
            return True

        assertThat(self._disconnect_event_stream).emits(log_and_accept)

    def enforce_security_policy(self, address, type, policy):
        """
            Call to enforce classic security policy

            address: address of the remote device
            type: address type placed in the BluetoothAddressWithType message
            policy: policy value placed in the SecurityPolicyMessage
        """
        self._device.security.EnforceSecurityPolicy(
            SecurityPolicyMessage(
                address=common.BluetoothAddressWithType(address=common.BluetoothAddress(address=address), type=type),
                policy=policy))

    def get_oob_data_from_controller(self, oob_data_present):
        """
            Request Out of Band data and wait for the event carrying it

            oob_data_present: unused here, kept for the caller facing signature
            Returns a list of four lists of ints: the P192 confirmation
            value, the P192 random value and two lists of 16 zeros.
        """
        self._device.security.GetOutOfBandData(empty_proto.Empty())
        oob_data = []

        def get_oob_data(event):
            nonlocal oob_data
            oob_data = [
                list(event.p192_data.confirmation_value),
                list(event.p192_data.random_value),
                # Only the P192 values are read from the event; the last
                # two entries are zero filled
                [0] * 16,
                [0] * 16,
            ]
            return True

        assertThat(self._oob_data_event_stream).emits(get_oob_data)
        return oob_data

    def close(self):
        """
            Close every event stream owned by this object
        """
        for stream in (
                self._ui_event_stream,
                self._bond_event_stream,
                self._enforce_security_policy_stream,
                self._disconnect_event_stream,
                self._oob_data_event_stream,
        ):
            safeClose(stream)
280