1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage
20from cert.closable import Closable
21from cert.closable import safeClose
22from cert.py_acl_manager import PyAclManager
23from cert.truth import assertThat
24import bluetooth_packets_python3 as bt_packets
25from bluetooth_packets_python3 import l2cap_packets
26from bluetooth_packets_python3 import RawBuilder
27from bluetooth_packets_python3.l2cap_packets import CommandCode
28from bluetooth_packets_python3.l2cap_packets import Final
29from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
30from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
31from bluetooth_packets_python3.l2cap_packets import Poll
32from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
33from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
34from cert.event_stream import FilteringEventStream
35from cert.event_stream import IEventStream
36from cert.matchers import L2capMatchers
37from cert.captures import L2capCaptures
38
39
40class CertL2capChannel(IEventStream):
41
42    def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, fcs=None):
43        self._device = device
44        self._scid = scid
45        self._dcid = dcid
46        self._acl_stream = acl_stream
47        self._acl = acl
48        self._control_channel = control_channel
49        self._config_rsp_received = False
50        self._config_rsp_sent = False
51        if fcs == l2cap_packets.FcsType.DEFAULT:
52            self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid))
53        else:
54            self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid))
55
56    def get_event_queue(self):
57        return self._our_acl_view.get_event_queue()
58
59    def is_configured(self):
60        return self._config_rsp_received and self._config_rsp_sent
61
62    def send(self, packet):
63        frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
64        self._acl.send(frame.Serialize())
65
66    def send_i_frame(self,
67                     tx_seq,
68                     req_seq,
69                     f=Final.NOT_SET,
70                     sar=SegmentationAndReassembly.UNSEGMENTED,
71                     payload=None,
72                     fcs=False):
73        if fcs == l2cap_packets.FcsType.DEFAULT:
74            frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder(self._dcid, tx_seq, f, req_seq, sar, payload)
75        else:
76            frame = l2cap_packets.EnhancedInformationFrameBuilder(self._dcid, tx_seq, f, req_seq, sar, payload)
77        self._acl.send(frame.Serialize())
78
79    def send_s_frame(self, req_seq, s=SupervisoryFunction.RECEIVER_READY, p=Poll.NOT_SET, f=Final.NOT_SET):
80        frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(self._dcid, s, p, f, req_seq)
81        self._acl.send(frame.Serialize())
82
83    def config_request_for_me(self):
84        return L2capMatchers.ConfigurationRequest(self._scid)
85
86    def send_configure_request(self, options, sid=2, continuation=l2cap_packets.Continuation.END):
87        assertThat(self._scid).isNotEqualTo(1)
88        request = l2cap_packets.ConfigurationRequestBuilder(sid, self._dcid, continuation, options)
89        self._control_channel.send(request)
90
91    def _send_information_request(self, type):
92        assertThat(self._scid).isEqualTo(1)
93        signal_id = 3
94        information_request = l2cap_packets.InformationRequestBuilder(signal_id, type)
95        self.send(information_request)
96
97    def send_extended_features_request(self):
98        self._send_information_request(InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
99
100    def verify_configuration_request_and_respond(self, result=ConfigurationResponseResult.SUCCESS, options=None):
101        request_capture = L2capCaptures.ConfigurationRequest(self._scid)
102        assertThat(self._control_channel).emits(request_capture)
103        request = request_capture.get()
104        sid = request.GetIdentifier()
105        if options is None:
106            options = []
107        config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END,
108                                                                     result, options)
109        self._control_channel.send(config_response)
110
111    def send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None):
112        sid = request.GetIdentifier()
113        if options is None:
114            options = []
115        config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END,
116                                                                     result, options)
117        self._control_channel.send(config_response)
118        self._config_rsp_sent = True
119
120    def verify_configuration_response(self, result=ConfigurationResponseResult.SUCCESS):
121        assertThat(self._control_channel).emits(L2capMatchers.ConfigurationResponse(result))
122
123    def disconnect_and_verify(self):
124        assertThat(self._scid).isNotEqualTo(1)
125        self._control_channel.send(l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, self._scid))
126
127        assertThat(self._control_channel).emits(L2capMatchers.DisconnectionResponse(self._scid, self._dcid))
128
129    def verify_disconnect_request(self):
130        assertThat(self._control_channel).emits(L2capMatchers.DisconnectionRequest(self._dcid, self._scid))
131
132
133class CertL2capControlChannelBehaviors(object):
134
135    def __init__(self, parent):
136        self.on_config_req_behavior = SingleArgumentBehavior(
137            lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent))
138
139    def on_config_req(self, matcher):
140        return self.on_config_req_behavior.begin(matcher)
141
142    class CertReplyStage(ReplyStage):
143
144        def __init__(self, parent):
145            self.parent = parent
146
147        def send_configuration_response(self, result=ConfigurationResponseResult.SUCCESS, options=None):
148            self._commit(lambda request: self._send_configuration_response(request, result, options))
149            return self
150
151        def _send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None):
152            dcid = request.GetDestinationCid()
153            if dcid not in self.parent.scid_to_channel:
154                logging.warning("Received config request with unknown dcid")
155                return
156            self.parent.scid_to_channel[dcid].send_configuration_response(request, result, options)
157
158
159class CertL2cap(Closable, IHasBehaviors):
160
161    def __init__(self, device):
162        self._device = device
163        self._acl_manager = PyAclManager(device)
164        self._acl = None
165
166        self.control_table = {
167            CommandCode.CONNECTION_RESPONSE: self._on_connection_response_default,
168            CommandCode.CONFIGURATION_REQUEST: self._on_configuration_request_default,
169            CommandCode.CONFIGURATION_RESPONSE: self._on_configuration_response_default,
170            CommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default,
171            CommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default,
172            CommandCode.INFORMATION_REQUEST: self._on_information_request_default,
173            CommandCode.INFORMATION_RESPONSE: self._on_information_response_default
174        }
175
176        self.scid_to_channel = {}
177
178        self.support_ertm = True
179        self.support_fcs = True
180
181        self._control_behaviors = CertL2capControlChannelBehaviors(self)
182        self._control_behaviors.on_config_req_behavior.set_default(self._send_configuration_response_default)
183
184    def close(self):
185        self._acl_manager.close()
186        safeClose(self._acl)
187
188    def get_behaviors(self):
189        return self._control_behaviors
190
191    def connect_acl(self, remote_addr):
192        self._acl_manager.initiate_connection(remote_addr)
193        self._acl = self._acl_manager.complete_outgoing_connection()
194        self.control_channel = CertL2capChannel(
195            self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None)
196        self._acl.acl_stream.register_callback(self._handle_control_packet)
197
198    def accept_incoming_connection(self):
199        self._acl_manager.listen_for_an_incoming_connection()
200        self._acl = self._acl_manager.complete_incoming_connection()
201
202    def open_channel(self, signal_id, psm, scid, fcs=None):
203        self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid))
204
205        response = L2capCaptures.ConnectionResponse(scid)
206        assertThat(self.control_channel).emits(response)
207        channel = CertL2capChannel(self._device, scid,
208                                   response.get().GetDestinationCid(), self._acl.acl_stream, self._acl,
209                                   self.control_channel, fcs)
210        self.scid_to_channel[scid] = channel
211
212        return channel
213
214    def verify_and_respond_open_channel_from_remote(self, psm=0x33, scid=None, fcs=None):
215
216        request = L2capCaptures.ConnectionRequest(psm)
217        assertThat(self.control_channel).emits(request)
218
219        sid = request.get().GetIdentifier()
220        dcid = request.get().GetSourceCid()
221        if scid is None or scid in self.scid_to_channel:
222            scid = dcid
223        channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel, fcs)
224        self.scid_to_channel[scid] = channel
225
226        connection_response = l2cap_packets.ConnectionResponseBuilder(
227            sid, scid, dcid, l2cap_packets.ConnectionResponseResult.SUCCESS,
228            l2cap_packets.ConnectionResponseStatus.NO_FURTHER_INFORMATION_AVAILABLE)
229        self.control_channel.send(connection_response)
230
231        return channel
232
233    def verify_and_respond_open_channel_from_remote_and_send_config_req(self, psm=0x33):
234        """
235        Verify a connection request, and send a combo packet of connection response and configuration request
236        """
237        request = L2capCaptures.ConnectionRequest(psm)
238        assertThat(self.control_channel).emits(request)
239
240        sid = request.get().GetIdentifier()
241        dcid = request.get().GetSourceCid()
242        scid = dcid
243        channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel)
244        self.scid_to_channel[scid] = channel
245
246        # Connection response and config request combo packet
247        conn_rsp_and_config_req = RawBuilder([
248            0x03, sid, 0x08, 0x00, dcid, 0x00, dcid, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, sid + 1, 0x04, 0x00, dcid,
249            0x00, 0x00, 0x00
250        ])
251        self.control_channel.send(conn_rsp_and_config_req)
252
253        return channel
254
255    # prefer to use channel abstraction instead, if at all possible
256    def send_acl(self, packet):
257        self._acl.send(packet.Serialize())
258
259    def get_control_channel(self):
260        return self.control_channel
261
262    # Disable ERTM when exchange extened feature
263    def claim_ertm_unsupported(self):
264        self.support_ertm = False
265
266    def _on_connection_response_default(self, l2cap_control_view):
267        pass
268
269    def _on_configuration_request_default(self, l2cap_control_view):
270        request = l2cap_packets.ConfigurationRequestView(l2cap_control_view)
271        dcid = request.GetDestinationCid()
272        if dcid not in self.scid_to_channel:
273            logging.warning("Received config request with unknown dcid")
274            return
275        self._control_behaviors.on_config_req_behavior.run(request)
276
277    def _send_configuration_response_default(self, captured_request_view):
278        dcid = captured_request_view.GetDestinationCid()
279        if dcid not in self.scid_to_channel:
280            return
281        self.scid_to_channel[dcid].send_configuration_response(captured_request_view)
282
283    @staticmethod
284    def config_option_basic_explicit(mtu=642):
285        mtu_opt = l2cap_packets.MtuConfigurationOption()
286        mtu_opt.mtu = mtu
287        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption()
288        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
289        return [mtu_opt, rfc_opt]
290
291    @staticmethod
292    def config_option_mtu_explicit(mtu=642):
293        mtu_opt = l2cap_packets.MtuConfigurationOption()
294        mtu_opt.mtu = mtu
295        return [mtu_opt]
296
297    @staticmethod
298    def config_option_ertm(mtu=642,
299                           fcs=None,
300                           max_transmit=10,
301                           mps=1010,
302                           tx_window_size=10,
303                           monitor_time_out=2000,
304                           retransmission_time_out=1000):
305        result = []
306        mtu_opt = l2cap_packets.MtuConfigurationOption()
307        mtu_opt.mtu = mtu
308        result.append(mtu_opt)
309        if fcs is not None:
310            fcs_opt = l2cap_packets.FrameCheckSequenceOption()
311            fcs_opt.fcs_type = fcs
312            result.append(fcs_opt)
313        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption()
314        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION
315        rfc_opt.tx_window_size = tx_window_size
316        rfc_opt.max_transmit = max_transmit
317        rfc_opt.retransmission_time_out = retransmission_time_out
318        rfc_opt.monitor_time_out = monitor_time_out
319        rfc_opt.maximum_pdu_size = mps
320        result.append(rfc_opt)
321        return result
322
323    @staticmethod
324    def config_option_ertm_with_max_transmit_one():
325        return CertL2cap.config_option_ertm(max_transmit=1)
326
327    @staticmethod
328    def config_option_ertm_with_mps(mps=1010):
329        return CertL2cap.config_option_ertm(mps=mps)
330
331    def _on_configuration_response_default(self, l2cap_control_view):
332        response = l2cap_packets.ConfigurationResponseView(l2cap_control_view)
333        scid = response.GetSourceCid()
334        if scid not in self.scid_to_channel:
335            logging.warning("Received config request with unknown dcid")
336            return
337        result = response.GetResult()
338        if result == ConfigurationResponseResult.SUCCESS:
339            self.scid_to_channel[scid]._config_rsp_received = True
340
341    def _on_disconnection_request_default(self, l2cap_control_view):
342        disconnection_request = l2cap_packets.DisconnectionRequestView(l2cap_control_view)
343        sid = disconnection_request.GetIdentifier()
344        scid = disconnection_request.GetSourceCid()
345        dcid = disconnection_request.GetDestinationCid()
346        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(sid, dcid, scid)
347        self.control_channel.send(disconnection_response)
348
349    def _on_disconnection_response_default(self, l2cap_control_view):
350        pass
351
352    def _on_information_request_default(self, l2cap_control_view):
353        information_request = l2cap_packets.InformationRequestView(l2cap_control_view)
354        sid = information_request.GetIdentifier()
355        information_type = information_request.GetInfoType()
356        if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU:
357            response = l2cap_packets.InformationResponseConnectionlessMtuBuilder(
358                sid, l2cap_packets.InformationRequestResult.SUCCESS, 100)
359            self.control_channel.send(response)
360            return
361        if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
362            response = l2cap_packets.InformationResponseExtendedFeaturesBuilder(
363                sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, self.support_ertm, 0, self.support_fcs, 0,
364                0, 0, 0, 0)
365            self.control_channel.send(response)
366            return
367        if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED:
368            response = l2cap_packets.InformationResponseFixedChannelsBuilder(
369                sid, l2cap_packets.InformationRequestResult.SUCCESS, 2)
370            self.control_channel.send(response)
371            return
372
373    def _on_information_response_default(self, l2cap_control_view):
374        pass
375
376    def _handle_control_packet(self, l2cap_packet):
377        packet_bytes = l2cap_packet.payload
378        l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
379        if l2cap_view.GetChannelId() != 1:
380            return
381        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
382        fn = self.control_table.get(l2cap_control_view.GetCode())
383        if fn is not None:
384            fn(l2cap_control_view)
385