1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import sys
19
20from blueberry.utils import bluetooth
21import hci_packets as hci
22
23
24class HciMatchers(object):
25
26    @staticmethod
27    def CommandComplete(opcode):
28        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)
29
30    @staticmethod
31    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
32        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)
33
34    @staticmethod
35    def _is_matching_command_complete(packet_bytes, opcode=None):
36        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None
37
38    @staticmethod
39    def _extract_matching_command_complete(packet_bytes, opcode=None):
40        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_COMPLETE)
41        if not isinstance(event, hci.CommandComplete):
42            return None
43        if opcode and event.command_op_code != opcode:
44            return None
45        return event
46
47    @staticmethod
48    def CommandStatus(opcode=None):
49        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)
50
51    @staticmethod
52    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
53        return HciMatchers._extract_matching_command_status(packet_bytes, opcode)
54
55    @staticmethod
56    def _is_matching_command_status(packet_bytes, opcode=None):
57        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None
58
59    @staticmethod
60    def _extract_matching_command_status(packet_bytes, opcode=None):
61        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.COMMAND_STATUS)
62        if not isinstance(event, hci.CommandStatus):
63            return None
64        if opcode and event.command_op_code != opcode:
65            return None
66        return event
67
68    @staticmethod
69    def EventWithCode(event_code):
70        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)
71
72    @staticmethod
73    def ExtractEventWithCode(packet_bytes, event_code):
74        return HciMatchers._extract_matching_event(packet_bytes, event_code)
75
76    @staticmethod
77    def _is_matching_event(packet_bytes, event_code):
78        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None
79
80    @staticmethod
81    def _extract_matching_event(packet_bytes, event_code):
82        try:
83            event = hci.Event.parse_all(packet_bytes)
84            return event if event.event_code == event_code else None
85        except Exception as exn:
86            print(sys.stderr, f"Failed to parse incoming event: {exn}")
87            print(sys.stderr, f"Event data: {' '.join([f'{b:02x}' for b in packet_bytes])}")
88            return None
89
90    @staticmethod
91    def LeEventWithCode(subevent_code):
92        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None
93
94    @staticmethod
95    def ExtractLeEventWithCode(packet_bytes, subevent_code):
96        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
97
98    @staticmethod
99    def _extract_matching_le_event(packet_bytes, subevent_code):
100        event = HciMatchers._extract_matching_event(packet_bytes, hci.EventCode.LE_META_EVENT)
101        if (not isinstance(event, hci.LeMetaEvent) or event.subevent_code != subevent_code):
102            return None
103
104        return event
105
106    @staticmethod
107    def LeAdvertisement(subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT, address=None, data=None):
108        return lambda msg: HciMatchers._extract_matching_le_advertisement(msg.payload, subevent_code, address, data
109                                                                         ) is not None
110
111    @staticmethod
112    def ExtractLeAdvertisement(packet_bytes,
113                               subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
114                               address=None,
115                               data=None):
116        return HciMatchers._extract_matching_le_advertisement(packet_bytes, subevent_code, address, data)
117
118    @staticmethod
119    def _extract_matching_le_advertisement(packet_bytes,
120                                           subevent_code=hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
121                                           address=None,
122                                           data=None):
123        event = HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
124        if event is None:
125            return None
126
127        matched = False
128        for response in event.responses:
129            matched |= (address == None or response.address == bluetooth.Address(address)) and (data == None or
130                                                                                                data in packet_bytes)
131
132        return event if matched else None
133
134    @staticmethod
135    def LeConnectionComplete():
136        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None
137
138    @staticmethod
139    def ExtractLeConnectionComplete(packet_bytes):
140        return HciMatchers._extract_le_connection_complete(packet_bytes)
141
142    @staticmethod
143    def _extract_le_connection_complete(packet_bytes):
144        event = HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.CONNECTION_COMPLETE)
145        if event is not None:
146            return event
147
148        return HciMatchers._extract_matching_le_event(packet_bytes, hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
149
150    @staticmethod
151    def LogEventCode():
152        return lambda event: logging.info("Received event: %x" % hci.Event.parse(event.payload).event_code)
153
154    @staticmethod
155    def LinkKeyRequest():
156        return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_REQUEST)
157
158    @staticmethod
159    def IoCapabilityRequest():
160        return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_REQUEST)
161
162    @staticmethod
163    def IoCapabilityResponse():
164        return HciMatchers.EventWithCode(hci.EventCode.IO_CAPABILITY_RESPONSE)
165
166    @staticmethod
167    def UserPasskeyNotification():
168        return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_NOTIFICATION)
169
170    @staticmethod
171    def UserPasskeyRequest():
172        return HciMatchers.EventWithCode(hci.EventCode.USER_PASSKEY_REQUEST)
173
174    @staticmethod
175    def UserConfirmationRequest():
176        return HciMatchers.EventWithCode(hci.EventCode.USER_CONFIRMATION_REQUEST)
177
178    @staticmethod
179    def LinkKeyNotification():
180        return HciMatchers.EventWithCode(hci.EventCode.LINK_KEY_NOTIFICATION)
181
182    @staticmethod
183    def SimplePairingComplete():
184        return HciMatchers.EventWithCode(hci.EventCode.SIMPLE_PAIRING_COMPLETE)
185
186    @staticmethod
187    def Disconnect():
188        return HciMatchers.EventWithCode(hci.EventCode.DISCONNECT)
189
190    @staticmethod
191    def DisconnectionComplete():
192        return HciMatchers.EventWithCode(hci.EventCode.DISCONNECTION_COMPLETE)
193
194    @staticmethod
195    def RemoteOobDataRequest():
196        return HciMatchers.EventWithCode(hci.EventCode.REMOTE_OOB_DATA_REQUEST)
197
198    @staticmethod
199    def PinCodeRequest():
200        return HciMatchers.EventWithCode(hci.EventCode.PIN_CODE_REQUEST)
201
202    @staticmethod
203    def LoopbackOf(packet):
204        return HciMatchers.Exactly(hci.LoopbackCommand(payload=packet))
205
206    @staticmethod
207    def Exactly(packet):
208        data = bytes(packet.serialize())
209        return lambda event: data == event.payload
210
211
212class AdvertisingMatchers(object):
213
214    @staticmethod
215    def AdvertisingCallbackMsg(type, advertiser_id=None, status=None, data=None):
216        return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \
217            and (status == None or status == event.status) and (data == None or data == event.data) else False
218
219    @staticmethod
220    def AddressMsg(type, advertiser_id=None, address=None):
221        return lambda event: True if event.message_type == type and (advertiser_id == None or advertiser_id == event.advertiser_id) \
222            and (address == None or address == event.address) else False
223
224
225class ScanningMatchers(object):
226
227    @staticmethod
228    def ScanningCallbackMsg(type, status=None, data=None):
229        return lambda event: True if event.message_type == type and (status == None or status == event.status) \
230            and (data == None or data == event.data) else False
231
232
233class NeighborMatchers(object):
234
235    @staticmethod
236    def InquiryResult(address):
237        return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address)
238
239    @staticmethod
240    def _is_matching_inquiry_result(packet, address):
241        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT)
242        if not isinstance(event, hci.InquiryResult):
243            return False
244        return any((bluetooth.Address(address) == response.bd_addr for response in event.responses))
245
246    @staticmethod
247    def InquiryResultwithRssi(address):
248        return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)
249
250    @staticmethod
251    def _is_matching_inquiry_result_with_rssi(packet, address):
252        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.INQUIRY_RESULT_WITH_RSSI)
253        if not isinstance(event, hci.InquiryResultWithRssi):
254            return False
255        return any((bluetooth.Address(address) == response.address for response in event.responses))
256
257    @staticmethod
258    def ExtendedInquiryResult(address):
259        return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)
260
261    @staticmethod
262    def _is_matching_extended_inquiry_result(packet, address):
263        event = HciMatchers.ExtractEventWithCode(packet, hci.EventCode.EXTENDED_INQUIRY_RESULT)
264        if not isinstance(event, (hci.ExtendedInquiryResult, hci.ExtendedInquiryResultRaw)):
265            return False
266        return bluetooth.Address(address) == event.address
267
268
269class SecurityMatchers(object):
270
271    @staticmethod
272    def UiMsg(type, address=None):
273        return lambda event: True if event.message_type == type and (address == None or address == event.peer
274                                                                    ) else False
275
276    @staticmethod
277    def BondMsg(type, address=None, reason=None):
278        return lambda event: True if event.message_type == type and (address == None or address == event.peer) and (
279            reason == None or reason == event.reason) else False
280
281    @staticmethod
282    def HelperMsg(type, address=None):
283        return lambda event: True if event.message_type == type and (address == None or address == event.peer
284                                                                    ) else False
285
286
287class IsoMatchers(object):
288
289    @staticmethod
290    def Data(payload):
291        return lambda packet: packet.payload == payload
292
293    @staticmethod
294    def PacketPayloadWithMatchingCisHandle(cis_handle):
295        return lambda packet: None if cis_handle != packet.handle else packet
296