/system/bt/gd/cert/ |
D | py_acl_manager.py | 18 from cert.event_stream import EventStream 19 from cert.event_stream import IEventStream 30 def __init__(self, acl_manager, remote_addr, handle, event_stream): argument 34 self.connection_event_stream = event_stream 79 def complete_connection(self, event_stream): argument 81 assertThat(event_stream).emits(connection_complete) 85 return PyAclManagerAclConnection(self.acl_manager, address, handle, event_stream) 89 event_stream = self.incoming_connection_event_stream 91 return self.complete_connection(event_stream) 95 event_stream = self.outgoing_connection_event_stream [all …]
|
D | cert_self_test_lib.py | 27 from cert.event_stream import EventStream, FilteringEventStream 133 with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream: 134 event_stream.assert_event_occurs( 139 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 140 … event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(seconds=1)) 145 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream: 146 … event_stream.assert_event_occurs(lambda data: data.value_ == 4, timeout=timedelta(seconds=1)) 154 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: 155 event_stream.assert_event_occurs_at_most( 161 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream: [all …]
|
D | py_le_acl_manager.py | 19 from cert.event_stream import EventStream 20 from cert.event_stream import IEventStream 31 def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): argument 44 self.connection_event_stream = event_stream 115 def complete_connection(self, event_stream): argument 117 assertThat(event_stream).emits(connection_complete) 125 …onnection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream) 131 event_stream = self.incoming_connection_event_stream 133 return self.complete_connection(event_stream) 137 event_stream = self.outgoing_connection_event_streams.pop(token)[0] [all …]
|
D | py_hci.py | 18 from cert.event_stream import EventStream 19 from cert.event_stream import FilteringEventStream 20 from cert.event_stream import IEventStream 110 event_stream = None variable in PyHci 121 self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty())) 130 safeClose(self.event_stream) 135 return self.event_stream 162 assertThat(self.event_stream).emits(read_bd_addr) 177 assertThat(self.event_stream).emits(connection_request) 186 assertThat(self.event_stream).emits(connection_complete)
|
D | truth.py | 23 from cert.event_stream import IEventStream 24 from cert.event_stream import NOT_FOR_YOU_assert_event_occurs 25 from cert.event_stream import NOT_FOR_YOU_assert_all_events_occur 26 from cert.event_stream import NOT_FOR_YOU_assert_none_matching 27 from cert.event_stream import NOT_FOR_YOU_assert_none
|
D | py_le_iso.py | 24 from cert.event_stream import EventStream, IEventStream 25 from cert.event_stream import FilteringEventStream
|
D | py_le_security.py | 23 from cert.event_stream import EventStream
|
D | py_hal.py | 18 from cert.event_stream import EventStream 19 from cert.event_stream import FilteringEventStream 20 from cert.event_stream import IEventStream
|
D | py_l2cap.py | 25 from cert.event_stream import FilteringEventStream 26 from cert.event_stream import EventStream, IEventStream
|
D | behavior.py | 22 from cert.event_stream import static_remaining_time_delta
|
D | cert_self_test.py | 30 from cert.event_stream import EventStream, FilteringEventStream
|
D | py_security.py | 22 from cert.event_stream import EventStream
|
/system/bt/gd/neighbor/cert/ |
D | py_neighbor.py | 20 from cert.event_stream import EventStream 21 from cert.event_stream import IEventStream
|
/system/bt/gd/shim/cert/ |
D | shim_test.py | 21 from cert.event_stream import EventStream
|
/system/bt/gd/hci/cert/ |
D | le_advertising_manager_test.py | 22 from cert.event_stream import EventStream
|
D | le_scanning_with_security_test.py | 18 from cert.event_stream import EventStream
|
D | le_scanning_manager_test.py | 22 from cert.event_stream import EventStream
|
D | le_acl_manager_test.py | 19 from cert.event_stream import EventStream
|
/system/bt/gd/l2cap/le/cert/ |
D | cert_le_l2cap.py | 25 from cert.event_stream import FilteringEventStream 26 from cert.event_stream import IEventStream
|
/system/bt/gd/hal/cert/ |
D | simple_hal_test.py | 20 from cert.event_stream import EventStream
|
/system/bt/gd/hci/facade/ |
D | le_acl_manager_facade.cc | 263 std::shared_ptr<::bluetooth::grpc::GrpcEventQueue<LeConnectionEvent>> event_stream) in Connection() argument 264 … : handle_(handle), connection_(std::move(connection)), event_stream_(std::move(event_stream)) {} in Connection()
|
D | acl_manager_facade.cc | 385 std::shared_ptr<::bluetooth::grpc::GrpcEventQueue<ConnectionEvent>> event_stream) in Connection() argument 386 … : handle_(handle), connection_(std::move(connection)), event_stream_(std::move(event_stream)) {} in Connection()
|
/system/bt/gd/l2cap/classic/cert/ |
D | cert_l2cap.py | 34 from cert.event_stream import FilteringEventStream 35 from cert.event_stream import IEventStream
|
/system/bt/gd/iso/cert/ |
D | le_iso_test.py | 20 from cert.event_stream import EventStream
|
/system/bt/gd/security/cert/ |
D | cert_security.py | 22 from cert.event_stream import EventStream
|