1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from cert.gd_base_test import GdBaseTestClass
18from cert.truth import assertThat
19from neighbor.facade import facade_pb2 as neighbor_facade
20from bluetooth_packets_python3 import hci_packets
21from cert.py_hci import PyHci
22from cert.py_acl_manager import PyAclManager
23
24
25class AclManagerTest(GdBaseTestClass):
26
27    def setup_class(self):
28        super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')
29
30    # todo: move into GdBaseTestClass, based on modules inited
31    def setup_test(self):
32        super().setup_test()
33        self.cert_hci = PyHci(self.cert, acl_streaming=True)
34        self.dut_acl_manager = PyAclManager(self.dut)
35
36    def teardown_test(self):
37        self.cert_hci.close()
38        super().teardown_test()
39
40    def test_dut_connects(self):
41        self.cert_hci.enable_inquiry_and_page_scan()
42        cert_address = self.cert_hci.read_own_address()
43
44        self.dut_acl_manager.initiate_connection(cert_address)
45        cert_acl = self.cert_hci.accept_connection()
46        with self.dut_acl_manager.complete_outgoing_connection() as dut_acl:
47            cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
48            dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
49
50            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
51            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
52
53    def test_cert_connects(self):
54        dut_address = self.dut.hci_controller.GetMacAddressSimple()
55        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
56
57        self.dut_acl_manager.listen_for_an_incoming_connection()
58        self.cert_hci.initiate_connection(dut_address)
59        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
60            cert_acl = self.cert_hci.complete_connection()
61
62            dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
63
64            cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
65
66            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
67            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
68
69    def test_reject_broadcast(self):
70        dut_address = self.dut.hci_controller.GetMacAddressSimple()
71        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
72
73        self.dut_acl_manager.listen_for_an_incoming_connection()
74        self.cert_hci.initiate_connection(dut_address)
75        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
76            cert_acl = self.cert_hci.complete_connection()
77
78            cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
79                          hci_packets.BroadcastFlag.ACTIVE_PERIPHERAL_BROADCAST,
80                          b'\x26\x00\x07\x00This is a Broadcast from the Cert')
81            assertThat(dut_acl).emitsNone()
82
83            cert_acl.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
84                          hci_packets.BroadcastFlag.POINT_TO_POINT,
85                          b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
86            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
87
88    def test_cert_connects_disconnects(self):
89        dut_address = self.dut.hci_controller.GetMacAddressSimple()
90        self.dut.neighbor.EnablePageScan(neighbor_facade.EnableMsg(enabled=True))
91
92        self.dut_acl_manager.listen_for_an_incoming_connection()
93        self.cert_hci.initiate_connection(dut_address)
94        with self.dut_acl_manager.complete_incoming_connection() as dut_acl:
95            cert_acl = self.cert_hci.complete_connection()
96
97            dut_acl.send(b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT')
98
99            cert_acl.send_first(b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
100
101            assertThat(cert_acl).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
102            assertThat(dut_acl).emits(lambda packet: b'SomeAclData' in packet.payload)
103
104            dut_acl.disconnect(hci_packets.DisconnectReason.REMOTE_USER_TERMINATED_CONNECTION)
105            dut_acl.wait_for_disconnection_complete()
106
107    def test_recombination_l2cap_packet(self):
108        self.cert_hci.enable_inquiry_and_page_scan()
109        cert_address = self.cert_hci.read_own_address()
110
111        self.dut_acl_manager.initiate_connection(cert_address)
112        cert_acl = self.cert_hci.accept_connection()
113        with self.dut_acl_manager.complete_outgoing_connection() as dut_acl:
114            cert_acl.send_first(b'\x06\x00\x07\x00Hello')
115            cert_acl.send_continuing(b'!')
116            cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)
117
118            assertThat(dut_acl).emits(lambda packet: b'Hello!' in packet.payload,
119                                      lambda packet: b'Hello' * 200 in packet.payload).inOrder()
120