#!/usr/bin/env python3

# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dump the configuration of a Bluetooth controller.

The script expects to find the generated module hci_packets
in PYTHONPATH. The controller is expected to be available through
HCI over TCP at the port passed as parameter."""

import argparse
import asyncio
import collections
import sys

import hci_packets as hci

# H4 packet indicators: the first byte of every packet on the transport.
H4_IDC_CMD = 0x01
H4_IDC_ACL = 0x02
H4_IDC_SCO = 0x03
H4_IDC_EVT = 0x04
H4_IDC_ISO = 0x05

# Number of header bytes following the packet indicator, per packet kind.
HCI_HEADER_SIZES = {
    H4_IDC_CMD: 3,
    H4_IDC_ACL: 4,
    H4_IDC_SCO: 3,
    H4_IDC_EVT: 2,
    H4_IDC_ISO: 4,
}


class Host:
    """Minimal HCI host exchanging commands and events over a TCP stream.

    Received events are parsed by a background reader task and queued;
    `recv_evt` / `expect_evt` pop them in arrival order.
    """

    def __init__(self):
        self.evt_queue = collections.deque()
        self.evt_queue_event = asyncio.Event()
        # Populated by connect(): the reader *task* and the stream writer.
        self.reader = None
        self.writer = None
        # Set once the reader task has terminated (EOF, error or cancel) so
        # that recv_evt() can fail instead of waiting forever.
        self._reader_done = False

    async def connect(self, ip: str, port: int):
        """Open the TCP connection and start the background reader task."""
        reader, writer = await asyncio.open_connection(ip, port)
        self.reader = asyncio.create_task(self._read(reader))
        self.writer = writer

    async def close(self):
        """Stop the reader task and close the connection, if any."""
        if self.reader is not None:
            self.reader.cancel()
        if self.writer is not None:
            self.writer.close()
            try:
                await self.writer.wait_closed()
            except OSError:
                # The peer may already have dropped the connection; there is
                # nothing left to release in that case.
                pass

    async def _read(self, reader):
        """Read packets from the stream and queue every received event.

        Only event packets are supported; any other packet indicator stops
        the reader. Termination is always signalled to pending receivers.
        """
        try:
            while True:
                idc = (await reader.readexactly(1))[0]
                if idc not in HCI_HEADER_SIZES:
                    raise ValueError(f"unknown H4 packet indicator 0x{idc:02x}")
                header = await reader.readexactly(HCI_HEADER_SIZES[idc])
                if idc != H4_IDC_EVT:
                    raise ValueError(f"unsupported H4 packet indicator 0x{idc:02x}")
                # The second header byte gives the number of bytes that
                # remain to be read for this event.
                payload = await reader.readexactly(header[1])
                evt = hci.Event.parse_all(header + payload)
                #print(f"<< {evt.__class__.__name__}")
                self.evt_queue.append(evt)
                self.evt_queue_event.set()
        except Exception as exn:
            print(f"Reader interrupted: {exn}")
        finally:
            # Wake up any pending recv_evt() so it can report the failure.
            self._reader_done = True
            self.evt_queue_event.set()

    async def send_cmd(self, cmd: hci.Command):
        """Serialize `cmd` and send it prefixed with the command indicator."""
        #print(f">> {cmd.__class__.__name__}")
        packet = bytes([H4_IDC_CMD]) + cmd.serialize()
        self.writer.write(packet)
        # Let the transport flush so connection errors surface here.
        await self.writer.drain()

    async def recv_evt(self) -> hci.Event:
        """Return the oldest queued event, waiting for one if necessary.

        Raises:
            ConnectionError: the reader task has stopped and no event is
                left in the queue.
        """
        while not self.evt_queue:
            if self._reader_done:
                raise ConnectionError("HCI connection closed while waiting for an event")
            await self.evt_queue_event.wait()
            self.evt_queue_event.clear()
        return self.evt_queue.popleft()

    async def expect_evt(self, expected_evt: type) -> hci.Event:
        """Receive the next event and check that it has the expected type.

        Raises:
            AssertionError: the received event is not an instance of
                `expected_evt`; diagnostics are printed beforehand.
        """
        evt = await self.recv_evt()
        if not isinstance(evt, expected_evt):
            print("Received unexpected event:")
            evt.show()
            print(f"Expected event of type {expected_evt.__name__}")
            print(f"{list(evt.payload)}")
            # Raised explicitly (rather than `assert`) so the check is kept
            # when running with -O and the diagnostics above are reachable.
            raise AssertionError(
                f"expected {expected_evt.__name__}, received {evt.__class__.__name__}")
        return evt


async def br_edr_properties(host: Host):
    """Query and print the BR/EDR related controller properties."""
    await host.send_cmd(hci.ReadLocalSupportedFeatures())
    page0 = await host.expect_evt(hci.ReadLocalSupportedFeaturesComplete)

    await host.send_cmd(hci.ReadLocalExtendedFeatures(page_number=1))
    page1 = await host.expect_evt(hci.ReadLocalExtendedFeaturesComplete)

    await host.send_cmd(hci.ReadLocalExtendedFeatures(page_number=2))
    page2 = await host.expect_evt(hci.ReadLocalExtendedFeaturesComplete)

    print(
        f"lmp_features: {{ 0x{page0.lmp_features:x}, 0x{page1.extended_lmp_features:x}, 0x{page2.extended_lmp_features:x} }}"
    )

    await host.send_cmd(hci.ReadBufferSize())
    evt = await host.expect_evt(hci.ReadBufferSizeComplete)

    print(f"acl_data_packet_length: {evt.acl_data_packet_length}")
    print(f"total_num_acl_data_packets: {evt.total_num_acl_data_packets}")
    print(f"sco_data_packet_length: {evt.synchronous_data_packet_length}")
    print(f"total_num_sco_data_packets: {evt.total_num_synchronous_data_packets}")

    await host.send_cmd(hci.ReadNumberOfSupportedIac())
    evt = await host.expect_evt(hci.ReadNumberOfSupportedIacComplete)

    print(f"num_supported_iac: {evt.num_support_iac}")


async def le_properties(host: Host):
    """Query and print the LE related controller properties."""
    await host.send_cmd(hci.LeReadLocalSupportedFeatures())
    evt = await host.expect_evt(hci.LeReadLocalSupportedFeaturesComplete)

    print(f"le_features: 0x{evt.le_features:x}")

    await host.send_cmd(hci.LeReadBufferSizeV2())
    evt = await host.expect_evt(hci.LeReadBufferSizeV2Complete)

    print(f"le_acl_data_packet_length: {evt.le_buffer_size.le_data_packet_length}")
    print(f"total_num_le_acl_data_packets: {evt.le_buffer_size.total_num_le_packets}")
    print(f"iso_data_packet_length: {evt.iso_buffer_size.le_data_packet_length}")
    print(f"total_num_iso_data_packets: {evt.iso_buffer_size.total_num_le_packets}")

    await host.send_cmd(hci.LeReadFilterAcceptListSize())
    evt = await host.expect_evt(hci.LeReadFilterAcceptListSizeComplete)

    print(f"le_filter_accept_list_size: {evt.filter_accept_list_size}")

    await host.send_cmd(hci.LeReadResolvingListSize())
    evt = await host.expect_evt(hci.LeReadResolvingListSizeComplete)

    print(f"le_resolving_list_size: {evt.resolving_list_size}")

    await host.send_cmd(hci.LeReadSupportedStates())
    evt = await host.expect_evt(hci.LeReadSupportedStatesComplete)

    print(f"le_supported_states: 0x{evt.le_states:x}")

    await host.send_cmd(hci.LeReadMaximumAdvertisingDataLength())
    evt = await host.expect_evt(hci.LeReadMaximumAdvertisingDataLengthComplete)

    print(f"le_max_advertising_data_length: {evt.maximum_advertising_data_length}")

    await host.send_cmd(hci.LeReadNumberOfSupportedAdvertisingSets())
    evt = await host.expect_evt(hci.LeReadNumberOfSupportedAdvertisingSetsComplete)

    print(f"le_num_supported_advertising_sets: {evt.number_supported_advertising_sets}")

    await host.send_cmd(hci.LeReadPeriodicAdvertiserListSize())
    evt = await host.expect_evt(hci.LeReadPeriodicAdvertiserListSizeComplete)

    print(f"le_periodic_advertiser_list_size: {evt.periodic_advertiser_list_size}")


async def run(tcp_port: int):
    """Connect to the controller on localhost and print its configuration.

    The version and supported-commands queries are mandatory; the BR/EDR
    and LE sections are best-effort and are skipped (with a note on stderr)
    when they fail.
    """
    host = Host()
    await host.connect('127.0.0.1', tcp_port)

    try:
        await host.send_cmd(hci.Reset())
        await host.expect_evt(hci.ResetComplete)

        await host.send_cmd(hci.ReadLocalVersionInformation())
        evt = await host.expect_evt(hci.ReadLocalVersionInformationComplete)

        print(f"hci_version: {evt.local_version_information.hci_version}")
        print(f"hci_subversion: 0x{evt.local_version_information.hci_revision:x}")
        print(f"lmp_version: {evt.local_version_information.lmp_version}")
        print(f"lmp_subversion: 0x{evt.local_version_information.lmp_subversion:x}")
        print(f"company_identifier: 0x{evt.local_version_information.manufacturer_name:x}")

        await host.send_cmd(hci.ReadLocalSupportedCommands())
        evt = await host.expect_evt(hci.ReadLocalSupportedCommandsComplete)

        print(f"supported_commands: {{ {', '.join([f'0x{b:x}' for b in evt.supported_commands])} }}")

        for section in (br_edr_properties, le_properties):
            try:
                await section(host)
            except Exception as exn:
                # Best-effort: keep going, but report on stderr so the dump
                # on stdout stays clean and the failure is not hidden.
                print(f"{section.__name__} interrupted: {exn!r}", file=sys.stderr)
    finally:
        await host.close()


def main() -> int:
    """Parse the command line, dump the controller configuration, return 0."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('tcp_port', type=int, help='HCI port')
    asyncio.run(run(**vars(parser.parse_args())))
    return 0


if __name__ == '__main__':
    sys.exit(main())