1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import datetime, timedelta
18import logging
19from threading import Timer
20import time
21import traceback
22
23from mobly import asserts
24
25from bluetooth_packets_python3 import hci_packets
26from bluetooth_packets_python3 import l2cap_packets
27from cert.event_stream import EventStream, FilteringEventStream
28from cert.truth import assertThat
29from cert.metadata import metadata
30from cert.behavior import when, wait_until
31from cert.behavior import IHasBehaviors
32from cert.behavior import anything
33from cert.behavior import SingleArgumentBehavior
34from cert.behavior import ReplyStage
35
36
37class BogusProto:
38
39    class BogusType:
40
41        def __init__(self):
42            self.name = "BogusProto"
43            self.is_extension = False
44            self.cpp_type = False
45
46        def type(self):
47            return 'BogusRpc'
48
49        def label(self):
50            return "label"
51
52    class BogusDescriptor:
53
54        def __init__(self, name):
55            self.full_name = name
56
57    def __init__(self, value):
58        self.value_ = value
59        self.DESCRIPTOR = BogusProto.BogusDescriptor(str(value))
60
61    def __str__(self):
62        return "BogusRpc value = " + str(self.value_)
63
64    def ListFields(self):
65        for field in [BogusProto.BogusType()]:
66            yield [field, self.value_]
67
68
69class FetchEvents:
70
71    def __init__(self, events, delay_ms):
72        self.events_ = events
73        self.sleep_time_ = (delay_ms * 1.0) / 1000
74        self.index_ = 0
75        self.done_ = False
76        self.then_ = datetime.now()
77
78    def __iter__(self):
79        for event in self.events_:
80            time.sleep(self.sleep_time_)
81            if self.done_:
82                return
83            logging.debug("yielding %d" % event)
84            yield BogusProto(event)
85
86    def done(self):
87        return self.done_
88
89    def cancel(self):
90        logging.debug("cancel")
91        self.done_ = True
92        return None
93
94
95class TestBehaviors(object):
96
97    def __init__(self, parent):
98        self.test_request_behavior = SingleArgumentBehavior(lambda: TestBehaviors.TestRequestReplyStage(parent))
99
100    def test_request(self, matcher):
101        return self.test_request_behavior.begin(matcher)
102
103    class TestRequestReplyStage(ReplyStage):
104
105        def __init__(self, parent):
106            self._parent = parent
107
108        def increment_count(self):
109            self._commit(lambda obj: self._increment_count(obj))
110            return self
111
112        def _increment_count(self, obj):
113            self._parent.count += 1
114            self._parent.captured.append(obj)
115
116
117class ObjectWithBehaviors(IHasBehaviors):
118
119    def __init__(self):
120        self.behaviors = TestBehaviors(self)
121        self.count = 0
122        self.captured = []
123        self.unhandled_count = 0
124
125    def get_behaviors(self):
126        return self.behaviors
127
128    def increment_unhandled(self):
129        self.unhandled_count += 1
130
131
132def test_assert_occurs_at_least_passes_core():
133    with EventStream(FetchEvents(events=[1, 2, 3, 1, 2, 3], delay_ms=40)) as event_stream:
134        event_stream.assert_event_occurs(
135            lambda data: data.value_ == 1, timeout=timedelta(milliseconds=300), at_least_times=2)
136
137
138def test_assert_occurs_passes_core():
139    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
140        event_stream.assert_event_occurs(lambda data: data.value_ == 1, timeout=timedelta(seconds=1))
141
142
143def test_assert_occurs_fails_core():
144    try:
145        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
146            event_stream.assert_event_occurs(lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
147    except Exception as e:
148        logging.debug(e)
149        return True  # Failed as expected
150    return False
151
152
153def test_assert_occurs_at_most_passes_core():
154    with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream:
155        event_stream.assert_event_occurs_at_most(
156            lambda data: data.value_ < 4, timeout=timedelta(seconds=1), at_most_times=3)
157
158
159def test_assert_occurs_at_most_fails_core():
160    try:
161        with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=50)) as event_stream:
162            event_stream.assert_event_occurs_at_most(
163                lambda data: data.value_ > 1, timeout=timedelta(seconds=1), at_most_times=2)
164    except Exception as e:
165        logging.debug(e)
166        return True  # Failed as expected
167    return False
168
169
170def test_skip_a_test_core():
171    asserts.skip("Skipping this test because it's blocked by b/xyz")
172    assert False
173
174
175def test_nested_packets_core():
176    handle = 123
177    inside = hci_packets.ReadScanEnableBuilder()
178    logging.debug(inside.Serialize())
179    logging.debug("building outside")
180    outside = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
181                                     hci_packets.BroadcastFlag.POINT_TO_POINT, inside)
182    logging.debug(outside.Serialize())
183    logging.debug("Done!")
184
185
186def test_l2cap_config_options_core():
187    mtu_opt = l2cap_packets.MtuConfigurationOption()
188    mtu_opt.mtu = 123
189    fcs_opt = l2cap_packets.FrameCheckSequenceOption()
190    fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
191    request = l2cap_packets.ConfigurationRequestBuilder(
192        0x1d,  # Command ID
193        0xc1d,  # Channel ID
194        l2cap_packets.Continuation.END,
195        [mtu_opt, fcs_opt])
196    request_b_frame = l2cap_packets.BasicFrameBuilder(0x01, request)
197    handle = 123
198    wrapped = hci_packets.AclBuilder(handle, hci_packets.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
199                                     hci_packets.BroadcastFlag.POINT_TO_POINT, request_b_frame)
200    # Size is ACL (4) + L2CAP (4) + Configure (8) + MTU (4) + FCS (3)
201    asserts.assert_true(len(wrapped.Serialize()) == 23, "Packet serialized incorrectly")
202
203
204def test_assertThat_boolean_success_core():
205    assertThat(True).isTrue()
206    assertThat(False).isFalse()
207
208
209def test_assertThat_boolean_falseIsTrue_core():
210    try:
211        assertThat(False).isTrue()
212    except Exception as e:
213        return True
214    return False
215
216
217def test_assertThat_boolean_trueIsFalse_core():
218    try:
219        assertThat(True).isFalse()
220    except Exception as e:
221        return True
222    return False
223
224
225def test_assertThat_object_success_core():
226    assertThat("this").isEqualTo("this")
227    assertThat("this").isNotEqualTo("that")
228    assertThat(None).isNone()
229    assertThat("this").isNotNone()
230
231
232def test_assertThat_object_isEqualToFails_core():
233    try:
234        assertThat("this").isEqualTo("that")
235    except Exception as e:
236        return True
237    return False
238
239
240def test_assertThat_object_isNotEqualToFails_core():
241    try:
242        assertThat("this").isNotEqualTo("this")
243    except Exception as e:
244        return True
245    return False
246
247
248def test_assertThat_object_isNoneFails_core():
249    try:
250        assertThat("this").isNone()
251    except Exception as e:
252        return True
253    return False
254
255
256def test_assertThat_object_isNotNoneFails_core():
257    try:
258        assertThat(None).isNotNone()
259    except Exception as e:
260        return True
261    return False
262
263
264def test_assertThat_eventStream_emits_passes_core():
265    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
266        assertThat(event_stream).emits(lambda data: data.value_ == 1)
267
268
269def test_assertThat_eventStream_emits_then_passes_core():
270    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
271        assertThat(event_stream).emits(lambda data: data.value_ == 1).then(lambda data: data.value_ == 3)
272
273
274def test_assertThat_eventStream_emits_fails_core():
275    try:
276        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
277            assertThat(event_stream).emits(lambda data: data.value_ == 4)
278    except Exception as e:
279        logging.debug(e)
280        return True  # Failed as expected
281    return False
282
283
284def test_assertThat_eventStream_emits_then_fails_core():
285    try:
286        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
287            assertThat(event_stream).emits(lambda data: data.value_ == 1).emits(lambda data: data.value_ == 4)
288    except Exception as e:
289        logging.debug(e)
290        return True  # Failed as expected
291    return False
292
293
294def test_assertThat_eventStream_emitsInOrder_passes_core():
295    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
296        assertThat(event_stream).emits(lambda data: data.value_ == 1, lambda data: data.value_ == 2).inOrder()
297
298
299def test_assertThat_eventStream_emitsInAnyOrder_passes_core():
300    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
301        assertThat(event_stream).emits(lambda data: data.value_ == 2,
302                                       lambda data: data.value_ == 1).inAnyOrder().then(lambda data: data.value_ == 3)
303
304
305def test_assertThat_eventStream_emitsInOrder_fails_core():
306    try:
307        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
308            assertThat(event_stream).emits(lambda data: data.value_ == 2, lambda data: data.value_ == 1).inOrder()
309    except Exception as e:
310        logging.debug(e)
311        return True  # Failed as expected
312    return False
313
314
315def test_assertThat_eventStream_emitsInAnyOrder_fails_core():
316    try:
317        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
318            assertThat(event_stream).emits(lambda data: data.value_ == 4, lambda data: data.value_ == 1).inAnyOrder()
319    except Exception as e:
320        logging.debug(e)
321        return True  # Failed as expected
322    return False
323
324
325def test_assertThat_emitsNone_passes_core():
326    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
327        assertThat(event_stream).emitsNone(
328            lambda data: data.value_ == 4, timeout=timedelta(seconds=0.15)).thenNone(
329                lambda data: data.value_ == 5, timeout=timedelta(seconds=0.15))
330
331
332def test_assertThat_emitsNone_passes_after_1_second_core():
333    with EventStream(FetchEvents(events=[1, 2, 3, 4], delay_ms=400)) as event_stream:
334        assertThat(event_stream).emitsNone(lambda data: data.value_ == 4, timeout=timedelta(seconds=1))
335
336
337def test_assertThat_emitsNone_fails_core():
338    try:
339        with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
340            assertThat(event_stream).emitsNone(lambda data: data.value_ == 2, timeout=timedelta(seconds=1))
341    except Exception as e:
342        logging.debug(e)
343        return True  # Failed as expected
344    return False
345
346
347def test_assertThat_emitsNone_zero_passes_core():
348    with EventStream(FetchEvents(events=[], delay_ms=50)) as event_stream:
349        assertThat(event_stream).emitsNone(timeout=timedelta(milliseconds=10)).thenNone(
350            timeout=timedelta(milliseconds=10))
351
352
353def test_assertThat_emitsNone_zero_passes_after_one_second_core():
354    with EventStream(FetchEvents([1], delay_ms=1500)) as event_stream:
355        assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1.0))
356
357
358def test_assertThat_emitsNone_zero_fails_core():
359    try:
360        with EventStream(FetchEvents(events=[17], delay_ms=50)) as event_stream:
361            assertThat(event_stream).emitsNone(timeout=timedelta(seconds=1))
362    except Exception as e:
363        logging.debug(e)
364        return True  # Failed as expected
365    return False
366
367
368def test_filtering_event_stream_none_filter_function_core():
369    with EventStream(FetchEvents(events=[1, 2, 3], delay_ms=50)) as event_stream:
370        filtered_event_stream = FilteringEventStream(event_stream, None)
371        assertThat(filtered_event_stream) \
372            .emits(lambda data: data.value_ == 1) \
373            .then(lambda data: data.value_ == 3)
374
375
376def test_fluent_behavior_simple_core():
377    thing = ObjectWithBehaviors()
378
379    when(thing).test_request(anything()).then().increment_count()
380
381    thing.behaviors.test_request_behavior.run("A")
382
383    assertThat(thing.count).isEqualTo(1)
384    assertThat(thing.captured).isEqualTo(["A"])
385
386
387def test_fluent_behavior__then_single__captures_one_core():
388    thing = ObjectWithBehaviors()
389
390    thing.behaviors.test_request_behavior.set_default_to_ignore()
391
392    when(thing).test_request(anything()).then().increment_count()
393
394    thing.behaviors.test_request_behavior.run("A")
395    thing.behaviors.test_request_behavior.run("A")
396    thing.behaviors.test_request_behavior.run("A")
397
398    assertThat(thing.count).isEqualTo(1)
399    assertThat(thing.captured).isEqualTo(["A"])
400
401
402def test_fluent_behavior__then_times__captures_all_core():
403    thing = ObjectWithBehaviors()
404
405    when(thing).test_request(anything()).then(times=3).increment_count()
406
407    thing.behaviors.test_request_behavior.run("A")
408    thing.behaviors.test_request_behavior.run("B")
409    thing.behaviors.test_request_behavior.run("C")
410
411    assertThat(thing.count).isEqualTo(3)
412    assertThat(thing.captured).isEqualTo(["A", "B", "C"])
413
414
415def test_fluent_behavior__always__captures_all_core():
416    thing = ObjectWithBehaviors()
417
418    when(thing).test_request(anything()).always().increment_count()
419
420    thing.behaviors.test_request_behavior.run("A")
421    thing.behaviors.test_request_behavior.run("B")
422    thing.behaviors.test_request_behavior.run("C")
423
424    assertThat(thing.count).isEqualTo(3)
425    assertThat(thing.captured).isEqualTo(["A", "B", "C"])
426
427
428def test_fluent_behavior__matcher__captures_relevant_core():
429    thing = ObjectWithBehaviors()
430    thing.behaviors.test_request_behavior.set_default_to_ignore()
431
432    when(thing).test_request(lambda obj: obj == "B").always().increment_count()
433
434    thing.behaviors.test_request_behavior.run("A")
435    thing.behaviors.test_request_behavior.run("B")
436    thing.behaviors.test_request_behavior.run("C")
437
438    assertThat(thing.count).isEqualTo(1)
439    assertThat(thing.captured).isEqualTo(["B"])
440
441
442def test_fluent_behavior__then_repeated__captures_relevant_core():
443    thing = ObjectWithBehaviors()
444    thing.behaviors.test_request_behavior.set_default_to_ignore()
445
446    when(thing).test_request(anything()).then().increment_count().increment_count()
447
448    thing.behaviors.test_request_behavior.run("A")
449    thing.behaviors.test_request_behavior.run("B")
450    thing.behaviors.test_request_behavior.run("A")
451
452    assertThat(thing.count).isEqualTo(2)
453    assertThat(thing.captured).isEqualTo(["A", "B"])
454
455
456def test_fluent_behavior__fallback__captures_relevant_core():
457    thing = ObjectWithBehaviors()
458    thing.behaviors.test_request_behavior.set_default_to_ignore()
459
460    when(thing).test_request(lambda obj: obj == "B").then(times=1).increment_count()
461    when(thing).test_request(lambda obj: obj == "C").always().increment_count()
462
463    thing.behaviors.test_request_behavior.run("A")
464    thing.behaviors.test_request_behavior.run("B")
465    thing.behaviors.test_request_behavior.run("A")
466    thing.behaviors.test_request_behavior.run("C")
467    thing.behaviors.test_request_behavior.run("B")
468    thing.behaviors.test_request_behavior.run("C")
469
470    assertThat(thing.count).isEqualTo(3)
471    assertThat(thing.captured).isEqualTo(["B", "C", "C"])
472
473
474def test_fluent_behavior__default_unhandled_crash_core():
475    thing = ObjectWithBehaviors()
476
477    when(thing).test_request(anything()).then().increment_count()
478
479    thing.behaviors.test_request_behavior.run("A")
480    try:
481        thing.behaviors.test_request_behavior.run("A")
482    except Exception as e:
483        logging.debug(e)
484        return True  # Failed as expected
485    return False
486
487
488def test_fluent_behavior__set_default_works_core():
489    thing = ObjectWithBehaviors()
490    thing.behaviors.test_request_behavior.set_default(lambda obj: thing.increment_unhandled())
491
492    when(thing).test_request(anything()).then().increment_count()
493
494    thing.behaviors.test_request_behavior.run("A")
495    thing.behaviors.test_request_behavior.run("A")
496    assertThat(thing.unhandled_count).isEqualTo(1)
497
498
499def test_fluent_behavior__wait_until_done_core():
500    thing = ObjectWithBehaviors()
501    is_a = lambda obj: obj == "A"
502    when(thing).test_request(is_a).then().increment_count()
503
504    closure = lambda: thing.behaviors.test_request_behavior.run("A")
505    t = Timer(0.5, closure)
506    t.start()
507
508    wait_until(thing).test_request(is_a).times(1)
509    assertThat(thing.count).isEqualTo(1)
510    assertThat(thing.captured).isEqualTo(["A"])
511
512
513def test_fluent_behavior__wait_until_done_different_lambda_core():
514    thing = ObjectWithBehaviors()
515    when(thing).test_request(lambda obj: obj == "A").then().increment_count()
516
517    closure = lambda: thing.behaviors.test_request_behavior.run("A")
518    t = Timer(0.5, closure)
519    t.start()
520
521    wait_until(thing).test_request(lambda obj: obj == "A").times(1)
522    assertThat(thing.count).isEqualTo(1)
523    assertThat(thing.captured).isEqualTo(["A"])
524
525
526def test_fluent_behavior__wait_until_done_anything_core():
527    thing = ObjectWithBehaviors()
528    when(thing).test_request(lambda obj: obj == "A").then().increment_count()
529
530    closure = lambda: thing.behaviors.test_request_behavior.run("A")
531    t = Timer(0.5, closure)
532    t.start()
533
534    wait_until(thing).test_request(anything()).times(1)
535    assertThat(thing.count).isEqualTo(1)
536    assertThat(thing.captured).isEqualTo(["A"])
537
538
539def test_fluent_behavior__wait_until_done_not_happened_core():
540    thing = ObjectWithBehaviors()
541    thing.behaviors.test_request_behavior.set_default_to_ignore()
542    when(thing).test_request(lambda obj: obj == "A").then().increment_count()
543
544    closure = lambda: thing.behaviors.test_request_behavior.run("B")
545    t = Timer(0.5, closure)
546    t.start()
547    assertThat(wait_until(thing).test_request(lambda obj: obj == "A").times(1)).isFalse()
548
549
550def test_fluent_behavior__wait_until_done_with_default_core():
551    thing = ObjectWithBehaviors()
552    thing.behaviors.test_request_behavior.set_default(lambda obj: thing.increment_unhandled())
553
554    closure = lambda: thing.behaviors.test_request_behavior.run("A")
555    t = Timer(0.5, closure)
556    t.start()
557
558    wait_until(thing).test_request(anything()).times(1)
559    assertThat(thing.unhandled_count).isEqualTo(1)
560
561
562def test_fluent_behavior__wait_until_done_two_events_AA_core():
563    thing = ObjectWithBehaviors()
564    when(thing).test_request(lambda obj: obj == "A").then().increment_count().increment_count()
565
566    closure1 = lambda: thing.behaviors.test_request_behavior.run("A")
567    t1 = Timer(0.5, closure1)
568    t1.start()
569    closure2 = lambda: thing.behaviors.test_request_behavior.run("A")
570    t2 = Timer(0.5, closure2)
571    t2.start()
572
573    wait_until(thing).test_request(lambda obj: obj == "A").times(2)
574    assertThat(thing.count).isEqualTo(2)
575    assertThat(thing.captured).isEqualTo(["A", "A"])
576
577
578def test_fluent_behavior__wait_until_done_two_events_AB_core():
579    thing = ObjectWithBehaviors()
580    when(thing).test_request(anything()).always().increment_count()
581
582    closure1 = lambda: thing.behaviors.test_request_behavior.run("A")
583    t1 = Timer(0.5, closure1)
584    t1.start()
585    closure2 = lambda: thing.behaviors.test_request_behavior.run("B")
586    t2 = Timer(1, closure2)
587    t2.start()
588
589    wait_until(thing).test_request(anything()).times(2)
590    assertThat(thing.count).isEqualTo(2)
591    assertThat(thing.captured).isEqualTo(["A", "B"])
592
593
594def test_fluent_behavior__wait_until_done_only_one_event_is_done_core():
595    thing = ObjectWithBehaviors()
596    when(thing).test_request(anything()).always().increment_count()
597
598    closure1 = lambda: thing.behaviors.test_request_behavior.run("A")
599    t1 = Timer(1, closure1)
600    t1.start()
601    closure2 = lambda: thing.behaviors.test_request_behavior.run("B")
602    t2 = Timer(3, closure2)
603    t2.start()
604    assertThat(wait_until(thing).test_request(lambda obj: obj == "A").times(2)).isFalse()
605