1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16from datetime import timedelta 17 18from google.protobuf import empty_pb2 as empty_proto 19from blueberry.tests.gd.cert.event_stream import EventStream 20from blueberry.tests.gd.cert.event_stream import FilteringEventStream 21from blueberry.tests.gd.cert.event_stream import IEventStream 22from blueberry.tests.gd.cert.closable import Closable 23from blueberry.tests.gd.cert.closable import safeClose 24from blueberry.tests.gd.cert.captures import HciCaptures 25from blueberry.tests.gd.cert.truth import assertThat 26from blueberry.facade.hci import hci_facade_pb2 as hci_facade 27from blueberry.facade import common_pb2 as common 28from blueberry.tests.gd.cert.matchers import HciMatchers 29import hci_packets as hci 30import blueberry.utils.bluetooth as bluetooth 31 32 33class PyHciAclConnection(IEventStream): 34 35 def __init__(self, handle, acl_stream, device): 36 self.handle = int(handle) 37 self.device = device 38 # todo, handle we got is 0, so doesn't match - fix before enabling filtering 39 self.our_acl_stream = FilteringEventStream(acl_stream, None) 40 41 def send(self, pb_flag, b_flag, data: bytes): 42 assert isinstance(data, bytes) 43 acl = hci.Acl(handle=self.handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data) 44 self.device.hci.SendAcl(common.Data(payload=acl.serialize())) 45 46 def send_first(self, data: bytes): 47 self.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT, data) 48 49 def send_continuing(self, data): 50 self.send(hci.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci.BroadcastFlag.POINT_TO_POINT, data) 51 52 def get_event_queue(self): 53 return self.our_acl_stream.get_event_queue() 54 55 56class PyHciLeAclConnection(IEventStream): 57 58 def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable): 59 self.handle = int(handle) 60 self.device = device 61 self.peer = peer 62 self.peer_type = peer_type 63 self.peer_resolvable = peer_resolvable 64 self.local_resolvable = local_resolvable 65 # todo, handle we got is 0, so doesn't match - fix before enabling filtering 66 self.our_acl_stream = FilteringEventStream(acl_stream, None) 67 68 def send(self, pb_flag, b_flag, data: bytes): 69 assert isinstance(data, bytes) 70 acl = hci.Acl(handle=self.handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data) 71 self.device.hci.SendAcl(common.Data(payload=acl.serialize())) 72 73 def send_first(self, data: bytes): 74 self.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT, data) 75 76 def send_continuing(self, data: bytes): 77 self.send(hci.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci.BroadcastFlag.POINT_TO_POINT, data) 78 79 def get_event_queue(self): 80 return self.our_acl_stream.get_event_queue() 81 82 def local_resolvable_address(self): 83 return self.local_resolvable 84 85 def peer_resolvable_address(self): 86 return self.peer_resolvable 87 88 def peer_address(self): 89 return self.peer 90 91 92class PyHciAdvertisement(object): 93 94 def __init__(self, handle, py_hci): 95 self.handle = handle 96 self.py_hci = py_hci 97 98 def set_data(self, complete_name): 99 self.py_hci.send_command( 100 hci.LeSetExtendedAdvertisingData( 101 advertising_handle=self.handle, 102 operation=hci.Operation.COMPLETE_ADVERTISEMENT, 103 fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT, 104 advertising_data=[hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, 105 data=list(complete_name))])) 106 107 def set_scan_response(self, shortened_name): 108 self.py_hci.send_command( 109 hci.LeSetExtendedScanResponseData(advertising_handle=self.handle, 110 operation=hci.Operation.COMPLETE_ADVERTISEMENT, 111 fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT, 112 scan_response_data=[ 113 hci.GapData(data_type=hci.GapDataType.SHORTENED_LOCAL_NAME, 114 data=list(shortened_name)) 115 ])) 116 117 def start(self): 118 self.py_hci.send_command( 119 hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.ENABLED, 120 enabled_sets=[ 121 hci.EnabledSet(advertising_handle=self.handle, 122 duration=0, 123 max_extended_advertising_events=0) 124 ])) 125 assertThat(self.py_hci.get_event_stream()).emits( 126 HciMatchers.CommandComplete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)) 127 128 129class PyHci(Closable): 130 131 event_stream = None 132 le_event_stream = None 133 acl_stream = None 134 135 def __init__(self, device, acl_streaming=False): 136 """ 137 If you are planning on personally using the ACL data stream 138 coming from HCI, specify acl_streaming=True. You probably only 139 want this if you are testing HCI itself. 140 """ 141 self.device = device 142 self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty())) 143 self.le_event_stream = EventStream(self.device.hci.StreamLeSubevents(empty_proto.Empty())) 144 if acl_streaming: 145 self.register_for_events(hci.EventCode.ROLE_CHANGE, hci.EventCode.CONNECTION_REQUEST, 146 hci.EventCode.CONNECTION_COMPLETE, hci.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 147 self.register_for_le_events(hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE) 148 self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty())) 149 150 def close(self): 151 safeClose(self.event_stream) 152 safeClose(self.le_event_stream) 153 safeClose(self.acl_stream) 154 155 def get_event_stream(self): 156 return self.event_stream 157 158 def get_le_event_stream(self): 159 return self.le_event_stream 160 161 def get_raw_acl_stream(self): 162 if self.acl_stream is None: 163 raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) 164 return self.acl_stream 165 166 def register_for_events(self, *event_codes): 167 for event_code in event_codes: 168 self.device.hci.RequestEvent(hci_facade.EventRequest(code=int(event_code))) 169 170 def register_for_le_events(self, *event_codes): 171 for event_code in event_codes: 172 self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code))) 173 174 def send_command(self, command: hci.Packet): 175 self.device.hci.SendCommand(common.Data(payload=command.serialize())) 176 177 def enable_inquiry_and_page_scan(self): 178 self.send_command(hci.WriteScanEnable(scan_enable=hci.ScanEnable.INQUIRY_AND_PAGE_SCAN)) 179 180 def read_own_address(self) -> bluetooth.Address: 181 self.send_command(hci.ReadBdAddr()) 182 read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture() 183 assertThat(self.event_stream).emits(read_bd_addr) 184 return read_bd_addr.get().bd_addr 185 186 def initiate_connection(self, remote_addr): 187 self.send_command( 188 hci.CreateConnection(bd_addr=bluetooth.Address(remote_addr), 189 packet_type=0xcc18, 190 page_scan_repetition_mode=hci.PageScanRepetitionMode.R1, 191 clock_offset=0x0, 192 clock_offset_valid=hci.ClockOffsetValid.INVALID, 193 allow_role_switch=hci.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) 194 195 def accept_connection(self): 196 connection_request = HciCaptures.ConnectionRequestCapture() 197 assertThat(self.event_stream).emits(connection_request) 198 199 self.send_command( 200 hci.AcceptConnectionRequest(bd_addr=bluetooth.Address(connection_request.get().bd_addr), 201 role=hci.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) 202 return self.complete_connection() 203 204 def complete_connection(self): 205 connection_complete = HciCaptures.ConnectionCompleteCapture() 206 assertThat(self.event_stream).emits(connection_complete) 207 208 handle = connection_complete.get().connection_handle 209 if self.acl_stream is None: 210 raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) 211 return PyHciAclConnection(handle, self.acl_stream, self.device) 212 213 def set_random_le_address(self, addr): 214 self.send_command(hci.LeSetRandomAddress(random_address=bluetooth.Address(addr))) 215 assertThat(self.event_stream).emits(HciMatchers.CommandComplete(hci.OpCode.LE_SET_RANDOM_ADDRESS)) 216 217 def initiate_le_connection(self, remote_addr): 218 self.send_command( 219 hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS, 220 own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 221 peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS, 222 peer_address=bluetooth.Address(remote_addr), 223 initiating_phys=1, 224 phy_scan_parameters=[ 225 hci.LeCreateConnPhyScanParameters(scan_interval=0x60, 226 scan_window=0x30, 227 conn_interval_min=0x18, 228 conn_interval_max=0x28, 229 conn_latency=0, 230 supervision_timeout=0x1f4, 231 min_ce_length=0, 232 max_ce_length=0) 233 ])) 234 assertThat(self.event_stream).emits(HciMatchers.CommandStatus(hci.OpCode.LE_EXTENDED_CREATE_CONNECTION)) 235 236 def incoming_le_connection(self): 237 connection_complete = HciCaptures.LeConnectionCompleteCapture() 238 assertThat(self.le_event_stream).emits(connection_complete) 239 240 handle = connection_complete.get().connection_handle 241 peer = connection_complete.get().peer_address 242 peer_type = connection_complete.get().peer_address_type 243 local_resolvable = connection_complete.get().local_resolvable_private_address 244 peer_resolvable = connection_complete.get().peer_resolvable_private_address 245 if self.acl_stream is None: 246 raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__) 247 return PyHciLeAclConnection(handle, self.acl_stream, self.device, repr(peer), peer_type, repr(peer_resolvable), 248 repr(local_resolvable)) 249 250 def incoming_le_connection_fails(self): 251 connection_complete = HciCaptures.LeConnectionCompleteCapture() 252 assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5)) 253 254 def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk): 255 self.send_command( 256 hci.LeAddDeviceToResolvingList(peer_identity_address_type=peer_address_type, 257 peer_identity_address=bluetooth.Address(peer_address), 258 peer_irk=peer_irk, 259 local_irk=local_irk)) 260 261 def create_advertisement(self, 262 handle, 263 own_address: str, 264 properties=hci.LegacyAdvertisingEventProperties.ADV_IND, 265 min_interval=400, 266 max_interval=450, 267 channel_map=7, 268 own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 269 peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 270 peer_address='00:00:00:00:00:00', 271 filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES, 272 tx_power=0xF8, 273 sid=1, 274 scan_request_notification=hci.Enable.DISABLED): 275 276 self.send_command( 277 hci.LeSetExtendedAdvertisingParametersLegacy(advertising_handle=handle, 278 legacy_advertising_event_properties=properties, 279 primary_advertising_interval_min=min_interval, 280 primary_advertising_interval_max=max_interval, 281 primary_advertising_channel_map=channel_map, 282 own_address_type=own_address_type, 283 peer_address_type=peer_address_type, 284 peer_address=bluetooth.Address(peer_address), 285 advertising_filter_policy=filter_policy, 286 advertising_tx_power=tx_power, 287 advertising_sid=sid, 288 scan_request_notification_enable=scan_request_notification)) 289 290 self.send_command( 291 hci.LeSetAdvertisingSetRandomAddress(advertising_handle=handle, 292 random_address=bluetooth.Address(own_address))) 293 294 return PyHciAdvertisement(handle, self) 295