1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from google.protobuf import empty_pb2 as empty_proto 18from blueberry.tests.gd.cert.event_stream import EventStream 19from blueberry.tests.gd.cert.event_stream import FilteringEventStream 20from blueberry.tests.gd.cert.event_stream import IEventStream 21from blueberry.tests.gd.cert.closable import Closable 22from blueberry.tests.gd.cert.closable import safeClose 23from blueberry.tests.gd.cert.captures import HciCaptures 24from blueberry.tests.gd.cert.truth import assertThat 25from blueberry.tests.gd.cert.matchers import HciMatchers 26from blueberry.facade import common_pb2 as common 27import hci_packets as hci 28from blueberry.utils import bluetooth 29 30 31class PyHalAclConnection(IEventStream): 32 33 def __init__(self, handle, acl_stream, device): 34 self.handle = int(handle) 35 self.device = device 36 self.our_acl_stream = FilteringEventStream(acl_stream, None) 37 38 def send(self, pb_flag, b_flag, data: bytes): 39 assert isinstance(data, bytes) 40 acl = hci.Acl(handle=self.handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data) 41 self.device.hal.SendAcl(common.Data(payload=acl.serialize())) 42 43 def send_first(self, data: bytes): 44 assert isinstance(data, bytes) 45 self.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT, data) 46 47 def get_event_queue(self): 48 return self.our_acl_stream.get_event_queue() 49 50 51class PyHalAdvertisement(object): 52 53 def __init__(self, handle, py_hal, is_legacy): 54 self.handle = handle 55 self.py_hal = py_hal 56 self.legacy = is_legacy 57 58 def set_data(self, complete_name): 59 advertising_data = [hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(complete_name))] 60 61 if self.legacy: 62 self.py_hal.send_hci_command(hci.LeSetAdvertisingData(advertising_data=advertising_data)) 63 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_DATA) 64 else: 65 self.py_hal.send_hci_command( 66 hci.LeSetExtendedAdvertisingData(advertising_handle=self.handle, 67 operation=hci.Operation.COMPLETE_ADVERTISEMENT, 68 fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT, 69 advertising_data=advertising_data)) 70 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_DATA) 71 72 def set_scan_response(self, shortened_name): 73 advertising_data = [hci.GapData(data_type=hci.GapDataType.SHORTENED_LOCAL_NAME, data=list(shortened_name))] 74 75 if self.legacy: 76 self.py_hal.send_hci_command(hci.LeSetScanResponseData(advertising_data=advertising_data)) 77 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_SCAN_RESPONSE_DATA) 78 else: 79 self.py_hal.send_hci_command( 80 hci.LeSetExtendedScanResponseData(advertising_handle=self.handle, 81 operation=hci.Operation.COMPLETE_ADVERTISEMENT, 82 fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT, 83 scan_response_data=advertising_data)) 84 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_SCAN_RESPONSE_DATA) 85 86 def start(self): 87 if self.legacy: 88 self.py_hal.send_hci_command(hci.LeSetAdvertisingEnable(advertising_enable=hci.Enable.ENABLED)) 89 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_ENABLE) 90 else: 91 self.py_hal.send_hci_command( 92 hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.ENABLED, 93 enabled_sets=[ 94 hci.EnabledSet(advertising_handle=self.handle, 95 duration=0, 96 max_extended_advertising_events=0) 97 ])) 98 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) 99 100 def stop(self): 101 if self.legacy: 102 self.py_hal.send_hci_command(hci.LeSetAdvertisingEnable(advertising_enable=hci.Enable.DISABLED)) 103 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_ENABLE) 104 else: 105 self.py_hal.send_hci_command( 106 hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.DISABLED, 107 enabled_sets=[ 108 hci.EnabledSet(advertising_handle=self.handle, 109 duration=0, 110 max_extended_advertising_events=0) 111 ])) 112 self.py_hal.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) 113 114 115class PyHal(Closable): 116 117 def __init__(self, device): 118 self.device = device 119 120 self.hci_event_stream = EventStream(self.device.hal.StreamEvents(empty_proto.Empty())) 121 self.acl_stream = EventStream(self.device.hal.StreamAcl(empty_proto.Empty())) 122 123 self.event_mask = 0x1FFF_FFFF_FFFF # Default Event Mask (Core Vol 4 [E] 7.3.1) 124 self.le_event_mask = 0x0000_0000_001F # Default LE Event Mask (Core Vol 4 [E] 7.8.1) 125 126 # We don't deal with SCO for now 127 128 def close(self): 129 safeClose(self.hci_event_stream) 130 safeClose(self.acl_stream) 131 132 def get_hci_event_stream(self): 133 return self.hci_event_stream 134 135 def wait_for_complete(self, opcode): 136 assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode)) 137 138 def wait_for_status(self, opcode): 139 assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode)) 140 141 def get_acl_stream(self): 142 return self.acl_stream 143 144 def send_hci_command(self, command: hci.Packet): 145 self.device.hal.SendCommand(common.Data(payload=command.serialize())) 146 147 def send_acl(self, handle, pb_flag, b_flag, data: bytes): 148 acl = hci.Acl(handle=handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data) 149 self.device.hal.SendAcl(common.Data(payload=acl.serialize())) 150 151 def send_acl_first(self, handle, data: bytes): 152 self.send_acl(handle, hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, 153 hci.BroadcastFlag.POINT_TO_POINT, data) 154 155 def read_own_address(self) -> bluetooth.Address: 156 self.send_hci_command(hci.ReadBdAddr()) 157 read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture() 158 assertThat(self.hci_event_stream).emits(read_bd_addr) 159 return read_bd_addr.get().bd_addr 160 161 def set_random_le_address(self, addr): 162 self.send_hci_command(hci.LeSetRandomAddress(random_address=bluetooth.Address(addr))) 163 self.wait_for_complete(hci.OpCode.LE_SET_RANDOM_ADDRESS) 164 165 def set_scan_parameters(self): 166 self.send_hci_command( 167 hci.LeSetExtendedScanParameters(own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 168 scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL, 169 scanning_phys=1, 170 parameters=[ 171 hci.PhyScanParameters(le_scan_type=hci.LeScanType.ACTIVE, 172 le_scan_interval=6553, 173 le_scan_window=6553) 174 ])) 175 self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS) 176 177 def unmask_event(self, *event_codes): 178 for event_code in event_codes: 179 self.event_mask |= 1 << (int(event_code) - 1) 180 self.send_hci_command(hci.SetEventMask(event_mask=self.event_mask)) 181 182 def unmask_le_event(self, *subevent_codes): 183 for subevent_code in subevent_codes: 184 self.le_event_mask |= 1 << (int(subevent_code) - 1) 185 self.send_hci_command(hci.LeSetEventMask(le_event_mask=self.le_event_mask)) 186 187 def start_scanning(self): 188 self.send_hci_command( 189 hci.LeSetExtendedScanEnable(enable=hci.Enable.ENABLED, 190 filter_duplicates=hci.FilterDuplicates.DISABLED, 191 duration=0, 192 period=0)) 193 self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_SCAN_ENABLE) 194 195 def stop_scanning(self): 196 self.send_hci_command( 197 hci.LeSetExtendedScanEnable(enable=hci.Enable.DISABLED, 198 filter_duplicates=hci.FilterDuplicates.DISABLED, 199 duration=0, 200 period=0)) 201 self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_SCAN_ENABLE) 202 203 def reset(self): 204 self.send_hci_command(hci.Reset()) 205 self.wait_for_complete(hci.OpCode.RESET) 206 207 def enable_inquiry_and_page_scan(self): 208 self.send_hci_command(hci.WriteScanEnable(scan_enable=hci.ScanEnable.INQUIRY_AND_PAGE_SCAN)) 209 210 def initiate_connection(self, remote_addr): 211 self.send_hci_command( 212 hci.CreateConnection(bd_addr=bluetooth.Address(remote_addr), 213 packet_type=0xcc18, 214 page_scan_repetition_mode=hci.PageScanRepetitionMode.R1, 215 clock_offset=0x0, 216 clock_offset_valid=hci.ClockOffsetValid.INVALID, 217 allow_role_switch=hci.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) 218 219 def accept_connection(self): 220 connection_request = HciCaptures.ConnectionRequestCapture() 221 assertThat(self.hci_event_stream).emits(connection_request) 222 223 self.send_hci_command( 224 hci.AcceptConnectionRequest(bd_addr=connection_request.get().bd_addr, 225 role=hci.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) 226 return self.complete_connection() 227 228 def complete_connection(self): 229 connection_complete = HciCaptures.ConnectionCompleteCapture() 230 assertThat(self.hci_event_stream).emits(connection_complete) 231 232 handle = connection_complete.get().connection_handle 233 return PyHalAclConnection(handle, self.acl_stream, self.device) 234 235 def initiate_le_connection(self, remote_addr): 236 self.send_hci_command( 237 hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS, 238 own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 239 peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS, 240 peer_address=bluetooth.Address(remote_addr), 241 initiating_phys=1, 242 phy_scan_parameters=[ 243 hci.LeCreateConnPhyScanParameters(scan_interval=0x60, 244 scan_window=0x30, 245 conn_interval_min=0x18, 246 conn_interval_max=0x28, 247 conn_latency=0, 248 supervision_timeout=0x1f4, 249 min_ce_length=0, 250 max_ce_length=0) 251 ])) 252 self.wait_for_status(hci.OpCode.LE_EXTENDED_CREATE_CONNECTION) 253 254 def add_to_filter_accept_list(self, remote_addr): 255 self.send_hci_command( 256 hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM, 257 address=bluetooth.Address(remote_addr))) 258 259 def initiate_le_connection_by_filter_accept_list(self, remote_addr): 260 self.send_hci_command( 261 hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST, 262 own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 263 peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS, 264 peer_address=bluetooth.Address('00:00:00:00:00:00'), 265 initiating_phys=1, 266 phy_scan_parameters=[ 267 hci.LeCreateConnPhyScanParameters(scan_interval=0x60, 268 scan_window=0x30, 269 conn_interval_min=0x18, 270 conn_interval_max=0x28, 271 conn_latency=0, 272 supervision_timeout=0x1f4, 273 min_ce_length=0, 274 max_ce_length=0) 275 ])) 276 277 def complete_le_connection(self): 278 connection_complete = HciCaptures.LeConnectionCompleteCapture() 279 assertThat(self.hci_event_stream).emits(connection_complete) 280 281 handle = connection_complete.get().connection_handle 282 return PyHalAclConnection(handle, self.acl_stream, self.device) 283 284 def create_advertisement(self, 285 handle, 286 own_address: str, 287 properties=hci.LegacyAdvertisingEventProperties.ADV_IND, 288 min_interval=400, 289 max_interval=450, 290 channel_map=7, 291 own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 292 peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 293 peer_address='00:00:00:00:00:00', 294 filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES, 295 tx_power=0xF8, 296 sid=1, 297 scan_request_notification=hci.Enable.DISABLED): 298 299 self.send_hci_command( 300 hci.LeSetExtendedAdvertisingParametersLegacy(advertising_handle=handle, 301 legacy_advertising_event_properties=properties, 302 primary_advertising_interval_min=min_interval, 303 primary_advertising_interval_max=max_interval, 304 primary_advertising_channel_map=channel_map, 305 own_address_type=own_address_type, 306 peer_address_type=peer_address_type, 307 peer_address=bluetooth.Address(peer_address), 308 advertising_filter_policy=filter_policy, 309 advertising_tx_power=tx_power, 310 advertising_sid=sid, 311 scan_request_notification_enable=scan_request_notification)) 312 self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS) 313 self.send_hci_command( 314 hci.LeSetAdvertisingSetRandomAddress(advertising_handle=handle, 315 random_address=bluetooth.Address(own_address))) 316 self.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_SET_RANDOM_ADDRESS) 317 318 return PyHalAdvertisement(handle, self, False) 319 320 def create_legacy_advertisement(self, 321 advertising_type=hci.AdvertisingType.ADV_IND, 322 min_interval=400, 323 max_interval=450, 324 channel_map=7, 325 own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS, 326 peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 327 peer_address='00:00:00:00:00:00', 328 filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES): 329 330 self.send_hci_command( 331 hci.LeSetAdvertisingParameters(advertising_interval_min=min_interval, 332 advertising_interval_max=max_interval, 333 advertising_type=advertising_type, 334 own_address_type=own_address_type, 335 peer_address_type=peer_address_type, 336 peer_address=bluetooth.Address(peer_address), 337 advertising_channel_map=channel_map, 338 advertising_filter_policy=filter_policy)) 339 self.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_PARAMETERS) 340 341 return PyHalAdvertisement(None, self, True) 342