#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Matcher factories used to filter HCI, advertising, scanning, neighbor,
security and ISO messages.

Most public methods return a one-argument callable (a predicate) that is
applied to an incoming message; the ``Extract*`` methods take raw packet bytes
and return the parsed event, or ``None`` when the packet does not match.
"""

import logging
import sys

from blueberry.utils import bluetooth
import hci_packets as hci


class HciMatchers(object):
    """Predicates and extractors for serialized HCI events.

    Factory methods (``CommandComplete``, ``EventWithCode``, ...) return a
    callable that reads the raw bytes from ``msg.payload``. ``Extract*``
    methods take the raw bytes directly.
    """

    @staticmethod
    def CommandComplete(opcode):
        """Return a predicate matching a Command Complete event for `opcode`."""
        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
        """Parse `packet_bytes`; return the Command Complete event or None.

        When `opcode` is None any Command Complete event is accepted.
        """
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_complete(packet_bytes, opcode=None):
        """Return True when `packet_bytes` is a matching Command Complete."""
        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_complete(packet_bytes, opcode=None):
        """Return the parsed hci.CommandComplete, or None if it does not match."""
        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_COMPLETE)
        if not isinstance(event, hci.CommandComplete):
            return None
        # Compare against None explicitly: an opcode whose numeric value is 0
        # is falsy and must still be used as a filter, not treated as "any".
        if opcode is not None and event.command_op_code != opcode:
            return None
        return event

    @staticmethod
    def CommandStatus(opcode=None):
        """Return a predicate matching a Command Status event.

        When `opcode` is None any Command Status event is accepted.
        """
        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)

    @staticmethod
    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
        """Parse `packet_bytes`; return the Command Status event or None."""
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode)

    @staticmethod
    def _is_matching_command_status(packet_bytes, opcode=None):
        """Return True when `packet_bytes` is a matching Command Status."""
        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None

    @staticmethod
    def _extract_matching_command_status(packet_bytes, opcode=None):
        """Return the parsed hci.CommandStatus, or None if it does not match."""
        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_STATUS)
        if not isinstance(event, hci.CommandStatus):
            return None
        # See _extract_matching_command_complete: a zero-valued opcode is a
        # real filter, only None means "match any opcode".
        if opcode is not None and event.command_op_code != opcode:
            return None
        return event

    @staticmethod
    def EventWithCode(event_code):
        """Return a predicate matching any HCI event carrying `event_code`."""
        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)

    @staticmethod
    def ExtractEventWithCode(packet_bytes, event_code):
        """Parse `packet_bytes`; return the event if its code is `event_code`, else None."""
        return HciMatchers._extract_matching_event(packet_bytes, event_code)

    @staticmethod
    def _is_matching_event(packet_bytes, event_code):
        """Return True when `packet_bytes` parses to an event with `event_code`."""
        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None

    @staticmethod
    def _extract_matching_event(packet_bytes, event_code):
        """Parse `packet_bytes` with hci.Event.parse_all and filter on the event code.

        Returns:
            The parsed event when its ``event_code`` equals `event_code`;
            None when the code differs or when parsing raises.
        """
        try:
            event = hci.Event.parse_all(packet_bytes)
            return event if event.event_code == event_code else None
        except Exception as exn:
            # Best-effort matcher: a malformed packet is reported on stderr and
            # treated as "no match" instead of aborting the caller.
            hex_dump = ' '.join(f'{b:02x}' for b in packet_bytes)
            print(f"Failed to parse incoming event: {exn}", file=sys.stderr)
            print(f"Event data: {hex_dump}", file=sys.stderr)
            return None

    @staticmethod
    def LeEventWithCode(subevent_code):
        """Return a predicate matching an LE Meta event with `subevent_code`."""
        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None

    @staticmethod
    def ExtractLeEventWithCode(packet_bytes, subevent_code):
        """Parse `packet_bytes`; return the LE Meta event with `subevent_code`, else None."""
        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)

    @staticmethod
    def _extract_matching_le_event(packet_bytes, subevent_code):
        """Return the parsed hci.LeMetaEvent with `subevent_code`, or None."""
        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.LE_META_EVENT)
        if not isinstance(event, hci.LeMetaEvent) or event.subevent_code != subevent_code:
            return None
        return event

    @staticmethod
    def LeAdvertisement(subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None):
        """Return a predicate matching an LE advertising report.

        Args:
            subevent_code: LE subevent code the report must carry.
            address: when not None, at least one response must have this address.
            data: when not None, must be contained in the raw packet bytes.
        """
        return lambda msg: HciMatchers._extract_matching_le_advertisement(
            msg.payload, subevent_code, address, data) is not None

    @staticmethod
    def ExtractLeAdvertisement(packet_bytes,
                               subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
                               address=None,
                               data=None):
        """Parse `packet_bytes`; return the matching advertising report event or None."""
        return HciMatchers._extract_matching_le_advertisement(packet_bytes, subevent_code, address, data)

    @staticmethod
    def _extract_matching_le_advertisement(packet_bytes,
                                           subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
                                           address=None,
                                           data=None):
        """Return the LE event when at least one of its responses matches.

        A response matches when `address` is None or equals the response
        address (compared as bluetooth.Address), and `data` is None or is
        contained in `packet_bytes`. An event with no responses never matches.
        """
        event = HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
        if event is None:
            return None

        matched = any(
            (address is None or response.address == bluetooth.Address(address)) and
            (data is None or data in packet_bytes) for response in event.responses)

        return event if matched else None

    @staticmethod
    def LeConnectionComplete():
        """Return a predicate matching an LE (enhanced) connection complete event."""
        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None

    @staticmethod
    def ExtractLeConnectionComplete(packet_bytes):
        """Parse `packet_bytes`; return the LE (enhanced) connection complete event or None."""
        return HciMatchers._extract_le_connection_complete(packet_bytes)

    @staticmethod
    def _extract_le_connection_complete(packet_bytes):
        """Try CONNECTION_COMPLETE first, then ENHANCED_CONNECTION_COMPLETE."""
        event = HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.CONNECTION_COMPLETE)
        if event is not None:
            return event

        return HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE)

    @staticmethod
    def LogEventCode():
        """Return a callable that logs the event code of each message at INFO level.

        The returned callable yields whatever logging.info returns (None), so
        it never reports a match.
        """
        return lambda event: logging.info("Received event: %x", hci.Event.parse(event.payload).event_code)

    @staticmethod
    def LinkKeyRequest():
        """Return a predicate matching events with code LINK_KEY_REQUEST."""
        return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_REQUEST)

    @staticmethod
    def IoCapabilityRequest():
        """Return a predicate matching events with code IO_CAPABILITY_REQUEST."""
        return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_REQUEST)

    @staticmethod
    def IoCapabilityResponse():
        """Return a predicate matching events with code IO_CAPABILITY_RESPONSE."""
        return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_RESPONSE)

    @staticmethod
    def UserPasskeyNotification():
        """Return a predicate matching events with code USER_PASSKEY_NOTIFICATION."""
        return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_NOTIFICATION)

    @staticmethod
    def UserPasskeyRequest():
        """Return a predicate matching events with code USER_PASSKEY_REQUEST."""
        return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST)

    @staticmethod
    def UserConfirmationRequest():
        """Return a predicate matching events with code USER_CONFIRMATION_REQUEST."""
        return HciMatchers.EventWithCode(hci.EventCode.USER_CONFIRMATION_REQUEST)

    @staticmethod
    def LinkKeyNotification():
        """Return a predicate matching events with code LINK_KEY_NOTIFICATION."""
        return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_NOTIFICATION)

    @staticmethod
    def SimplePairingComplete():
        """Return a predicate matching events with code SIMPLE_PAIRING_COMPLETE."""
        return HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE)

    @staticmethod
    def Disconnect():
        """Return a predicate matching events with code DISCONNECT."""
        # NOTE(review): confirm that hci.EventCode defines DISCONNECT; the
        # sibling DisconnectionComplete() uses DISCONNECTION_COMPLETE.
        return HciMatchers.EventWithCode(hci.EventCode.DISCONNECT)

    @staticmethod
    def DisconnectionComplete():
        """Return a predicate matching events with code DISCONNECTION_COMPLETE."""
        return HciMatchers.EventWithCode(hci.EventCode.DISCONNECTION_COMPLETE)

    @staticmethod
    def RemoteOobDataRequest():
        """Return a predicate matching events with code REMOTE_OOB_DATA_REQUEST."""
        return HciMatchers.EventWithCode(hci.EventCode.REMOTE_OOB_DATA_REQUEST)

    @staticmethod
    def PinCodeRequest():
        """Return a predicate matching events with code PIN_CODE_REQUEST."""
        return HciMatchers.EventWithCode(hci.EventCode.PIN_CODE_REQUEST)

    @staticmethod
    def LoopbackOf(packet):
        """Return a predicate matching the serialized hci.LoopbackCommand wrapping `packet`."""
        return HciMatchers.Exactly(hci.LoopbackCommand(payload=packet))

    @staticmethod
    def Exactly(packet):
        """Return a predicate that is True when ``event.payload`` equals `packet` serialized.

        The packet is serialized once, when the matcher is created.
        """
        data = bytes(packet.serialize())
        return lambda event: data == event.payload


class AdvertisingMatchers(object):
    """Predicates over advertising callback messages."""

    @staticmethod
    def AdvertisingCallbackMsg(type, advertiser_id=None, status=None, data=None):
        """Return a predicate on message type plus optional id, status and data.

        Each optional argument left as None is not checked.
        """
        return lambda event: bool(event.message_type == type and
                                  (advertiser_id is None or advertiser_id == event.advertiser_id) and
                                  (status is None or status == event.status) and
                                  (data is None or data == event.data))

    @staticmethod
    def AddressMsg(type, advertiser_id=None, address=None):
        """Return a predicate on message type plus optional advertiser id and address."""
        return lambda event: bool(event.message_type == type and
                                  (advertiser_id is None or advertiser_id == event.advertiser_id) and
                                  (address is None or address == event.address))


class ScanningMatchers(object):
    """Predicates over scanning callback messages."""

    @staticmethod
    def ScanningCallbackMsg(type, status=None, data=None):
        """Return a predicate on message type plus optional status and data."""
        return lambda event: bool(event.message_type == type and (status is None or status == event.status) and
                                  (data is None or data == event.data))


class NeighborMatchers(object):
    """Predicates over inquiry result events; the raw bytes are read from ``msg.packet``."""

    @staticmethod
    def InquiryResult(address):
        """Return a predicate matching an Inquiry Result that lists `address`."""
        return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address)

    @staticmethod
    def _is_matching_inquiry_result(packet, address):
        """Return True when any response's ``bd_addr`` equals `address`."""
        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT)
        if not isinstance(event, hci.InquiryResult):
            return False
        return any(bluetooth.Address(address) == response.bd_addr for response in event.responses)

    @staticmethod
    def InquiryResultwithRssi(address):
        """Return a predicate matching an Inquiry Result With RSSI that lists `address`."""
        return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)

    @staticmethod
    def _is_matching_inquiry_result_with_rssi(packet, address):
        """Return True when any response's ``address`` equals `address`."""
        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT_WITH_RSSI)
        if not isinstance(event, hci.InquiryResultWithRssi):
            return False
        return any(bluetooth.Address(address) == response.address for response in event.responses)

    @staticmethod
    def ExtendedInquiryResult(address):
        """Return a predicate matching an Extended Inquiry Result from `address`."""
        return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)

    @staticmethod
    def _is_matching_extended_inquiry_result(packet, address):
        """Return True when the event's ``address`` equals `address`."""
        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.EXTENDED_INQUIRY_RESULT)
        if not isinstance(event, (hci.ExtendedInquiryResult, hci.ExtendedInquiryResultRaw)):
            return False
        return bluetooth.Address(address) == event.address


class SecurityMatchers(object):
    """Predicates over security module messages."""

    @staticmethod
    def UiMsg(type, address=None):
        """Return a predicate on message type plus optional peer address."""
        return lambda event: bool(event.message_type == type and (address is None or address == event.peer))

    @staticmethod
    def BondMsg(type, address=None, reason=None):
        """Return a predicate on message type plus optional peer address and reason."""
        return lambda event: bool(event.message_type == type and (address is None or address == event.peer) and
                                  (reason is None or reason == event.reason))

    @staticmethod
    def HelperMsg(type, address=None):
        """Return a predicate on message type plus optional peer address."""
        return lambda event: bool(event.message_type == type and (address is None or address == event.peer))


class IsoMatchers(object):
    """Predicates over ISO packets."""

    @staticmethod
    def Data(payload):
        """Return a predicate that is True when ``packet.payload`` equals `payload`."""
        return lambda packet: packet.payload == payload

    @staticmethod
    def PacketPayloadWithMatchingCisHandle(cis_handle):
        """Return a callable yielding the packet when its handle is `cis_handle`, else None."""
        return lambda packet: None if cis_handle != packet.handle else packet