1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from blueberry.tests.gd.cert.truth import assertThat
18from blueberry.tests.gd.cert.py_hal import PyHal
19from blueberry.tests.gd.cert.matchers import HciMatchers
20from blueberry.tests.gd.cert import gd_base_test
21from blueberry.utils import bluetooth
22from mobly import test_runner
23import hci_packets as hci
24
25_GRPC_TIMEOUT = 10
26
27
28class SimpleHalTest(gd_base_test.GdBaseTestClass):
29
30    def setup_class(self):
31        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HAL', cert_module='HAL')
32
33    def setup_test(self):
34        gd_base_test.GdBaseTestClass.setup_test(self)
35        self.dut_hal = PyHal(self.dut)
36        self.cert_hal = PyHal(self.cert)
37
38        self.dut_hal.reset()
39        self.cert_hal.reset()
40
41    def teardown_test(self):
42        self.dut_hal.close()
43        self.cert_hal.close()
44        gd_base_test.GdBaseTestClass.teardown_test(self)
45
46    def test_stream_events(self):
47        self.dut_hal.send_hci_command(
48            hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
49                                              address=bluetooth.Address('0C:05:04:03:02:01')))
50        assertThat(self.dut_hal.get_hci_event_stream()).emits(
51            HciMatchers.Exactly(
52                hci.LeAddDeviceToFilterAcceptListComplete(num_hci_command_packets=1, status=hci.ErrorCode.SUCCESS)))
53
54    def test_loopback_hci_command(self):
55        self.dut_hal.send_hci_command(hci.WriteLoopbackMode(loopback_mode=hci.LoopbackMode.ENABLE_LOCAL))
56
57        command = hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
58                                                    address=bluetooth.Address('0C:05:04:03:02:01'))
59        self.dut_hal.send_hci_command(command)
60
61        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LoopbackOf(command.serialize()))
62
63    def test_inquiry_from_dut(self):
64        self.cert_hal.send_hci_command(hci.WriteScanEnable(scan_enable=hci.ScanEnable.INQUIRY_AND_PAGE_SCAN))
65
66        self.dut_hal.send_hci_command(hci.Inquiry(lap=hci.Lap(lap=0x33), inquiry_length=0x30, num_responses=0xff))
67
68        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload
69                                                              # Expecting an HCI Event (code 0x02, length 0x0f)
70                                                             )
71
72    def test_le_ad_scan_cert_advertises(self):
73        self.dut_hal.unmask_event(hci.EventCode.LE_META_EVENT)
74        self.dut_hal.unmask_le_event(hci.SubeventCode.EXTENDED_ADVERTISING_REPORT)
75        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')
76
77        self.dut_hal.set_scan_parameters()
78        self.dut_hal.start_scanning()
79
80        advertisement = self.cert_hal.create_advertisement(0,
81                                                           '0C:05:04:03:02:01',
82                                                           min_interval=512,
83                                                           max_interval=768,
84                                                           peer_address='A6:A5:A4:A3:A2:A1',
85                                                           tx_power=0x7f,
86                                                           sid=1)
87        advertisement.set_data(b'Im_A_Cert')
88        advertisement.start()
89
90        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)
91
92        advertisement.stop()
93
94        self.dut_hal.stop_scanning()
95
96    def test_le_connection_dut_advertises(self):
97        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
98        self.cert_hal.set_random_le_address('0C:05:04:03:02:01')
99        self.cert_hal.initiate_le_connection('0D:05:04:03:02:01')
100
101        # DUT Advertises
102        self.dut_hal.unmask_event(hci.EventCode.LE_META_EVENT)
103        advertisement = self.dut_hal.create_advertisement(0, '0D:05:04:03:02:01')
104        advertisement.set_data(b'Im_The_DUT')
105        advertisement.set_scan_response(b'Im_The_D')
106        advertisement.start()
107
108        cert_acl = self.cert_hal.complete_le_connection()
109        dut_acl = self.dut_hal.complete_le_connection()
110
111        dut_acl.send_first(b'Just SomeAclData')
112        cert_acl.send_first(b'Just SomeMoreAclData')
113
114        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
115        assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
116
117    def test_le_filter_accept_list_connection_cert_advertises(self):
118        self.dut_hal.unmask_event(hci.EventCode.LE_META_EVENT)
119        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')
120        self.dut_hal.add_to_filter_accept_list('0C:05:04:03:02:01')
121        self.dut_hal.initiate_le_connection_by_filter_accept_list('BA:D5:A4:A3:A2:A1')
122
123        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
124        advertisement = self.cert_hal.create_advertisement(1,
125                                                           '0C:05:04:03:02:01',
126                                                           min_interval=512,
127                                                           max_interval=768,
128                                                           peer_address='A6:A5:A4:A3:A2:A1',
129                                                           tx_power=0x7F,
130                                                           sid=0)
131        advertisement.set_data(b'Im_A_Cert')
132        advertisement.start()
133
134        assertThat(self.cert_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())
135        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())
136
137
138if __name__ == '__main__':
139    test_runner.main()
140