1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18
19from l2cap.classic import facade_pb2 as l2cap_facade_pb2
20from l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType
21from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
22from l2cap.le.facade_pb2 import SecurityLevel
23from bluetooth_packets_python3 import hci_packets
24from bluetooth_packets_python3 import l2cap_packets
25from cert.event_stream import FilteringEventStream
26from cert.event_stream import EventStream, IEventStream
27from cert.closable import Closable, safeClose
28from cert.py_hci import PyHci
29from cert.matchers import HciMatchers
30from cert.matchers import L2capMatchers
31from cert.truth import assertThat
32from facade import common_pb2 as common
33
34
35class PyL2capChannel(IEventStream):
36
37    def __init__(self, device, psm, l2cap_stream):
38        self._device = device
39        self._psm = psm
40        self._le_l2cap_stream = l2cap_stream
41        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
42                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))
43
44    def get_event_queue(self):
45        return self._our_le_l2cap_view.get_event_queue()
46
47    def send(self, payload):
48        self._device.l2cap.SendDynamicChannelPacket(
49            l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))
50
51    def close_channel(self):
52        self._device.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=self._psm))
53
54    def set_traffic_paused(self, paused):
55        self._device.l2cap.SetTrafficPaused(l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused))
56
57
58class _ClassicConnectionResponseFutureWrapper(object):
59    """
60    The future object returned when we send a connection request from DUT. Can be used to get connection status and
61    create the corresponding PyL2capDynamicChannel object later
62    """
63
64    def __init__(self, grpc_response_future, device, psm, l2cap_stream):
65        self._grpc_response_future = grpc_response_future
66        self._device = device
67        self._psm = psm
68        self._l2cap_stream = l2cap_stream
69
70    def get_channel(self):
71        return PyL2capChannel(self._device, self._psm, self._l2cap_stream)
72
73
74class PyL2cap(Closable):
75
76    def __init__(self, device, cert_address, has_security=False):
77        self._device = device
78        self._cert_address = cert_address
79        self._hci = PyHci(device)
80        self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty()))
81        self._security_connection_event_stream = EventStream(
82            self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty()))
83        if has_security == False:
84            self._hci.register_for_events(hci_packets.EventCode.LINK_KEY_REQUEST)
85
86    def close(self):
87        safeClose(self._l2cap_stream)
88        safeClose(self._security_connection_event_stream)
89        safeClose(self._hci)
90
91    def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
92        self._device.l2cap.SetDynamicChannel(
93            l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode))
94        return PyL2capChannel(self._device, psm, self._l2cap_stream)
95
96    def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
97        """
98        Send open Dynamic channel request to CERT.
99        Get a future for connection result, to be used after CERT accepts request
100        """
101        self.register_dynamic_channel(psm, mode)
102        response_future = self._device.l2cap.OpenChannel.future(
103            l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode))
104
105        return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream)
106
107    def get_channel_queue_buffer_size(self):
108        return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size
109
110    def initiate_connection_for_security(self):
111        """
112        Establish an ACL for the specific purpose of pairing devices
113        """
114        self._device.l2cap.InitiateConnectionForSecurity(self._cert_address)
115
116    def get_security_connection_event_stream(self):
117        """
118        Stream of Link related events.  Events are returned with an address.
119        Events map to the LinkSecurityInterfaceListener callbacks
120        """
121        return self._security_connection_event_stream
122
123    def security_link_hold(self):
124        """
125        Holds open the ACL indefinitely allowing for the security handshake
126        to take place
127        """
128        self._device.l2cap.SecurityLinkHold(self._cert_address)
129
130    def security_link_ensure_authenticated(self):
131        """
132        Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED
133        """
134        self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address)
135
136    def security_link_release(self):
137        """
138        Releases a Held open ACL allowing for the ACL to time out after the default time
139        """
140        self._device.l2cap.SecurityLinkRelease(self._cert_address)
141
142    def security_link_disconnect(self):
143        """
144        Immediately release and disconnect ACL
145        """
146        self._device.l2cap.SecurityLinkDisconnect(self._cert_address)
147
148    def verify_security_connection(self):
149        """
150        Verify that we get a connection and a link key request
151        """
152        assertThat(self.get_security_connection_event_stream()).emits(
153            lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED)
154        assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest())
155
156
157class PyLeL2capFixedChannel(IEventStream):
158
159    def __init__(self, device, cid, l2cap_stream):
160        self._device = device
161        self._cid = cid
162        self._le_l2cap_stream = l2cap_stream
163        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
164                                                       L2capMatchers.PacketPayloadWithMatchingCid(self._cid))
165
166    def get_event_queue(self):
167        return self._our_le_l2cap_view.get_event_queue()
168
169    def send(self, payload):
170        self._device.l2cap_le.SendFixedChannelPacket(
171            l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload))
172
173    def close_channel(self):
174        self._device.l2cap_le.SetFixedChannel(
175            l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False))
176
177
178class PyLeL2capDynamicChannel(IEventStream):
179
180    def __init__(self, device, cert_address, psm, l2cap_stream):
181        self._device = device
182        self._cert_address = cert_address
183        self._psm = psm
184        self._le_l2cap_stream = l2cap_stream
185        self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream,
186                                                       L2capMatchers.PacketPayloadWithMatchingPsm(self._psm))
187
188    def get_event_queue(self):
189        return self._our_le_l2cap_view.get_event_queue()
190
191    def send(self, payload):
192        self._device.l2cap_le.SendDynamicChannelPacket(
193            l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload))
194
195    def close_channel(self):
196        self._device.l2cap_le.CloseDynamicChannel(
197            l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm))
198
199
200class _CreditBasedConnectionResponseFutureWrapper(object):
201    """
202    The future object returned when we send a connection request from DUT. Can be used to get connection status and
203    create the corresponding PyLeL2capDynamicChannel object later
204    """
205
206    def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream):
207        self._grpc_response_future = grpc_response_future
208        self._device = device
209        self._cert_address = cert_address
210        self._psm = psm
211        self._le_l2cap_stream = le_l2cap_stream
212
213    def get_status(self):
214        return l2cap_packets.LeCreditBasedConnectionResponseResult(self._grpc_response_future.result().status)
215
216    def get_channel(self):
217        assertThat(self.get_status()).isEqualTo(l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS)
218        return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream)
219
220
221class PyLeL2cap(Closable):
222
223    def __init__(self, device):
224        self._device = device
225        self._le_l2cap_stream = EventStream(self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))
226
227    def close(self):
228        safeClose(self._le_l2cap_stream)
229
230    def enable_fixed_channel(self, cid=4):
231        self._device.l2cap_le.SetFixedChannel(l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True))
232
233    def get_fixed_channel(self, cid=4):
234        return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream)
235
236    def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY):
237        self._device.l2cap_le.SetDynamicChannel(
238            l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, enable=True, security_level=security_level))
239        return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream)
240
241    def connect_coc_to_cert(self, cert_address, psm=0x33):
242        """
243        Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request
244        """
245        self.register_coc(cert_address, psm)
246        response_future = self._device.l2cap_le.OpenDynamicChannel.future(
247            l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address))
248
249        return _CreditBasedConnectionResponseFutureWrapper(response_future, self._device, cert_address, psm,
250                                                           self._le_l2cap_stream)
251
252    def update_connection_parameter(self,
253                                    conn_interval_min=0x10,
254                                    conn_interval_max=0x10,
255                                    conn_latency=0x0a,
256                                    supervision_timeout=0x64,
257                                    min_ce_length=12,
258                                    max_ce_length=12):
259        self._device.l2cap_le.SendConnectionParameterUpdate(
260            l2cap_le_facade_pb2.ConnectionParameter(
261                conn_interval_min=conn_interval_min,
262                conn_interval_max=conn_interval_max,
263                conn_latency=conn_latency,
264                supervision_timeout=supervision_timeout,
265                min_ce_length=min_ce_length,
266                max_ce_length=max_ce_length))
267