#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from google.protobuf import empty_pb2 as empty_proto
from cert.event_stream import EventStream
from cert.event_stream import FilteringEventStream
from cert.event_stream import IEventStream
from cert.closable import Closable
from cert.closable import safeClose
from cert.captures import HciCaptures
from bluetooth_packets_python3 import hci_packets
from cert.truth import assertThat
from hci.facade import hci_facade_pb2 as hci_facade
from facade import common_pb2 as common
from cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import FilterDuplicates
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode
from bluetooth_packets_python3.hci_packets import AclBuilder
from bluetooth_packets_python3 import RawBuilder


def _name_to_octets(name):
    """Return `name` as a list of integer octets for a GapData payload.

    `str` input is encoded as UTF-8; anything else is passed through
    `bytes()`, exactly as the callers did before `str` was accepted.
    """
    raw = name.encode('utf-8') if isinstance(name, str) else bytes(name)
    return list(raw)


class PyHciAclConnection(IEventStream):
    """An ACL connection identified by `handle`, sending through `device.hci`."""

    def __init__(self, handle, acl_stream, device):
        """
        handle: connection handle; stored as an int.
        acl_stream: event stream of incoming ACL data to wrap.
        device: object whose `hci.SendAcl` is used for outgoing packets.
        """
        self.handle = int(handle)
        self.device = device
        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
        # Until then the stream is wrapped with no filter (None).
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        """Build an ACL packet around `data` with the given flags and send it."""
        acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hci.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def send_first(self, data):
        """Send `data` flagged FIRST_AUTOMATICALLY_FLUSHABLE, point-to-point."""
        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))

    def send_continuing(self, data):
        """Send `data` flagged CONTINUING_FRAGMENT, point-to-point."""
        self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci_packets.BroadcastFlag.POINT_TO_POINT,
                  bytes(data))

    def get_event_queue(self):
        """Return the event queue of the wrapped ACL stream."""
        return self.our_acl_stream.get_event_queue()


class PyHciAdvertisement(object):
    """An extended advertising set identified by `handle`, driven through a PyHci."""

    def __init__(self, handle, py_hci):
        self.handle = handle
        self.py_hci = py_hci

    def set_data(self, complete_name):
        """
        Send the advertising data command with `complete_name` as the
        COMPLETE_LOCAL_NAME entry. Accepts bytes-like input or a str
        (encoded as UTF-8). Does not wait for the command to complete.
        """
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = _name_to_octets(complete_name)
        self.py_hci.send_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def set_scan_response(self, shortened_name):
        """
        Send the scan response command with `shortened_name` as the
        SHORTENED_LOCAL_NAME entry. Accepts bytes-like input or a str
        (encoded as UTF-8). Does not wait for the command to complete.
        """
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = _name_to_octets(shortened_name)
        self.py_hci.send_command(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))

    def start(self):
        """
        Enable this advertising set (duration and max event count both 0)
        and assert that the matching command complete event is emitted.
        """
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hci.send_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
        assertThat(self.py_hci.get_event_stream()).emits(
            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))


class PyHci(Closable):
    """Test helper around a device's HCI facade: event streams plus command sending."""

    # Class-level defaults so close() is safe even if __init__ did not
    # get as far as opening every stream.
    event_stream = None
    le_event_stream = None
    acl_stream = None

    def __init__(self, device, acl_streaming=False):
        """
        If you are planning on personally using the ACL data stream
        coming from HCI, specify acl_streaming=True. You probably only
        want this if you are testing HCI itself.
        """
        self.device = device
        self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty()))
        self.le_event_stream = EventStream(self.device.hci.StreamLeSubevents(empty_proto.Empty()))
        if acl_streaming:
            self.register_for_events(hci_packets.EventCode.ROLE_CHANGE, hci_packets.EventCode.CONNECTION_REQUEST,
                                     hci_packets.EventCode.CONNECTION_COMPLETE,
                                     hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
            self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))

    def close(self):
        """Close every stream this object opened (acl_stream may be None)."""
        safeClose(self.event_stream)
        safeClose(self.le_event_stream)
        safeClose(self.acl_stream)

    def get_event_stream(self):
        """Return the HCI event stream."""
        return self.event_stream

    def get_le_event_stream(self):
        """Return the LE subevent stream."""
        return self.le_event_stream

    def _require_acl_stream(self):
        """Return the ACL stream, raising if ACL streaming was not requested."""
        if self.acl_stream is None:
            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
        return self.acl_stream

    def get_raw_acl_stream(self):
        """
        Return the raw ACL stream.

        Raises Exception if this object was constructed without
        acl_streaming=True.
        """
        return self._require_acl_stream()

    def register_for_events(self, *event_codes):
        """Request delivery of each given HCI event code."""
        for event_code in event_codes:
            self.device.hci.RequestEvent(hci_facade.EventRequest(code=int(event_code)))

    def register_for_le_events(self, *event_codes):
        """Request delivery of each given LE subevent code."""
        for event_code in event_codes:
            self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code)))

    def send_command(self, command):
        """Serialize `command` and send it; does not wait for any response event."""
        self.device.hci.SendCommand(common.Data(payload=bytes(command.Serialize())))

    def enable_inquiry_and_page_scan(self):
        """Send a WriteScanEnable command with INQUIRY_AND_PAGE_SCAN."""
        self.send_command(hci_packets.WriteScanEnableBuilder(hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def read_own_address(self):
        """Send ReadBdAddr, assert its completion is emitted, and return the address."""
        self.send_command(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def initiate_connection(self, remote_addr):
        """
        Send a CreateConnection command to `remote_addr`.

        remote_addr: address as str, or as bytes (decoded as UTF-8).
        """
        self.send_command(
            hci_packets.CreateConnectionBuilder(
                remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        """
        Assert that a connection request is emitted, accept it with role
        REMAIN_PERIPHERAL, and return the resulting PyHciAclConnection.
        """
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.event_stream).emits(connection_request)

        self.send_command(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
        return self.complete_connection()

    def complete_connection(self):
        """
        Assert that a connection complete event is emitted and return a
        PyHciAclConnection for its handle.

        Raises Exception if this object was constructed without
        acl_streaming=True.
        """
        # Check the precondition first so a misconfigured PyHci fails
        # immediately instead of after blocking on the event stream.
        acl_stream = self._require_acl_stream()

        connection_complete = HciCaptures.ConnectionCompleteCapture()
        assertThat(self.event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHciAclConnection(handle, acl_stream, self.device)

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):
        """
        Send the legacy-parameters command for advertising set `handle`,
        then the random-address command with `own_address`, and return a
        PyHciAdvertisement for the set. Neither command is waited on.
        """

        self.send_command(
            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))

        self.send_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
        return PyHciAdvertisement(handle, self)