1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from google.protobuf import empty_pb2 as empty_proto 18 19from l2cap.classic import facade_pb2 as l2cap_facade_pb2 20from l2cap.classic.facade_pb2 import LinkSecurityInterfaceCallbackEventType 21from l2cap.le import facade_pb2 as l2cap_le_facade_pb2 22from l2cap.le.facade_pb2 import SecurityLevel 23from bluetooth_packets_python3 import hci_packets 24from bluetooth_packets_python3 import l2cap_packets 25from cert.event_stream import FilteringEventStream 26from cert.event_stream import EventStream, IEventStream 27from cert.closable import Closable, safeClose 28from cert.py_hci import PyHci 29from cert.matchers import HciMatchers 30from cert.matchers import L2capMatchers 31from cert.truth import assertThat 32from facade import common_pb2 as common 33 34 35class PyL2capChannel(IEventStream): 36 37 def __init__(self, device, psm, l2cap_stream): 38 self._device = device 39 self._psm = psm 40 self._le_l2cap_stream = l2cap_stream 41 self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream, 42 L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) 43 44 def get_event_queue(self): 45 return self._our_le_l2cap_view.get_event_queue() 46 47 def send(self, payload): 48 self._device.l2cap.SendDynamicChannelPacket( 49 l2cap_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload)) 50 51 def close_channel(self): 52 self._device.l2cap.CloseChannel(l2cap_facade_pb2.CloseChannelRequest(psm=self._psm)) 53 54 def set_traffic_paused(self, paused): 55 self._device.l2cap.SetTrafficPaused(l2cap_facade_pb2.SetTrafficPausedRequest(psm=self._psm, paused=paused)) 56 57 58class _ClassicConnectionResponseFutureWrapper(object): 59 """ 60 The future object returned when we send a connection request from DUT. Can be used to get connection status and 61 create the corresponding PyL2capDynamicChannel object later 62 """ 63 64 def __init__(self, grpc_response_future, device, psm, l2cap_stream): 65 self._grpc_response_future = grpc_response_future 66 self._device = device 67 self._psm = psm 68 self._l2cap_stream = l2cap_stream 69 70 def get_channel(self): 71 return PyL2capChannel(self._device, self._psm, self._l2cap_stream) 72 73 74class PyL2cap(Closable): 75 76 def __init__(self, device, cert_address, has_security=False): 77 self._device = device 78 self._cert_address = cert_address 79 self._hci = PyHci(device) 80 self._l2cap_stream = EventStream(self._device.l2cap.FetchL2capData(empty_proto.Empty())) 81 self._security_connection_event_stream = EventStream( 82 self._device.l2cap.FetchSecurityConnectionEvents(empty_proto.Empty())) 83 if has_security == False: 84 self._hci.register_for_events(hci_packets.EventCode.LINK_KEY_REQUEST) 85 86 def close(self): 87 safeClose(self._l2cap_stream) 88 safeClose(self._security_connection_event_stream) 89 safeClose(self._hci) 90 91 def register_dynamic_channel(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): 92 self._device.l2cap.SetDynamicChannel( 93 l2cap_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, retransmission_mode=mode)) 94 return PyL2capChannel(self._device, psm, self._l2cap_stream) 95 96 def connect_dynamic_channel_to_cert(self, psm=0x33, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC): 97 """ 98 Send open Dynamic channel request to CERT. 99 Get a future for connection result, to be used after CERT accepts request 100 """ 101 self.register_dynamic_channel(psm, mode) 102 response_future = self._device.l2cap.OpenChannel.future( 103 l2cap_facade_pb2.OpenChannelRequest(psm=psm, remote=self._cert_address, mode=mode)) 104 105 return _ClassicConnectionResponseFutureWrapper(response_future, self._device, psm, self._l2cap_stream) 106 107 def get_channel_queue_buffer_size(self): 108 return self._device.l2cap.GetChannelQueueDepth(empty_proto.Empty()).size 109 110 def initiate_connection_for_security(self): 111 """ 112 Establish an ACL for the specific purpose of pairing devices 113 """ 114 self._device.l2cap.InitiateConnectionForSecurity(self._cert_address) 115 116 def get_security_connection_event_stream(self): 117 """ 118 Stream of Link related events. Events are returned with an address. 119 Events map to the LinkSecurityInterfaceListener callbacks 120 """ 121 return self._security_connection_event_stream 122 123 def security_link_hold(self): 124 """ 125 Holds open the ACL indefinitely allowing for the security handshake 126 to take place 127 """ 128 self._device.l2cap.SecurityLinkHold(self._cert_address) 129 130 def security_link_ensure_authenticated(self): 131 """ 132 Triggers authentication process by sending HCI event AUTHENTICATION_REQUESTED 133 """ 134 self._device.l2cap.SecurityLinkEnsureAuthenticated(self._cert_address) 135 136 def security_link_release(self): 137 """ 138 Releases a Held open ACL allowing for the ACL to time out after the default time 139 """ 140 self._device.l2cap.SecurityLinkRelease(self._cert_address) 141 142 def security_link_disconnect(self): 143 """ 144 Immediately release and disconnect ACL 145 """ 146 self._device.l2cap.SecurityLinkDisconnect(self._cert_address) 147 148 def verify_security_connection(self): 149 """ 150 Verify that we get a connection and a link key request 151 """ 152 assertThat(self.get_security_connection_event_stream()).emits( 153 lambda event: event.event_type == LinkSecurityInterfaceCallbackEventType.ON_CONNECTED) 154 assertThat(self._hci.get_event_stream()).emits(HciMatchers.LinkKeyRequest()) 155 156 157class PyLeL2capFixedChannel(IEventStream): 158 159 def __init__(self, device, cid, l2cap_stream): 160 self._device = device 161 self._cid = cid 162 self._le_l2cap_stream = l2cap_stream 163 self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream, 164 L2capMatchers.PacketPayloadWithMatchingCid(self._cid)) 165 166 def get_event_queue(self): 167 return self._our_le_l2cap_view.get_event_queue() 168 169 def send(self, payload): 170 self._device.l2cap_le.SendFixedChannelPacket( 171 l2cap_le_facade_pb2.FixedChannelPacket(cid=self._cid, payload=payload)) 172 173 def close_channel(self): 174 self._device.l2cap_le.SetFixedChannel( 175 l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=self._cid, enable=False)) 176 177 178class PyLeL2capDynamicChannel(IEventStream): 179 180 def __init__(self, device, cert_address, psm, l2cap_stream): 181 self._device = device 182 self._cert_address = cert_address 183 self._psm = psm 184 self._le_l2cap_stream = l2cap_stream 185 self._our_le_l2cap_view = FilteringEventStream(self._le_l2cap_stream, 186 L2capMatchers.PacketPayloadWithMatchingPsm(self._psm)) 187 188 def get_event_queue(self): 189 return self._our_le_l2cap_view.get_event_queue() 190 191 def send(self, payload): 192 self._device.l2cap_le.SendDynamicChannelPacket( 193 l2cap_le_facade_pb2.DynamicChannelPacket(psm=self._psm, payload=payload)) 194 195 def close_channel(self): 196 self._device.l2cap_le.CloseDynamicChannel( 197 l2cap_le_facade_pb2.CloseDynamicChannelRequest(remote=self._cert_address, psm=self._psm)) 198 199 200class _CreditBasedConnectionResponseFutureWrapper(object): 201 """ 202 The future object returned when we send a connection request from DUT. Can be used to get connection status and 203 create the corresponding PyLeL2capDynamicChannel object later 204 """ 205 206 def __init__(self, grpc_response_future, device, cert_address, psm, le_l2cap_stream): 207 self._grpc_response_future = grpc_response_future 208 self._device = device 209 self._cert_address = cert_address 210 self._psm = psm 211 self._le_l2cap_stream = le_l2cap_stream 212 213 def get_status(self): 214 return l2cap_packets.LeCreditBasedConnectionResponseResult(self._grpc_response_future.result().status) 215 216 def get_channel(self): 217 assertThat(self.get_status()).isEqualTo(l2cap_packets.LeCreditBasedConnectionResponseResult.SUCCESS) 218 return PyLeL2capDynamicChannel(self._device, self._cert_address, self._psm, self._le_l2cap_stream) 219 220 221class PyLeL2cap(Closable): 222 223 def __init__(self, device): 224 self._device = device 225 self._le_l2cap_stream = EventStream(self._device.l2cap_le.FetchL2capData(empty_proto.Empty())) 226 227 def close(self): 228 safeClose(self._le_l2cap_stream) 229 230 def enable_fixed_channel(self, cid=4): 231 self._device.l2cap_le.SetFixedChannel(l2cap_le_facade_pb2.SetEnableFixedChannelRequest(cid=cid, enable=True)) 232 233 def get_fixed_channel(self, cid=4): 234 return PyLeL2capFixedChannel(self._device, cid, self._le_l2cap_stream) 235 236 def register_coc(self, cert_address, psm=0x33, security_level=SecurityLevel.NO_SECURITY): 237 self._device.l2cap_le.SetDynamicChannel( 238 l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm, enable=True, security_level=security_level)) 239 return PyLeL2capDynamicChannel(self._device, cert_address, psm, self._le_l2cap_stream) 240 241 def connect_coc_to_cert(self, cert_address, psm=0x33): 242 """ 243 Send open LE COC request to CERT. Get a future for connection result, to be used after CERT accepts request 244 """ 245 self.register_coc(cert_address, psm) 246 response_future = self._device.l2cap_le.OpenDynamicChannel.future( 247 l2cap_le_facade_pb2.OpenDynamicChannelRequest(psm=psm, remote=cert_address)) 248 249 return _CreditBasedConnectionResponseFutureWrapper(response_future, self._device, cert_address, psm, 250 self._le_l2cap_stream) 251 252 def update_connection_parameter(self, 253 conn_interval_min=0x10, 254 conn_interval_max=0x10, 255 conn_latency=0x0a, 256 supervision_timeout=0x64, 257 min_ce_length=12, 258 max_ce_length=12): 259 self._device.l2cap_le.SendConnectionParameterUpdate( 260 l2cap_le_facade_pb2.ConnectionParameter( 261 conn_interval_min=conn_interval_min, 262 conn_interval_max=conn_interval_max, 263 conn_latency=conn_latency, 264 supervision_timeout=supervision_timeout, 265 min_ce_length=min_ce_length, 266 max_ce_length=max_ce_length)) 267