1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from cert.behavior import IHasBehaviors, SingleArgumentBehavior, ReplyStage 20from cert.closable import Closable 21from cert.closable import safeClose 22from cert.py_acl_manager import PyAclManager 23from cert.truth import assertThat 24import bluetooth_packets_python3 as bt_packets 25from bluetooth_packets_python3 import l2cap_packets 26from bluetooth_packets_python3 import RawBuilder 27from bluetooth_packets_python3.l2cap_packets import CommandCode 28from bluetooth_packets_python3.l2cap_packets import Final 29from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly 30from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction 31from bluetooth_packets_python3.l2cap_packets import Poll 32from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType 33from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult 34from cert.event_stream import FilteringEventStream 35from cert.event_stream import IEventStream 36from cert.matchers import L2capMatchers 37from cert.captures import L2capCaptures 38 39 40class CertL2capChannel(IEventStream): 41 42 def __init__(self, device, scid, dcid, acl_stream, acl, control_channel, fcs=None): 43 self._device = device 44 self._scid = scid 45 self._dcid = dcid 46 self._acl_stream = acl_stream 47 self._acl = acl 48 self._control_channel = control_channel 49 self._config_rsp_received = False 50 self._config_rsp_sent = False 51 if fcs == l2cap_packets.FcsType.DEFAULT: 52 self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrameWithFcs(scid)) 53 else: 54 self._our_acl_view = FilteringEventStream(acl_stream, L2capMatchers.ExtractBasicFrame(scid)) 55 56 def get_event_queue(self): 57 return self._our_acl_view.get_event_queue() 58 59 def is_configured(self): 60 return self._config_rsp_received and self._config_rsp_sent 61 62 def send(self, packet): 63 frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet) 64 self._acl.send(frame.Serialize()) 65 66 def send_i_frame(self, 67 tx_seq, 68 req_seq, 69 f=Final.NOT_SET, 70 sar=SegmentationAndReassembly.UNSEGMENTED, 71 payload=None, 72 fcs=False): 73 if fcs == l2cap_packets.FcsType.DEFAULT: 74 frame = l2cap_packets.EnhancedInformationFrameWithFcsBuilder(self._dcid, tx_seq, f, req_seq, sar, payload) 75 else: 76 frame = l2cap_packets.EnhancedInformationFrameBuilder(self._dcid, tx_seq, f, req_seq, sar, payload) 77 self._acl.send(frame.Serialize()) 78 79 def send_s_frame(self, req_seq, s=SupervisoryFunction.RECEIVER_READY, p=Poll.NOT_SET, f=Final.NOT_SET): 80 frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(self._dcid, s, p, f, req_seq) 81 self._acl.send(frame.Serialize()) 82 83 def config_request_for_me(self): 84 return L2capMatchers.ConfigurationRequest(self._scid) 85 86 def send_configure_request(self, options, sid=2, continuation=l2cap_packets.Continuation.END): 87 assertThat(self._scid).isNotEqualTo(1) 88 request = l2cap_packets.ConfigurationRequestBuilder(sid, self._dcid, continuation, options) 89 self._control_channel.send(request) 90 91 def _send_information_request(self, type): 92 assertThat(self._scid).isEqualTo(1) 93 signal_id = 3 94 information_request = l2cap_packets.InformationRequestBuilder(signal_id, type) 95 self.send(information_request) 96 97 def send_extended_features_request(self): 98 self._send_information_request(InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED) 99 100 def verify_configuration_request_and_respond(self, result=ConfigurationResponseResult.SUCCESS, options=None): 101 request_capture = L2capCaptures.ConfigurationRequest(self._scid) 102 assertThat(self._control_channel).emits(request_capture) 103 request = request_capture.get() 104 sid = request.GetIdentifier() 105 if options is None: 106 options = [] 107 config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END, 108 result, options) 109 self._control_channel.send(config_response) 110 111 def send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None): 112 sid = request.GetIdentifier() 113 if options is None: 114 options = [] 115 config_response = l2cap_packets.ConfigurationResponseBuilder(sid, self._dcid, l2cap_packets.Continuation.END, 116 result, options) 117 self._control_channel.send(config_response) 118 self._config_rsp_sent = True 119 120 def verify_configuration_response(self, result=ConfigurationResponseResult.SUCCESS): 121 assertThat(self._control_channel).emits(L2capMatchers.ConfigurationResponse(result)) 122 123 def disconnect_and_verify(self): 124 assertThat(self._scid).isNotEqualTo(1) 125 self._control_channel.send(l2cap_packets.DisconnectionRequestBuilder(1, self._dcid, self._scid)) 126 127 assertThat(self._control_channel).emits(L2capMatchers.DisconnectionResponse(self._scid, self._dcid)) 128 129 def verify_disconnect_request(self): 130 assertThat(self._control_channel).emits(L2capMatchers.DisconnectionRequest(self._dcid, self._scid)) 131 132 133class CertL2capControlChannelBehaviors(object): 134 135 def __init__(self, parent): 136 self.on_config_req_behavior = SingleArgumentBehavior( 137 lambda: CertL2capControlChannelBehaviors.CertReplyStage(parent)) 138 139 def on_config_req(self, matcher): 140 return self.on_config_req_behavior.begin(matcher) 141 142 class CertReplyStage(ReplyStage): 143 144 def __init__(self, parent): 145 self.parent = parent 146 147 def send_configuration_response(self, result=ConfigurationResponseResult.SUCCESS, options=None): 148 self._commit(lambda request: self._send_configuration_response(request, result, options)) 149 return self 150 151 def _send_configuration_response(self, request, result=ConfigurationResponseResult.SUCCESS, options=None): 152 dcid = request.GetDestinationCid() 153 if dcid not in self.parent.scid_to_channel: 154 logging.warning("Received config request with unknown dcid") 155 return 156 self.parent.scid_to_channel[dcid].send_configuration_response(request, result, options) 157 158 159class CertL2cap(Closable, IHasBehaviors): 160 161 def __init__(self, device): 162 self._device = device 163 self._acl_manager = PyAclManager(device) 164 self._acl = None 165 166 self.control_table = { 167 CommandCode.CONNECTION_RESPONSE: self._on_connection_response_default, 168 CommandCode.CONFIGURATION_REQUEST: self._on_configuration_request_default, 169 CommandCode.CONFIGURATION_RESPONSE: self._on_configuration_response_default, 170 CommandCode.DISCONNECTION_REQUEST: self._on_disconnection_request_default, 171 CommandCode.DISCONNECTION_RESPONSE: self._on_disconnection_response_default, 172 CommandCode.INFORMATION_REQUEST: self._on_information_request_default, 173 CommandCode.INFORMATION_RESPONSE: self._on_information_response_default 174 } 175 176 self.scid_to_channel = {} 177 178 self.support_ertm = True 179 self.support_fcs = True 180 181 self._control_behaviors = CertL2capControlChannelBehaviors(self) 182 self._control_behaviors.on_config_req_behavior.set_default(self._send_configuration_response_default) 183 184 def close(self): 185 self._acl_manager.close() 186 safeClose(self._acl) 187 188 def get_behaviors(self): 189 return self._control_behaviors 190 191 def connect_acl(self, remote_addr): 192 self._acl_manager.initiate_connection(remote_addr) 193 self._acl = self._acl_manager.complete_outgoing_connection() 194 self.control_channel = CertL2capChannel( 195 self._device, 1, 1, self._acl.acl_stream, self._acl, control_channel=None) 196 self._acl.acl_stream.register_callback(self._handle_control_packet) 197 198 def accept_incoming_connection(self): 199 self._acl_manager.listen_for_an_incoming_connection() 200 self._acl = self._acl_manager.complete_incoming_connection() 201 202 def open_channel(self, signal_id, psm, scid, fcs=None): 203 self.control_channel.send(l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid)) 204 205 response = L2capCaptures.ConnectionResponse(scid) 206 assertThat(self.control_channel).emits(response) 207 channel = CertL2capChannel(self._device, scid, 208 response.get().GetDestinationCid(), self._acl.acl_stream, self._acl, 209 self.control_channel, fcs) 210 self.scid_to_channel[scid] = channel 211 212 return channel 213 214 def verify_and_respond_open_channel_from_remote(self, psm=0x33, scid=None, fcs=None): 215 216 request = L2capCaptures.ConnectionRequest(psm) 217 assertThat(self.control_channel).emits(request) 218 219 sid = request.get().GetIdentifier() 220 dcid = request.get().GetSourceCid() 221 if scid is None or scid in self.scid_to_channel: 222 scid = dcid 223 channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel, fcs) 224 self.scid_to_channel[scid] = channel 225 226 connection_response = l2cap_packets.ConnectionResponseBuilder( 227 sid, scid, dcid, l2cap_packets.ConnectionResponseResult.SUCCESS, 228 l2cap_packets.ConnectionResponseStatus.NO_FURTHER_INFORMATION_AVAILABLE) 229 self.control_channel.send(connection_response) 230 231 return channel 232 233 def verify_and_respond_open_channel_from_remote_and_send_config_req(self, psm=0x33): 234 """ 235 Verify a connection request, and send a combo packet of connection response and configuration request 236 """ 237 request = L2capCaptures.ConnectionRequest(psm) 238 assertThat(self.control_channel).emits(request) 239 240 sid = request.get().GetIdentifier() 241 dcid = request.get().GetSourceCid() 242 scid = dcid 243 channel = CertL2capChannel(self._device, scid, dcid, self._acl.acl_stream, self._acl, self.control_channel) 244 self.scid_to_channel[scid] = channel 245 246 # Connection response and config request combo packet 247 conn_rsp_and_config_req = RawBuilder([ 248 0x03, sid, 0x08, 0x00, dcid, 0x00, dcid, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, sid + 1, 0x04, 0x00, dcid, 249 0x00, 0x00, 0x00 250 ]) 251 self.control_channel.send(conn_rsp_and_config_req) 252 253 return channel 254 255 # prefer to use channel abstraction instead, if at all possible 256 def send_acl(self, packet): 257 self._acl.send(packet.Serialize()) 258 259 def get_control_channel(self): 260 return self.control_channel 261 262 # Disable ERTM when exchange extened feature 263 def claim_ertm_unsupported(self): 264 self.support_ertm = False 265 266 def _on_connection_response_default(self, l2cap_control_view): 267 pass 268 269 def _on_configuration_request_default(self, l2cap_control_view): 270 request = l2cap_packets.ConfigurationRequestView(l2cap_control_view) 271 dcid = request.GetDestinationCid() 272 if dcid not in self.scid_to_channel: 273 logging.warning("Received config request with unknown dcid") 274 return 275 self._control_behaviors.on_config_req_behavior.run(request) 276 277 def _send_configuration_response_default(self, captured_request_view): 278 dcid = captured_request_view.GetDestinationCid() 279 if dcid not in self.scid_to_channel: 280 return 281 self.scid_to_channel[dcid].send_configuration_response(captured_request_view) 282 283 @staticmethod 284 def config_option_basic_explicit(mtu=642): 285 mtu_opt = l2cap_packets.MtuConfigurationOption() 286 mtu_opt.mtu = mtu 287 rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption() 288 rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC 289 return [mtu_opt, rfc_opt] 290 291 @staticmethod 292 def config_option_mtu_explicit(mtu=642): 293 mtu_opt = l2cap_packets.MtuConfigurationOption() 294 mtu_opt.mtu = mtu 295 return [mtu_opt] 296 297 @staticmethod 298 def config_option_ertm(mtu=642, 299 fcs=None, 300 max_transmit=10, 301 mps=1010, 302 tx_window_size=10, 303 monitor_time_out=2000, 304 retransmission_time_out=1000): 305 result = [] 306 mtu_opt = l2cap_packets.MtuConfigurationOption() 307 mtu_opt.mtu = mtu 308 result.append(mtu_opt) 309 if fcs is not None: 310 fcs_opt = l2cap_packets.FrameCheckSequenceOption() 311 fcs_opt.fcs_type = fcs 312 result.append(fcs_opt) 313 rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption() 314 rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.ENHANCED_RETRANSMISSION 315 rfc_opt.tx_window_size = tx_window_size 316 rfc_opt.max_transmit = max_transmit 317 rfc_opt.retransmission_time_out = retransmission_time_out 318 rfc_opt.monitor_time_out = monitor_time_out 319 rfc_opt.maximum_pdu_size = mps 320 result.append(rfc_opt) 321 return result 322 323 @staticmethod 324 def config_option_ertm_with_max_transmit_one(): 325 return CertL2cap.config_option_ertm(max_transmit=1) 326 327 @staticmethod 328 def config_option_ertm_with_mps(mps=1010): 329 return CertL2cap.config_option_ertm(mps=mps) 330 331 def _on_configuration_response_default(self, l2cap_control_view): 332 response = l2cap_packets.ConfigurationResponseView(l2cap_control_view) 333 scid = response.GetSourceCid() 334 if scid not in self.scid_to_channel: 335 logging.warning("Received config request with unknown dcid") 336 return 337 result = response.GetResult() 338 if result == ConfigurationResponseResult.SUCCESS: 339 self.scid_to_channel[scid]._config_rsp_received = True 340 341 def _on_disconnection_request_default(self, l2cap_control_view): 342 disconnection_request = l2cap_packets.DisconnectionRequestView(l2cap_control_view) 343 sid = disconnection_request.GetIdentifier() 344 scid = disconnection_request.GetSourceCid() 345 dcid = disconnection_request.GetDestinationCid() 346 disconnection_response = l2cap_packets.DisconnectionResponseBuilder(sid, dcid, scid) 347 self.control_channel.send(disconnection_response) 348 349 def _on_disconnection_response_default(self, l2cap_control_view): 350 pass 351 352 def _on_information_request_default(self, l2cap_control_view): 353 information_request = l2cap_packets.InformationRequestView(l2cap_control_view) 354 sid = information_request.GetIdentifier() 355 information_type = information_request.GetInfoType() 356 if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU: 357 response = l2cap_packets.InformationResponseConnectionlessMtuBuilder( 358 sid, l2cap_packets.InformationRequestResult.SUCCESS, 100) 359 self.control_channel.send(response) 360 return 361 if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED: 362 response = l2cap_packets.InformationResponseExtendedFeaturesBuilder( 363 sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, self.support_ertm, 0, self.support_fcs, 0, 364 0, 0, 0, 0) 365 self.control_channel.send(response) 366 return 367 if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED: 368 response = l2cap_packets.InformationResponseFixedChannelsBuilder( 369 sid, l2cap_packets.InformationRequestResult.SUCCESS, 2) 370 self.control_channel.send(response) 371 return 372 373 def _on_information_response_default(self, l2cap_control_view): 374 pass 375 376 def _handle_control_packet(self, l2cap_packet): 377 packet_bytes = l2cap_packet.payload 378 l2cap_view = l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet_bytes))) 379 if l2cap_view.GetChannelId() != 1: 380 return 381 l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) 382 fn = self.control_table.get(l2cap_control_view.GetCode()) 383 if fn is not None: 384 fn(l2cap_control_view) 385