1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import bluetooth_packets_python3 as bt_packets
19import logging
20
21from bluetooth_packets_python3 import hci_packets
22from bluetooth_packets_python3.hci_packets import EventCode
23from bluetooth_packets_python3 import l2cap_packets
24from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
25from bluetooth_packets_python3.l2cap_packets import ConfigurationResponseResult
26from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
27from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
28from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionResponseResult
29
30
31class HciMatchers(object):
32
33    @staticmethod
34    def CommandComplete(opcode):
35        return lambda msg: HciMatchers._is_matching_command_complete(msg.payload, opcode)
36
37    @staticmethod
38    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
39        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)
40
41    @staticmethod
42    def _is_matching_command_complete(packet_bytes, opcode=None):
43        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None
44
45    @staticmethod
46    def _extract_matching_command_complete(packet_bytes, opcode=None):
47        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_COMPLETE)
48        if event is None:
49            return None
50        complete = hci_packets.CommandCompleteView(event)
51        if opcode is None or complete is None:
52            return complete
53        else:
54            if complete.GetCommandOpCode() != opcode:
55                return None
56            else:
57                return complete
58
59    @staticmethod
60    def CommandStatus(opcode=None):
61        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)
62
63    @staticmethod
64    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
65        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)
66
67    @staticmethod
68    def _is_matching_command_status(packet_bytes, opcode=None):
69        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None
70
71    @staticmethod
72    def _extract_matching_command_status(packet_bytes, opcode=None):
73        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
74        if event is None:
75            return None
76        complete = hci_packets.CommandStatusView(event)
77        if opcode is None or complete is None:
78            return complete
79        else:
80            if complete.GetCommandOpCode() != opcode:
81                return None
82            else:
83                return complete
84
85    @staticmethod
86    def EventWithCode(event_code):
87        return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)
88
89    @staticmethod
90    def ExtractEventWithCode(packet_bytes, event_code):
91        return HciMatchers._extract_matching_event(packet_bytes, event_code)
92
93    @staticmethod
94    def _is_matching_event(packet_bytes, event_code):
95        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None
96
97    @staticmethod
98    def _extract_matching_event(packet_bytes, event_code):
99        event = hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
100        if event is None:
101            return None
102        if event_code is not None and event.GetEventCode() != event_code:
103            return None
104        return event
105
106    @staticmethod
107    def LeEventWithCode(subevent_code):
108        return lambda msg: HciMatchers._extract_matching_le_event(msg.payload, subevent_code) is not None
109
110    @staticmethod
111    def ExtractLeEventWithCode(packet_bytes, subevent_code):
112        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
113
114    @staticmethod
115    def _extract_matching_le_event(packet_bytes, subevent_code):
116        inner_event = HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT)
117        if inner_event is None:
118            return None
119        event = hci_packets.LeMetaEventView(inner_event)
120        if event.GetSubeventCode() != subevent_code:
121            return None
122        return event
123
124    @staticmethod
125    def LeConnectionComplete():
126        return lambda msg: HciMatchers._extract_le_connection_complete(msg.payload) is not None
127
128    @staticmethod
129    def ExtractLeConnectionComplete(packet_bytes):
130        return HciMatchers._extract_le_connection_complete(packet_bytes)
131
132    @staticmethod
133    def _extract_le_connection_complete(packet_bytes):
134        inner_event = HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE)
135        if inner_event is not None:
136            return hci_packets.LeConnectionCompleteView(inner_event)
137
138        inner_event = HciMatchers._extract_matching_le_event(packet_bytes,
139                                                             hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
140        if inner_event is not None:
141            return hci_packets.LeEnhancedConnectionCompleteView(inner_event)
142
143        return None
144
145    @staticmethod
146    def LogEventCode():
147        return lambda event: logging.info("Received event: %x" % hci_packets.EventView(bt_packets.PacketViewLittleEndian(list(event.payload))).GetEventCode())
148
149    @staticmethod
150    def LinkKeyRequest():
151        return lambda event: HciMatchers.EventWithCode(EventCode.LINK_KEY_REQUEST)
152
153    @staticmethod
154    def IoCapabilityRequest():
155        return lambda event: HciMatchers.EventWithCode(EventCode.IO_CAPABILITY_REQUEST)
156
157    @staticmethod
158    def IoCapabilityResponse():
159        return lambda event: HciMatchers.EventWithCode(EventCode.IO_CAPABILITY_RESPONSE)
160
161    @staticmethod
162    def UserPasskeyNotification():
163        return lambda event: HciMatchers.EventWithCode(EventCode.USER_PASSKEY_NOTIFICATION)
164
165    @staticmethod
166    def UserPasskeyRequest():
167        return lambda event: HciMatchers.EventWithCode(EventCode.USER_PASSKEY_REQUEST)
168
169    @staticmethod
170    def UserConfirmationRequest():
171        return lambda event: HciMatchers.EventWithCode(EventCode.USER_CONFIRMATION_REQUEST)
172
173    @staticmethod
174    def RemoteHostSupportedFeaturesNotification():
175        return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION)
176
177    @staticmethod
178    def LinkKeyNotification():
179        return lambda event: HciMatchers.EventWithCode(EventCode.LINK_KEY_NOTIFICATION)
180
181    @staticmethod
182    def SimplePairingComplete():
183        return lambda event: HciMatchers.EventWithCode(EventCode.SIMPLE_PAIRING_COMPLETE)
184
185    @staticmethod
186    def Disconnect():
187        return lambda event: HciMatchers.EventWithCode(EventCode.DISCONNECT)
188
189    @staticmethod
190    def DisconnectionComplete():
191        return lambda event: HciMatchers.EventWithCode(EventCode.DISCONNECTION_COMPLETE)
192
193    @staticmethod
194    def RemoteOobDataRequest():
195        return lambda event: HciMatchers.EventWithCode(EventCode.REMOTE_OOB_DATA_REQUEST)
196
197    @staticmethod
198    def PinCodeRequest():
199        return lambda event: HciMatchers.EventWithCode(EventCode.PIN_CODE_REQUEST)
200
201    @staticmethod
202    def LoopbackOf(packet):
203        return HciMatchers.Exactly(hci_packets.LoopbackCommandBuilder(packet))
204
205    @staticmethod
206    def Exactly(packet):
207        data = bytes(packet.Serialize())
208        return lambda event: data == event.payload
209
210
211class NeighborMatchers(object):
212
213    @staticmethod
214    def InquiryResult(address):
215        return lambda msg: NeighborMatchers._is_matching_inquiry_result(msg.packet, address)
216
217    @staticmethod
218    def _is_matching_inquiry_result(packet, address):
219        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT)
220        if hci_event is None:
221            return False
222        inquiry_view = hci_packets.InquiryResultView(hci_event)
223        if inquiry_view is None:
224            return False
225        results = inquiry_view.GetInquiryResults()
226        return any((address == result.bd_addr for result in results))
227
228    @staticmethod
229    def InquiryResultwithRssi(address):
230        return lambda msg: NeighborMatchers._is_matching_inquiry_result_with_rssi(msg.packet, address)
231
232    @staticmethod
233    def _is_matching_inquiry_result_with_rssi(packet, address):
234        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT_WITH_RSSI)
235        if hci_event is None:
236            return False
237        inquiry_view = hci_packets.InquiryResultWithRssiView(hci_event)
238        if inquiry_view is None:
239            return False
240        results = inquiry_view.GetInquiryResults()
241        return any((address == result.address for result in results))
242
243    @staticmethod
244    def ExtendedInquiryResult(address):
245        return lambda msg: NeighborMatchers._is_matching_extended_inquiry_result(msg.packet, address)
246
247    @staticmethod
248    def _is_matching_extended_inquiry_result(packet, address):
249        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.EXTENDED_INQUIRY_RESULT)
250        if hci_event is None:
251            return False
252        extended_view = hci_packets.ExtendedInquiryResultView(hci_event)
253        if extended_view is None:
254            return False
255        return address == extended_view.GetAddress()
256
257
258class L2capMatchers(object):
259
260    @staticmethod
261    def ConnectionRequest(psm):
262        return lambda packet: L2capMatchers._is_matching_connection_request(packet, psm)
263
264    @staticmethod
265    def ConnectionResponse(scid):
266        return lambda packet: L2capMatchers._is_matching_connection_response(packet, scid)
267
268    @staticmethod
269    def ConfigurationResponse(result=ConfigurationResponseResult.SUCCESS):
270        return lambda packet: L2capMatchers._is_matching_configuration_response(packet, result)
271
272    @staticmethod
273    def ConfigurationRequest(cid=None):
274        return lambda packet: L2capMatchers._is_matching_configuration_request_with_cid(packet, cid)
275
276    @staticmethod
277    def ConfigurationRequestWithErtm():
278        return lambda packet: L2capMatchers._is_matching_configuration_request_with_ertm(packet)
279
280    @staticmethod
281    def ConfigurationRequestView(dcid):
282        return lambda request_view: request_view.GetDestinationCid() == dcid
283
284    @staticmethod
285    def DisconnectionRequest(scid, dcid):
286        return lambda packet: L2capMatchers._is_matching_disconnection_request(packet, scid, dcid)
287
288    @staticmethod
289    def DisconnectionResponse(scid, dcid):
290        return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)
291
292    @staticmethod
293    def EchoResponse():
294        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.ECHO_RESPONSE)
295
296    @staticmethod
297    def CommandReject():
298        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)
299
300    @staticmethod
301    def LeCommandReject():
302        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)
303
304    @staticmethod
305    def LeConnectionParameterUpdateRequest():
306        return lambda packet: L2capMatchers._is_le_control_frame_with_code(
307            packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_REQUEST)
308
309    @staticmethod
310    def LeConnectionParameterUpdateResponse(result=l2cap_packets.ConnectionParameterUpdateResponseResult.ACCEPTED):
311        return lambda packet: L2capMatchers._is_matching_connection_parameter_update_response(packet, result)
312
313    @staticmethod
314    def CreditBasedConnectionRequest(psm):
315        return lambda packet: L2capMatchers._is_matching_credit_based_connection_request(packet, psm)
316
317    @staticmethod
318    def CreditBasedConnectionResponse(result=LeCreditBasedConnectionResponseResult.SUCCESS):
319        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(packet, result)
320
321    @staticmethod
322    def CreditBasedConnectionResponseUsedCid():
323        return lambda packet: L2capMatchers._is_matching_credit_based_connection_response(
324            packet, LeCreditBasedConnectionResponseResult.SOURCE_CID_ALREADY_ALLOCATED
325        ) or L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)
326
327    @staticmethod
328    def LeDisconnectionRequest(scid, dcid):
329        return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)
330
331    @staticmethod
332    def LeDisconnectionResponse(scid, dcid):
333        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)
334
335    @staticmethod
336    def LeFlowControlCredit(cid):
337        return lambda packet: L2capMatchers._is_matching_le_flow_control_credit(packet, cid)
338
339    @staticmethod
340    def SFrame(req_seq=None, f=None, s=None, p=None):
341        return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)
342
343    @staticmethod
344    def IFrame(tx_seq=None, payload=None, f=None):
345        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=False)
346
347    @staticmethod
348    def IFrameWithFcs(tx_seq=None, payload=None, f=None):
349        return lambda packet: L2capMatchers._is_matching_information_frame(packet, tx_seq, payload, f, fcs=True)
350
351    @staticmethod
352    def IFrameStart(tx_seq=None, payload=None, f=None):
353        return lambda packet: L2capMatchers._is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False)
354
355    @staticmethod
356    def Data(payload):
357        return lambda packet: packet.GetPayload().GetBytes() == payload
358
359    @staticmethod
360    def FirstLeIFrame(payload, sdu_size):
361        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)
362
363    # this is a hack - should be removed
364    @staticmethod
365    def PartialData(payload):
366        return lambda packet: payload in packet.GetPayload().GetBytes()
367
368    # this is a hack - should be removed
369    @staticmethod
370    def PacketPayloadRawData(payload):
371        return lambda packet: payload in packet.payload
372
373    # this is a hack - should be removed
374    @staticmethod
375    def PacketPayloadWithMatchingPsm(psm):
376        return lambda packet: None if psm != packet.psm else packet
377
378    # this is a hack - should be removed
379    @staticmethod
380    def PacketPayloadWithMatchingCid(cid):
381        return lambda packet: None if cid != packet.fixed_cid else packet
382
383    @staticmethod
384    def ExtractBasicFrame(scid):
385        return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
386
387    @staticmethod
388    def ExtractBasicFrameWithFcs(scid):
389        return lambda packet: L2capMatchers._basic_frame_with_fcs_for(packet, scid)
390
391    @staticmethod
392    def InformationRequestWithType(info_type):
393        return lambda packet: L2capMatchers._information_request_with_type(packet, info_type)
394
395    @staticmethod
396    def InformationResponseExtendedFeatures(supports_ertm=None,
397                                            supports_streaming=None,
398                                            supports_fcs=None,
399                                            supports_fixed_channels=None):
400        return lambda packet: L2capMatchers._is_matching_information_response_extended_features(
401            packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)
402
403    @staticmethod
404    def _basic_frame(packet):
405        if packet is None:
406            return None
407        return l2cap_packets.BasicFrameView(bt_packets.PacketViewLittleEndian(list(packet.payload)))
408
409    @staticmethod
410    def _basic_frame_with_fcs(packet):
411        if packet is None:
412            return None
413        return l2cap_packets.BasicFrameWithFcsView(bt_packets.PacketViewLittleEndian(list(packet.payload)))
414
415    @staticmethod
416    def _basic_frame_for(packet, scid):
417        frame = L2capMatchers._basic_frame(packet)
418        if frame.GetChannelId() != scid:
419            return None
420        return frame
421
422    @staticmethod
423    def _basic_frame_with_fcs_for(packet, scid):
424        frame = L2capMatchers._basic_frame(packet)
425        if frame.GetChannelId() != scid:
426            return None
427        frame = L2capMatchers._basic_frame_with_fcs(packet)
428        if frame is None:
429            return None
430        return frame
431
432    @staticmethod
433    def _information_frame(packet):
434        standard_frame = l2cap_packets.StandardFrameView(packet)
435        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
436            return None
437        return l2cap_packets.EnhancedInformationFrameView(standard_frame)
438
439    @staticmethod
440    def _information_frame_with_fcs(packet):
441        standard_frame = l2cap_packets.StandardFrameWithFcsView(packet)
442        if standard_frame is None:
443            return None
444        if standard_frame.GetFrameType() != l2cap_packets.FrameType.I_FRAME:
445            return None
446        return l2cap_packets.EnhancedInformationFrameWithFcsView(standard_frame)
447
448    @staticmethod
449    def _information_start_frame(packet):
450        start_frame = L2capMatchers._information_frame(packet)
451        if start_frame is None:
452            return None
453        return l2cap_packets.EnhancedInformationStartFrameView(start_frame)
454
455    @staticmethod
456    def _information_start_frame_with_fcs(packet):
457        start_frame = L2capMatchers._information_frame_with_fcs(packet)
458        if start_frame is None:
459            return None
460        return l2cap_packets.EnhancedInformationStartFrameWithFcsView(start_frame)
461
462    @staticmethod
463    def _supervisory_frame(packet):
464        standard_frame = l2cap_packets.StandardFrameView(packet)
465        if standard_frame.GetFrameType() != l2cap_packets.FrameType.S_FRAME:
466            return None
467        return l2cap_packets.EnhancedSupervisoryFrameView(standard_frame)
468
469    @staticmethod
470    def _is_matching_information_frame(packet, tx_seq, payload, f, fcs=False):
471        if fcs:
472            frame = L2capMatchers._information_frame_with_fcs(packet)
473        else:
474            frame = L2capMatchers._information_frame(packet)
475        if frame is None:
476            return False
477        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
478            return False
479        if payload is not None and frame.GetPayload().GetBytes() != payload:
480            return False
481        if f is not None and frame.GetF() != f:
482            return False
483        return True
484
485    @staticmethod
486    def _is_matching_information_start_frame(packet, tx_seq, payload, f, fcs=False):
487        if fcs:
488            frame = L2capMatchers._information_start_frame_with_fcs(packet)
489        else:
490            frame = L2capMatchers._information_start_frame(packet)
491        if frame is None:
492            return False
493        if tx_seq is not None and frame.GetTxSeq() != tx_seq:
494            return False
495        if payload is not None and frame.GetPayload().GetBytes() != payload:
496            return False
497        if f is not None and frame.GetF() != f:
498            return False
499        return True
500
501    @staticmethod
502    def _is_matching_supervisory_frame(packet, req_seq, f, s, p):
503        frame = L2capMatchers._supervisory_frame(packet)
504        if frame is None:
505            return False
506        if req_seq is not None and frame.GetReqSeq() != req_seq:
507            return False
508        if f is not None and frame.GetF() != f:
509            return False
510        if s is not None and frame.GetS() != s:
511            return False
512        if p is not None and frame.GetP() != p:
513            return False
514        return True
515
516    @staticmethod
517    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
518        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
519        return first_le_i_frame.GetPayload().GetBytes() == payload and first_le_i_frame.GetL2capSduLength() == sdu_size
520
521    @staticmethod
522    def _control_frame(packet):
523        if packet.GetChannelId() != 1:
524            return None
525        return l2cap_packets.ControlView(packet.GetPayload())
526
527    @staticmethod
528    def _le_control_frame(packet):
529        if packet.GetChannelId() != 5:
530            return None
531        return l2cap_packets.LeControlView(packet.GetPayload())
532
533    @staticmethod
534    def control_frame_with_code(packet, code):
535        frame = L2capMatchers._control_frame(packet)
536        if frame is None or frame.GetCode() != code:
537            return None
538        return frame
539
540    @staticmethod
541    def le_control_frame_with_code(packet, code):
542        frame = L2capMatchers._le_control_frame(packet)
543        if frame is None or frame.GetCode() != code:
544            return None
545        return frame
546
547    @staticmethod
548    def _is_control_frame_with_code(packet, code):
549        return L2capMatchers.control_frame_with_code(packet, code) is not None
550
551    @staticmethod
552    def _is_le_control_frame_with_code(packet, code):
553        return L2capMatchers.le_control_frame_with_code(packet, code) is not None
554
555    @staticmethod
556    def _is_matching_connection_request(packet, psm):
557        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
558        if frame is None:
559            return False
560        request = l2cap_packets.ConnectionRequestView(frame)
561        return request.GetPsm() == psm
562
563    @staticmethod
564    def _is_matching_connection_response(packet, scid):
565        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONNECTION_RESPONSE)
566        if frame is None:
567            return False
568        response = l2cap_packets.ConnectionResponseView(frame)
569        return response.GetSourceCid() == scid and response.GetResult(
570        ) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid() != 0
571
572    @staticmethod
573    def _is_matching_configuration_request_with_cid(packet, cid=None):
574        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
575        if frame is None:
576            return False
577        request = l2cap_packets.ConfigurationRequestView(frame)
578        dcid = request.GetDestinationCid()
579        return cid is None or cid == dcid
580
581    @staticmethod
582    def _is_matching_configuration_request_with_ertm(packet):
583        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
584        if frame is None:
585            return False
586        request = l2cap_packets.ConfigurationRequestView(frame)
587        config_bytes = request.GetBytes()
588        # TODO(b/153189503): Use packet struct parser.
589        return b"\x04\x09\x03" in config_bytes
590
591    @staticmethod
592    def _is_matching_configuration_response(packet, result=ConfigurationResponseResult.SUCCESS):
593        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
594        if frame is None:
595            return False
596        response = l2cap_packets.ConfigurationResponseView(frame)
597        return response.GetResult() == result
598
599    @staticmethod
600    def _is_matching_disconnection_request(packet, scid, dcid):
601        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
602        if frame is None:
603            return False
604        request = l2cap_packets.DisconnectionRequestView(frame)
605        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid
606
607    @staticmethod
608    def _is_matching_disconnection_response(packet, scid, dcid):
609        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.DISCONNECTION_RESPONSE)
610        if frame is None:
611            return False
612        response = l2cap_packets.DisconnectionResponseView(frame)
613        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid
614
615    @staticmethod
616    def _is_matching_le_disconnection_response(packet, scid, dcid):
617        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_RESPONSE)
618        if frame is None:
619            return False
620        response = l2cap_packets.LeDisconnectionResponseView(frame)
621        return response.GetSourceCid() == scid and response.GetDestinationCid() == dcid
622
623    @staticmethod
624    def _is_matching_le_disconnection_request(packet, scid, dcid):
625        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.DISCONNECTION_REQUEST)
626        if frame is None:
627            return False
628        request = l2cap_packets.LeDisconnectionRequestView(frame)
629        return request.GetSourceCid() == scid and request.GetDestinationCid() == dcid
630
631    @staticmethod
632    def _is_matching_le_flow_control_credit(packet, cid):
633        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_FLOW_CONTROL_CREDIT)
634        if frame is None:
635            return False
636        request = l2cap_packets.LeFlowControlCreditView(frame)
637        return request.GetCid() == cid
638
639    @staticmethod
640    def _information_request_with_type(packet, info_type):
641        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_REQUEST)
642        if frame is None:
643            return None
644        request = l2cap_packets.InformationRequestView(frame)
645        if request.GetInfoType() != info_type:
646            return None
647        return request
648
649    @staticmethod
650    def _information_response_with_type(packet, info_type):
651        frame = L2capMatchers.control_frame_with_code(packet, CommandCode.INFORMATION_RESPONSE)
652        if frame is None:
653            return None
654        response = l2cap_packets.InformationResponseView(frame)
655        if response.GetInfoType() != info_type:
656            return None
657        return response
658
659    @staticmethod
660    def _is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs,
661                                                            supports_fixed_channels):
662        frame = L2capMatchers._information_response_with_type(packet,
663                                                              InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
664        if frame is None:
665            return False
666        features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
667        if supports_ertm is not None and features.GetEnhancedRetransmissionMode() != supports_ertm:
668            return False
669        if supports_streaming is not None and features.GetStreamingMode != supports_streaming:
670            return False
671        if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
672            return False
673        if supports_fixed_channels is not None and features.GetFixedChannels() != supports_fixed_channels:
674            return False
675        return True
676
677    @staticmethod
678    def _is_matching_connection_parameter_update_response(packet, result):
679        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.CONNECTION_PARAMETER_UPDATE_RESPONSE)
680        if frame is None:
681            return False
682        return l2cap_packets.ConnectionParameterUpdateResponseView(frame).GetResult() == result
683
684    @staticmethod
685    def _is_matching_credit_based_connection_request(packet, psm):
686        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
687        if frame is None:
688            return False
689        request = l2cap_packets.LeCreditBasedConnectionRequestView(frame)
690        return request.GetLePsm() == psm
691
692    @staticmethod
693    def _is_matching_credit_based_connection_response(packet, result):
694        frame = L2capMatchers.le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_RESPONSE)
695        if frame is None:
696            return False
697        response = l2cap_packets.LeCreditBasedConnectionResponseView(frame)
698        return response.GetResult() == result and (result != LeCreditBasedConnectionResponseResult.SUCCESS or
699                                                   response.GetDestinationCid() != 0)
700
701    @staticmethod
702    def LinkSecurityInterfaceCallbackEvent(type):
703        return lambda event: True if event.event_type == type else False
704
705
706class SecurityMatchers(object):
707
708    @staticmethod
709    def UiMsg(type, address=None):
710        return lambda event: True if event.message_type == type and (address == None or address == event.peer) else False
711
712    @staticmethod
713    def BondMsg(type, address=None, reason=None):
714        return lambda event: True if event.message_type == type and (address == None or address == event.peer) and (reason == None or reason == event.reason) else False
715
716    @staticmethod
717    def HelperMsg(type, address=None):
718        return lambda event: True if event.message_type == type and (address == None or address == event.peer) else False
719
720
721class IsoMatchers(object):
722
723    @staticmethod
724    def Data(payload):
725        return lambda packet: packet.payload == payload
726
727    @staticmethod
728    def PacketPayloadWithMatchingCisHandle(cis_handle):
729        return lambda packet: None if cis_handle != packet.handle else packet
730