Home
last modified time | relevance | path

Searched refs:event_stream (Results 1 – 25 of 29) sorted by relevance

12

/system/bt/gd/cert/
Dpy_acl_manager.py18 from cert.event_stream import EventStream
19 from cert.event_stream import IEventStream
30 def __init__(self, acl_manager, remote_addr, handle, event_stream): argument
34 self.connection_event_stream = event_stream
79 def complete_connection(self, event_stream): argument
81 assertThat(event_stream).emits(connection_complete)
85 return PyAclManagerAclConnection(self.acl_manager, address, handle, event_stream)
89 event_stream = self.incoming_connection_event_stream
91 return self.complete_connection(event_stream)
95 event_stream = self.outgoing_connection_event_stream
[all …]
Dcert_self_test_lib.py27 from cert.event_stream import EventStream, FilteringEventStream
133 with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream:
134 event_stream.assert_event_occurs(
139 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
140event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(seconds=1))
145 with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
146event_stream.assert_event_occurs(lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
154 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream:
155 event_stream.assert_event_occurs_at_most(
161 with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream:
[all …]
Dpy_le_acl_manager.py19 from cert.event_stream import EventStream
20 from cert.event_stream import IEventStream
31 def __init__(self, le_acl_manager, address, remote_addr, handle, event_stream): argument
44 self.connection_event_stream = event_stream
115 def complete_connection(self, event_stream): argument
117 assertThat(event_stream).emits(connection_complete)
125 …onnection = PyLeAclManagerAclConnection(self.le_acl_manager, address, remote, handle, event_stream)
131 event_stream = self.incoming_connection_event_stream
133 return self.complete_connection(event_stream)
137 event_stream = self.outgoing_connection_event_streams.pop(token)[0]
[all …]
Dpy_hci.py18 from cert.event_stream import EventStream
19 from cert.event_stream import FilteringEventStream
20 from cert.event_stream import IEventStream
110 event_stream = None variable in PyHci
121 self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty()))
130 safeClose(self.event_stream)
135 return self.event_stream
162 assertThat(self.event_stream).emits(read_bd_addr)
177 assertThat(self.event_stream).emits(connection_request)
186 assertThat(self.event_stream).emits(connection_complete)
Dtruth.py23 from cert.event_stream import IEventStream
24 from cert.event_stream import NOT_FOR_YOU_assert_event_occurs
25 from cert.event_stream import NOT_FOR_YOU_assert_all_events_occur
26 from cert.event_stream import NOT_FOR_YOU_assert_none_matching
27 from cert.event_stream import NOT_FOR_YOU_assert_none
Dpy_le_iso.py24 from cert.event_stream import EventStream, IEventStream
25 from cert.event_stream import FilteringEventStream
Dpy_le_security.py23 from cert.event_stream import EventStream
Dpy_hal.py18 from cert.event_stream import EventStream
19 from cert.event_stream import FilteringEventStream
20 from cert.event_stream import IEventStream
Dpy_l2cap.py25 from cert.event_stream import FilteringEventStream
26 from cert.event_stream import EventStream, IEventStream
Dbehavior.py22 from cert.event_stream import static_remaining_time_delta
Dcert_self_test.py30 from cert.event_stream import EventStream, FilteringEventStream
Dpy_security.py22 from cert.event_stream import EventStream
/system/bt/gd/neighbor/cert/
Dpy_neighbor.py20 from cert.event_stream import EventStream
21 from cert.event_stream import IEventStream
/system/bt/gd/shim/cert/
Dshim_test.py21 from cert.event_stream import EventStream
/system/bt/gd/hci/cert/
Dle_advertising_manager_test.py22 from cert.event_stream import EventStream
Dle_scanning_with_security_test.py18 from cert.event_stream import EventStream
Dle_scanning_manager_test.py22 from cert.event_stream import EventStream
Dle_acl_manager_test.py19 from cert.event_stream import EventStream
/system/bt/gd/l2cap/le/cert/
Dcert_le_l2cap.py25 from cert.event_stream import FilteringEventStream
26 from cert.event_stream import IEventStream
/system/bt/gd/hal/cert/
Dsimple_hal_test.py20 from cert.event_stream import EventStream
/system/bt/gd/hci/facade/
Dle_acl_manager_facade.cc263 std::shared_ptr<::bluetooth::grpc::GrpcEventQueue<LeConnectionEvent>> event_stream) in Connection() argument
264 … : handle_(handle), connection_(std::move(connection)), event_stream_(std::move(event_stream)) {} in Connection()
Dacl_manager_facade.cc385 std::shared_ptr<::bluetooth::grpc::GrpcEventQueue<ConnectionEvent>> event_stream) in Connection() argument
386 … : handle_(handle), connection_(std::move(connection)), event_stream_(std::move(event_stream)) {} in Connection()
/system/bt/gd/l2cap/classic/cert/
Dcert_l2cap.py34 from cert.event_stream import FilteringEventStream
35 from cert.event_stream import IEventStream
/system/bt/gd/iso/cert/
Dle_iso_test.py20 from cert.event_stream import EventStream
/system/bt/gd/security/cert/
Dcert_security.py22 from cert.event_stream import EventStream

12