1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from blueberry.tests.gd.cert.captures import HciCaptures
20from blueberry.tests.gd.cert.closable import safeClose
21from blueberry.tests.gd.cert.event_stream import EventStream
22from blueberry.tests.gd.cert.matchers import HciMatchers
23from blueberry.tests.gd.cert.py_hci import PyHci
24from blueberry.tests.gd.cert.py_security import PySecurity
25from blueberry.tests.gd.cert.truth import assertThat
26from google.protobuf import empty_pb2 as empty_proto
27from blueberry.facade.l2cap.classic import facade_pb2 as l2cap_facade
28from blueberry.facade.security.facade_pb2 import IoCapabilities
29from blueberry.facade.security.facade_pb2 import AuthenticationRequirements
30from blueberry.facade.security.facade_pb2 import OobDataPresent
31from blueberry.utils import bluetooth
32import hci_packets as hci
33
34
35class CertSecurity(PySecurity):
36    """
37        Contain all of the certification stack logic for sending and receiving
38        HCI commands following the Classic Pairing flows.
39    """
40    _io_cap_lookup = {
41        IoCapabilities.DISPLAY_ONLY: hci.IoCapability.DISPLAY_ONLY,
42        IoCapabilities.DISPLAY_YES_NO_IO_CAP: hci.IoCapability.DISPLAY_YES_NO,
43        IoCapabilities.KEYBOARD_ONLY: hci.IoCapability.KEYBOARD_ONLY,
44        IoCapabilities.NO_INPUT_NO_OUTPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT,
45    }
46
47    _auth_req_lookup = {
48        AuthenticationRequirements.NO_BONDING:
49            hci.AuthenticationRequirements.NO_BONDING,
50        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION:
51            hci.AuthenticationRequirements.NO_BONDING_MITM_PROTECTION,
52        AuthenticationRequirements.DEDICATED_BONDING:
53            hci.AuthenticationRequirements.DEDICATED_BONDING,
54        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION:
55            hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION,
56        AuthenticationRequirements.GENERAL_BONDING:
57            hci.AuthenticationRequirements.GENERAL_BONDING,
58        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION:
59            hci.AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION,
60    }
61
62    _oob_present_lookup = {
63        OobDataPresent.NOT_PRESENT: hci.OobDataPresent.NOT_PRESENT,
64        OobDataPresent.P192_PRESENT: hci.OobDataPresent.P_192_PRESENT,
65        OobDataPresent.P256_PRESENT: hci.OobDataPresent.P_256_PRESENT,
66        OobDataPresent.P192_AND_256_PRESENT: hci.OobDataPresent.P_192_AND_256_PRESENT,
67    }
68
69    _hci_event_stream = None
70    _io_caps = hci.IoCapability.DISPLAY_ONLY
71    _auth_reqs = hci.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
72    _secure_connections_enabled = False
73
74    _hci = None
75
76    MAX_PIN_LENGTH = 16
77    MIN_PIN_LENGTH = 1
78
79    def _enqueue_hci_command(self, command, expect_complete):
80        if (expect_complete):
81            self._hci.send_command(command)
82        else:
83            self._hci.send_command(command)
84
85    def __init__(self, device):
86        """
87            Don't call super b/c the gRPC stream setup will crash test
88        """
89        logging.info("Cert: Init")
90        self._device = device
91        self._device.wait_channel_ready()
92        self._hci = PyHci(device)
93        self._hci.register_for_events(hci.EventCode.ENCRYPTION_CHANGE,
94                                      hci.EventCode.CHANGE_CONNECTION_LINK_KEY_COMPLETE,
95                                      hci.EventCode.CENTRAL_LINK_KEY_COMPLETE, hci.EventCode.RETURN_LINK_KEYS,
96                                      hci.EventCode.PIN_CODE_REQUEST, hci.EventCode.LINK_KEY_REQUEST,
97                                      hci.EventCode.LINK_KEY_NOTIFICATION,
98                                      hci.EventCode.ENCRYPTION_KEY_REFRESH_COMPLETE,
99                                      hci.EventCode.IO_CAPABILITY_REQUEST, hci.EventCode.IO_CAPABILITY_RESPONSE,
100                                      hci.EventCode.REMOTE_OOB_DATA_REQUEST, hci.EventCode.SIMPLE_PAIRING_COMPLETE,
101                                      hci.EventCode.USER_PASSKEY_NOTIFICATION, hci.EventCode.KEYPRESS_NOTIFICATION,
102                                      hci.EventCode.USER_CONFIRMATION_REQUEST, hci.EventCode.USER_PASSKEY_REQUEST)
103        self._hci_event_stream = self._hci.get_event_stream()
104
105    def create_bond(self, address, type):
106        """
107            Creates a bond from the cert perspective
108        """
109        logging.info("Cert: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address)))
110        # TODO(optedoblivion): Trigger connection to Send AuthenticationRequested
111
112    def remove_bond(self, address, type):
113        """
114            We store the link key locally in the test and pretend
115            So to remove_bond we need to Remove the "stored" data
116        """
117        pass
118
119    def set_io_capabilities(self, io_capabilities):
120        """
121            Set the IO Capabilities used for the cert
122        """
123        logging.info("Cert: setting IO Capabilities data to '%s'" %
124                     self._io_capabilities_name_lookup.get(io_capabilities, "ERROR"))
125        self._io_caps = self._io_cap_lookup.get(io_capabilities, hci.IoCapability.DISPLAY_ONLY)
126
127    def set_authentication_requirements(self, auth_reqs):
128        """
129            Establish authentication requirements for the stack
130        """
131        logging.info("Cert: setting Authentication Requirements data to '%s'" %
132                     self._auth_reqs_name_lookup.get(auth_reqs, "ERROR"))
133        self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci.AuthenticationRequirements.GENERAL_BONDING)
134
135    def get_oob_data_from_controller(self, pb_oob_data_type):
136        """
137            Get the Out-of-band data for SSP pairing
138
139            :param pb_oob_data_type: Type of data needed
140            :return: a tuple of bytes (192c,192r,256c,256r) with increasing security; bytes may be all 0s depending on pb_oob_data_type value
141
142        """
143        oob_data_type = self._oob_present_lookup[pb_oob_data_type]
144
145        if (oob_data_type == hci.OobDataPresent.NOT_PRESENT):
146            logging.warn("No data present, no need to call get_oob_data")
147            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], [0 for i in range(0, 16)],
148                    [0 for i in range(0, 16)])
149
150        logging.info("Cert: Requesting OOB data")
151        if oob_data_type == hci.OobDataPresent.P_192_PRESENT:
152            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
153            if self._secure_connections_enabled:
154                logging.info("Cert: Requesting P192 Data; secure connections")
155                complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
156                self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
157                logging.info("Cert: Waiting for OOB response from controller")
158                assertThat(self._hci_event_stream).emits(complete_capture)
159                complete = complete_capture.get()
160                return (list(complete.c192), list(complete.r192), [0 for i in range(0, 16)], [0 for i in range(0, 16)])
161            # else we use ReadLocalDataRequest
162            else:
163                logging.info("Cert: Requesting P192 Data; no secure connections")
164                complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
165                self._enqueue_hci_command(hci.ReadLocalOobData(), True)
166                logging.info("Cert: Waiting for OOB response from controller")
167                assertThat(self._hci_event_stream).emits(complete_capture)
168                complete = complete_capture.get()
169                return (list(complete.c), list(complete.r), [0 for i in range(0, 16)], [0 for i in range(0, 16)])
170
171        # Must be secure connection compatible to use these
172        elif oob_data_type == hci.OobDataPresent.P_256_PRESENT:
173            logging.info("Cert: Requesting P256 Extended Data; secure connections")
174            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
175            self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
176            logging.info("Cert: Waiting for OOB response from controller")
177            assertThat(self._hci_event_stream).emits(complete_capture)
178            complete = complete_capture.get()
179            return ([0 for i in range(0, 16)], [0 for i in range(0, 16)], list(complete.c256), list(complete.r256))
180
181        else:  # Both
182            logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
183            complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
184            self._enqueue_hci_command(hci.ReadLocalOobExtendedData(), True)
185            logging.info("Cert: Waiting for OOB response from controller")
186            assertThat(self._hci_event_stream).emits(complete_capture)
187            complete = complete_capture.get()
188            return (list(complete.c192), list(complete.r192), list(complete.c256), list(complete.r256))
189
190    def input_passkey(self, address, passkey):
191        """
192            Pretend to answer the pairing dialog as a user
193        """
194        logging.info("Cert: Waiting for PASSKEY request")
195        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST))
196        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
197        peer = bluetooth.Address(address)
198        self._enqueue_hci_command(
199            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.ENTRY_STARTED),
200            True)
201        self._enqueue_hci_command(
202            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
203            True)
204        self._enqueue_hci_command(
205            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
206            True)
207        self._enqueue_hci_command(
208            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.CLEARED), True)
209        self._enqueue_hci_command(
210            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
211            True)
212        self._enqueue_hci_command(
213            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
214            True)
215        self._enqueue_hci_command(
216            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ERASED),
217            True)
218        self._enqueue_hci_command(
219            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
220            True)
221        self._enqueue_hci_command(
222            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
223            True)
224        self._enqueue_hci_command(
225            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
226            True)
227        self._enqueue_hci_command(
228            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
229            True)
230        self._enqueue_hci_command(
231            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.DIGIT_ENTERED),
232            True)
233        self._enqueue_hci_command(
234            hci.SendKeypressNotification(bd_addr=peer, notification_type=hci.KeypressNotificationType.ENTRY_COMPLETED),
235            True)
236        self._enqueue_hci_command(hci.UserPasskeyRequestReply(bd_addr=peer, numeric_value=passkey), True)
237
238    def input_pin(self, address, pin):
239        """
240            Pretend to answer the pairing dialog as a user
241        """
242
243        if len(pin) > self.MAX_PIN_LENGTH or len(pin) < self.MIN_PIN_LENGTH:
244            raise Exception("Pin code must be within range")
245
246        logging.info("Cert: Waiting for PIN request")
247        assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest())
248        logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address))
249        peer = address.decode('utf-8')
250        pin_list = list(pin)
251        # Pad
252        for i in range(self.MAX_PIN_LENGTH - len(pin_list)):
253            pin_list.append(0)
254        self._enqueue_hci_command(
255            hci.PinCodeRequestReply(bd_addr=bluetooth.Address(peer), pin_code_length=len(pin), pin_code=pin_list), True)
256
257    def __send_ui_callback(self, address, callback_type, b, uid, pin):
258        """
259            Pretend to answer the pairing dailog as a user
260        """
261        logging.info("Cert: Send user input callback uid:%d; response: %s" % (uid, b))
262        # TODO(optedoblivion): Make callback and set it to the module
263
264    def enable_secure_simple_pairing(self):
265        """
266            This is called when you want to enable SSP for testing
267        """
268        logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
269        self._enqueue_hci_command(hci.WriteSimplePairingMode(simple_pairing_mode=hci.Enable.ENABLED), True)
270        logging.info("Cert: Waiting for controller response")
271        assertThat(self._hci_event_stream).emits(HciMatchers.CommandComplete(hci.OpCode.WRITE_SIMPLE_PAIRING_MODE))
272
273    def enable_secure_connections(self):
274        """
275            This is called when you want to enable secure connections support
276        """
277        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
278        self._enqueue_hci_command(
279            hci.WriteSecureConnectionsHostSupport(secure_connections_host_support=hci.Enable.ENABLED), True)
280        logging.info("Cert: Waiting for controller response")
281        assertThat(self._hci_event_stream).emits(
282            HciMatchers.CommandComplete(hci.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
283        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
284        #self._secure_connections_enabled = True
285
286    def send_io_caps(self, address):
287        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
288        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
289        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
290        oob_data_present = hci.OobDataPresent.NOT_PRESENT
291        self._enqueue_hci_command(
292            hci.IoCapabilityRequestReply(bd_addr=bluetooth.Address(address),
293                                         io_capability=self._io_caps,
294                                         oob_present=oob_data_present,
295                                         authentication_requirements=self._auth_reqs), True)
296
297    def accept_pairing(self, dut_address, reply_boolean, expect_to_fail, on_responder_reply):
298        """
299            Here we handle the pairing events at the HCI level
300        """
301        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
302        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
303        self.send_io_caps(dut_address)
304        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
305        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
306        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
307        if reply_boolean:
308            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_REPLY")
309            self._enqueue_hci_command(hci.UserConfirmationRequestReply(bd_addr=bluetooth.Address(dut_address)), True)
310            on_responder_reply()
311            logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
312            assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())
313            if not expect_to_fail:
314                logging.info("Cert: Waiting for LINK_KEY_NOTIFICATION")
315                assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyNotification())
316        else:
317            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY")
318            self._enqueue_hci_command(hci.UserConfirmationRequestNegativeReply(bd_addr=bluetooth.Address(dut_address)),
319                                      True)
320            on_responder_reply()
321            logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
322            assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())
323
324    def accept_oob_pairing(self, dut_address):
325        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
326        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
327        self.send_io_caps(dut_address)
328        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
329        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
330        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
331        ssp_complete = ssp_complete_capture.get()
332        logging.info(ssp_complete.status)
333        assertThat(ssp_complete.status).isEqualTo(hci.ErrorCode.SUCCESS)
334
335    def on_user_input(self, dut_address, reply_boolean, expected_ui_event):
336        """
337            Cert doesn't need the test to respond to the ui event
338            Cert responds in accept pairing
339        """
340        pass
341
342    def wait_for_bond_event(self, expected_bond_event):
343        """
344            A bond event will be triggered once the bond process
345            is complete.  For the DUT we need to wait for it,
346            for Cert it isn't needed.
347        """
348        pass
349
350    def enforce_security_policy(self, address, type, policy):
351        """
352            Pass for now
353        """
354        pass
355
356    def wait_for_enforce_security_event(self, expected_enforce_security_event):
357        """
358            Cert side needs to pass
359        """
360        pass
361
362    def wait_for_disconnect_event(self):
363        """
364            Cert side needs to pass
365        """
366        pass
367        # FIXME: Gabeldorsche facade don't allow us to register for an DISCONNECT_COMPLETE event
368        # logging.info("Cert: Waiting for DISCONNECT_COMPLETE")
369        # assertThat(self._hci_event_stream).emits(HciMatchers.DisconnectionComplete())
370
371    def close(self):
372        safeClose(self._hci)
373