1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from bluetooth_packets_python3 import hci_packets
20from cert.captures import HciCaptures
21from cert.closable import safeClose
22from cert.event_stream import EventStream
23from cert.matchers import HciMatchers
24from cert.py_hci import PyHci
25from cert.py_security import PySecurity
26from cert.truth import assertThat
27from datetime import datetime
28from google.protobuf import empty_pb2 as empty_proto
29from l2cap.classic import facade_pb2 as l2cap_facade
30from security.facade_pb2 import IoCapabilities
31from security.facade_pb2 import AuthenticationRequirements
32from security.facade_pb2 import OobDataPresent
33
34
class CertSecurity(PySecurity):
    """
        Contain all of the certification stack logic for sending and receiving
        HCI commands following the Classic Pairing flows.

        Commands are sent through a PyHci instance and the responses are
        checked by asserting on the HCI event stream obtained from it.
    """
    # Translation tables from the security facade (protobuf) enums to the
    # equivalent hci_packets enums used when building HCI commands.
    _io_cap_lookup = {
        IoCapabilities.DISPLAY_ONLY: hci_packets.IoCapability.DISPLAY_ONLY,
        IoCapabilities.DISPLAY_YES_NO_IO_CAP: hci_packets.IoCapability.DISPLAY_YES_NO,
        IoCapabilities.KEYBOARD_ONLY: hci_packets.IoCapability.KEYBOARD_ONLY,
        IoCapabilities.NO_INPUT_NO_OUTPUT: hci_packets.IoCapability.NO_INPUT_NO_OUTPUT,
    }

    _auth_req_lookup = {
        AuthenticationRequirements.NO_BONDING:
        hci_packets.AuthenticationRequirements.NO_BONDING,
        AuthenticationRequirements.NO_BONDING_MITM_PROTECTION:
        hci_packets.AuthenticationRequirements.NO_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.DEDICATED_BONDING:
        hci_packets.AuthenticationRequirements.DEDICATED_BONDING,
        AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION:
        hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION,
        AuthenticationRequirements.GENERAL_BONDING:
        hci_packets.AuthenticationRequirements.GENERAL_BONDING,
        AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION:
        hci_packets.AuthenticationRequirements.GENERAL_BONDING_MITM_PROTECTION,
    }

    _oob_present_lookup = {
        OobDataPresent.NOT_PRESENT: hci_packets.OobDataPresent.NOT_PRESENT,
        OobDataPresent.P192_PRESENT: hci_packets.OobDataPresent.P_192_PRESENT,
        OobDataPresent.P256_PRESENT: hci_packets.OobDataPresent.P_256_PRESENT,
        OobDataPresent.P192_AND_256_PRESENT: hci_packets.OobDataPresent.P_192_AND_256_PRESENT,
    }

    # Class-level defaults; __init__ replaces _hci and _hci_event_stream, the
    # setters below replace _io_caps and _auth_reqs.
    _hci_event_stream = None
    _io_caps = hci_packets.IoCapability.DISPLAY_ONLY
    _auth_reqs = hci_packets.AuthenticationRequirements.DEDICATED_BONDING_MITM_PROTECTION
    _secure_connections_enabled = False

    _hci = None

    MAX_PIN_LENGTH = 16
    MIN_PIN_LENGTH = 1

    # Number of zero entries returned for each OOB value that was not requested
    _OOB_VALUE_LENGTH = 16

    def _enqueue_hci_command(self, command, expect_complete):
        """
            Send an HCI command through the PyHci instance

            :param command: the HCI command builder to send
            :param expect_complete: kept for interface compatibility; the
                command is sent the same way whatever its value
        """
        self._hci.send_command(command)

    def _zeroed_oob_value(self):
        """
            Build a fresh all-zero placeholder for an OOB value that is absent

            :return: a new list of _OOB_VALUE_LENGTH zeros
        """
        return [0] * self._OOB_VALUE_LENGTH

    def _request_extended_oob_data(self):
        """
            Send ReadLocalOobExtendedData and wait for its command complete

            :return: the ReadLocalOobExtendedDataCompleteView of the response
        """
        complete_capture = HciCaptures.ReadLocalOobExtendedDataCompleteCapture()
        self._enqueue_hci_command(hci_packets.ReadLocalOobExtendedDataBuilder(), True)
        logging.info("Cert: Waiting for OOB response from controller")
        assertThat(self._hci_event_stream).emits(complete_capture)
        return hci_packets.ReadLocalOobExtendedDataCompleteView(complete_capture.get())

    def _request_legacy_oob_data(self):
        """
            Send ReadLocalOobData and wait for its command complete

            :return: the ReadLocalOobDataCompleteView of the response
        """
        complete_capture = HciCaptures.ReadLocalOobDataCompleteCapture()
        self._enqueue_hci_command(hci_packets.ReadLocalOobDataBuilder(), True)
        logging.info("Cert: Waiting for OOB response from controller")
        assertThat(self._hci_event_stream).emits(complete_capture)
        return hci_packets.ReadLocalOobDataCompleteView(complete_capture.get())

    def __init__(self, device):
        """
            Don't call super b/c the gRPC stream setup will crash test

            :param device: the cert device; its channel is awaited and it is
                handed to PyHci
        """
        logging.info("Cert: Init")
        self._device = device
        self._device.wait_channel_ready()
        self._hci = PyHci(device)
        # Register for every event code the pairing flows below assert on
        self._hci.register_for_events(
            hci_packets.EventCode.ENCRYPTION_CHANGE, hci_packets.EventCode.CHANGE_CONNECTION_LINK_KEY_COMPLETE,
            hci_packets.EventCode.CENTRAL_LINK_KEY_COMPLETE, hci_packets.EventCode.RETURN_LINK_KEYS,
            hci_packets.EventCode.PIN_CODE_REQUEST, hci_packets.EventCode.LINK_KEY_REQUEST,
            hci_packets.EventCode.LINK_KEY_NOTIFICATION, hci_packets.EventCode.ENCRYPTION_KEY_REFRESH_COMPLETE,
            hci_packets.EventCode.IO_CAPABILITY_REQUEST, hci_packets.EventCode.IO_CAPABILITY_RESPONSE,
            hci_packets.EventCode.REMOTE_OOB_DATA_REQUEST, hci_packets.EventCode.SIMPLE_PAIRING_COMPLETE,
            hci_packets.EventCode.USER_PASSKEY_NOTIFICATION, hci_packets.EventCode.KEYPRESS_NOTIFICATION,
            hci_packets.EventCode.USER_CONFIRMATION_REQUEST, hci_packets.EventCode.USER_PASSKEY_REQUEST,
            hci_packets.EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
        self._hci_event_stream = self._hci.get_event_stream()

    def create_bond(self, address, type):
        """
            Creates a bond from the cert perspective

            Currently this only logs; no HCI traffic is generated.
        """
        logging.info("Cert: Creating bond to '%s' from '%s'" % (str(address), str(self._device.address)))
        # TODO(optedoblivion): Trigger connection to Send AuthenticationRequested

    def remove_bond(self, address, type):
        """
            We store the link key locally in the test and pretend
            So to remove_bond we need to Remove the "stored" data

            Currently a no-op.
        """
        pass

    def set_io_capabilities(self, io_capabilities):
        """
            Set the IO Capabilities used for the cert

            :param io_capabilities: a security facade IoCapabilities value;
                values missing from _io_cap_lookup fall back to DISPLAY_ONLY
        """
        # NOTE(review): _io_capabilities_name_lookup is not defined in this
        # class; presumably inherited from PySecurity -- confirm.
        logging.info("Cert: setting IO Capabilities data to '%s'" % self._io_capabilities_name_lookup.get(
            io_capabilities, "ERROR"))
        self._io_caps = self._io_cap_lookup.get(io_capabilities, hci_packets.IoCapability.DISPLAY_ONLY)

    def set_authentication_requirements(self, auth_reqs):
        """
            Establish authentication requirements for the stack

            :param auth_reqs: a security facade AuthenticationRequirements
                value; values missing from _auth_req_lookup fall back to
                GENERAL_BONDING
        """
        # NOTE(review): _auth_reqs_name_lookup is not defined in this class;
        # presumably inherited from PySecurity -- confirm.
        logging.info("Cert: setting Authentication Requirements data to '%s'" % self._auth_reqs_name_lookup.get(
            auth_reqs, "ERROR"))
        self._auth_reqs = self._auth_req_lookup.get(auth_reqs, hci_packets.AuthenticationRequirements.GENERAL_BONDING)

    def get_oob_data_from_controller(self, pb_oob_data_type):
        """
            Get the Out-of-band data for SSP pairing

            :param pb_oob_data_type: Type of data needed
            :return: a tuple of lists (192c,192r,256c,256r) with increasing security; entries not covered by
                     pb_oob_data_type are lists of 16 zeros
            :raises KeyError: if pb_oob_data_type is not a key of _oob_present_lookup

        """
        oob_data_type = self._oob_present_lookup[pb_oob_data_type]

        if oob_data_type == hci_packets.OobDataPresent.NOT_PRESENT:
            logging.warning("No data present, no need to call get_oob_data")
            return (self._zeroed_oob_value(), self._zeroed_oob_value(), self._zeroed_oob_value(),
                    self._zeroed_oob_value())

        logging.info("Cert: Requesting OOB data")
        if oob_data_type == hci_packets.OobDataPresent.P_192_PRESENT:
            # If host and controller supports secure connections we always used ReadLocalOobExtendedDataRequest
            if self._secure_connections_enabled:
                logging.info("Cert: Requesting P192 Data; secure connections")
                complete = self._request_extended_oob_data()
                return (list(complete.GetC192()), list(complete.GetR192()), self._zeroed_oob_value(),
                        self._zeroed_oob_value())
            # else we use ReadLocalDataRequest
            logging.info("Cert: Requesting P192 Data; no secure connections")
            complete = self._request_legacy_oob_data()
            return (list(complete.GetC()), list(complete.GetR()), self._zeroed_oob_value(),
                    self._zeroed_oob_value())

        # Must be secure connection compatible to use these
        if oob_data_type == hci_packets.OobDataPresent.P_256_PRESENT:
            logging.info("Cert: Requesting P256 Extended Data; secure connections")
            complete = self._request_extended_oob_data()
            return (self._zeroed_oob_value(), self._zeroed_oob_value(), list(complete.GetC256()),
                    list(complete.GetR256()))

        # Both
        logging.info("Cert: Requesting P192 AND P256 Extended Data; secure connections")
        complete = self._request_extended_oob_data()
        return (list(complete.GetC192()), list(complete.GetR192()), list(complete.GetC256()),
                list(complete.GetR256()))

    def input_passkey(self, address, passkey):
        """
            Pretend to answer the pairing dialog as a user

            Waits for USER_PASSKEY_REQUEST, replays a fixed sequence of
            keypress notifications and then replies with the passkey.

            :param address: peer address; it is decoded with utf-8 before use
            :param passkey: the passkey to reply with (logged with %d)
        """
        logging.info("Cert: Waiting for PASSKEY request")
        assertThat(self._hci_event_stream).emits(HciMatchers.EventWithCode(hci_packets.EventCode.USER_PASSKEY_REQUEST))
        logging.info("Cert: Send user input passkey %d for %s" % (passkey, address))
        peer = address.decode('utf-8')
        keypress = hci_packets.KeypressNotificationType
        # Simulated typing: two digits then a clear, two digits then one
        # erase, then five more digits -- a net total of six digits entered.
        simulated_keypresses = (
            keypress.ENTRY_STARTED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.CLEARED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ERASED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.DIGIT_ENTERED,
            keypress.ENTRY_COMPLETED,
        )
        for notification_type in simulated_keypresses:
            self._enqueue_hci_command(hci_packets.SendKeypressNotificationBuilder(peer, notification_type), True)
        self._enqueue_hci_command(hci_packets.UserPasskeyRequestReplyBuilder(peer, passkey), True)

    def input_pin(self, address, pin):
        """
            Pretend to answer the pairing dialog as a user

            :param address: peer address; it is decoded with utf-8 before use
            :param pin: the PIN code; it is decoded for logging and its
                elements are sent zero-padded to MAX_PIN_LENGTH entries
            :raises Exception: if the PIN length is outside
                [MIN_PIN_LENGTH, MAX_PIN_LENGTH]
        """

        if not self.MIN_PIN_LENGTH <= len(pin) <= self.MAX_PIN_LENGTH:
            raise Exception("Pin code must be within range")

        logging.info("Cert: Waiting for PIN request")
        assertThat(self._hci_event_stream).emits(HciMatchers.PinCodeRequest())
        logging.info("Cert: Send user input PIN %s for %s" % (pin.decode(), address))
        peer = address.decode('utf-8')
        # Pad with zeros up to the maximum length; the real length is sent separately
        pin_list = list(pin) + [0] * (self.MAX_PIN_LENGTH - len(pin))
        self._enqueue_hci_command(hci_packets.PinCodeRequestReplyBuilder(peer, len(pin), pin_list), True)

    def __send_ui_callback(self, address, callback_type, b, uid, pin):
        """
            Pretend to answer the pairing dialog as a user

            Currently this only logs the uid and the response.
        """
        logging.info("Cert: Send user input callback uid:%d; response: %s" % (uid, b))
        # TODO(optedoblivion): Make callback and set it to the module

    def enable_secure_simple_pairing(self):
        """
            This is called when you want to enable SSP for testing

            Sends WRITE_SIMPLE_PAIRING_MODE and waits for its command complete.
        """
        logging.info("Cert: Sending WRITE_SIMPLE_PAIRING_MODE [True]")
        self._enqueue_hci_command(hci_packets.WriteSimplePairingModeBuilder(hci_packets.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SIMPLE_PAIRING_MODE))

    def enable_secure_connections(self):
        """
            This is called when you want to enable secure connections support

            Sends WRITE_SECURE_CONNECTIONS_HOST_SUPPORT and waits for its
            command complete.  _secure_connections_enabled is intentionally
            left untouched for now (see the TODO below).
        """
        logging.info("Cert: Sending WRITE_SECURE_CONNECTIONS_HOST_SUPPORT [True]")
        self._enqueue_hci_command(
            hci_packets.WriteSecureConnectionsHostSupportBuilder(hci_packets.Enable.ENABLED), True)
        logging.info("Cert: Waiting for controller response")
        assertThat(self._hci_event_stream).emits(
            HciMatchers.CommandComplete(hci_packets.OpCode.WRITE_SECURE_CONNECTIONS_HOST_SUPPORT))
        # TODO(optedoblivion): Figure this out and remove (see classic_pairing_handler.cc)
        #self._secure_connections_enabled = True

    def send_io_caps(self, address):
        """
            Wait for IO_CAPABILITY_REQUEST and answer it

            The reply carries the configured IO capabilities and
            authentication requirements and always reports OOB data as
            NOT_PRESENT.

            :param address: peer address; it is decoded with utf8 before use
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityRequest())
        logging.info("Cert: Sending IO_CAPABILITY_REQUEST_REPLY")
        oob_data_present = hci_packets.OobDataPresent.NOT_PRESENT
        self._enqueue_hci_command(
            hci_packets.IoCapabilityRequestReplyBuilder(
                address.decode('utf8'), self._io_caps, oob_data_present, self._auth_reqs), True)

    def accept_pairing(self, dut_address, reply_boolean):
        """
            Here we handle the pairing events at the HCI level

            :param dut_address: address of the DUT; it is decoded with utf8
                before use
            :param reply_boolean: truthy to confirm the user confirmation
                request, falsy to reject it
        """
        # Decode once up front so a bad address fails before any event wait
        peer = dut_address.decode('utf8')
        logging.info("Cert: Waiting for LINK_KEY_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyRequest())
        logging.info("Cert: Sending LINK_KEY_REQUEST_NEGATIVE_REPLY")
        self._enqueue_hci_command(hci_packets.LinkKeyRequestNegativeReplyBuilder(peer), True)
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for USER_CONFIRMATION_REQUEST")
        assertThat(self._hci_event_stream).emits(HciMatchers.UserConfirmationRequest())
        logging.info("Cert: Sending Simulated User Response '%s'" % reply_boolean)
        if reply_boolean:
            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_REPLY")
            self._enqueue_hci_command(hci_packets.UserConfirmationRequestReplyBuilder(peer), True)
        else:
            logging.info("Cert: Sending USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY")
            self._enqueue_hci_command(hci_packets.UserConfirmationRequestNegativeReplyBuilder(peer), True)
        # SIMPLE_PAIRING_COMPLETE is awaited for both answers
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        assertThat(self._hci_event_stream).emits(HciMatchers.SimplePairingComplete())
        if reply_boolean:
            # A link key notification is only awaited after a positive reply
            logging.info("Cert: Waiting for LINK_KEY_NOTIFICATION")
            assertThat(self._hci_event_stream).emits(HciMatchers.LinkKeyNotification())

    def accept_oob_pairing(self, dut_address):
        """
            Handle the HCI events of a pairing that uses OOB data

            Waits for IO_CAPABILITY_RESPONSE, answers the IO capability
            request and asserts that SIMPLE_PAIRING_COMPLETE reports SUCCESS.

            :param dut_address: address of the DUT, passed to send_io_caps
        """
        logging.info("Cert: Waiting for IO_CAPABILITY_RESPONSE")
        assertThat(self._hci_event_stream).emits(HciMatchers.IoCapabilityResponse())
        self.send_io_caps(dut_address)
        logging.info("Cert: Waiting for SIMPLE_PAIRING_COMPLETE")
        ssp_complete_capture = HciCaptures.SimplePairingCompleteCapture()
        assertThat(self._hci_event_stream).emits(ssp_complete_capture)
        ssp_complete = ssp_complete_capture.get()
        logging.info(ssp_complete.GetStatus())
        assertThat(ssp_complete.GetStatus()).isEqualTo(hci_packets.ErrorCode.SUCCESS)

    def on_user_input(self, dut_address, reply_boolean, expected_ui_event):
        """
            Cert doesn't need the test to respond to the ui event
            Cert responds in accept pairing
        """
        pass

    def wait_for_bond_event(self, expected_bond_event):
        """
            A bond event will be triggered once the bond process
            is complete.  For the DUT we need to wait for it,
            for Cert it isn't needed.
        """
        pass

    def enforce_security_policy(self, address, type, policy):
        """
            Pass for now
        """
        pass

    def wait_for_enforce_security_event(self, expected_enforce_security_event):
        """
            Cert side needs to pass
        """
        pass

    def wait_for_disconnect_event(self):
        """
            Wait for a DISCONNECT_COMPLETE event on the HCI event stream
        """
        logging.info("Cert: Waiting for DISCONNECT_COMPLETE")
        assertThat(self._hci_event_stream).emits(HciMatchers.DisconnectionComplete())

    def close(self):
        """
            Close the underlying PyHci instance via safeClose
        """
        safeClose(self._hci)
365