#!/usr/bin/env python3
#
#   Copyright 2019 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd.cert.py_hal import PyHal
from blueberry.tests.gd.cert.matchers import HciMatchers
from blueberry.tests.gd.cert import gd_base_test
from blueberry.utils import bluetooth
from mobly import test_runner
import hci_packets as hci

# NOTE(review): not referenced anywhere in this module; presumably a timeout
# in seconds. Confirm nothing imports it before removing.
_GRPC_TIMEOUT = 10


class SimpleHalTest(gd_base_test.GdBaseTestClass):
    """Cert tests that drive the 'HAL' module on a DUT and a cert device.

    Each test talks to the two devices through a ``PyHal`` wrapper
    (``self.dut_hal`` / ``self.cert_hal``) created fresh in ``setup_test``.
    """

    def setup_class(self):
        """Start both the DUT and the cert device with the 'HAL' module."""
        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HAL', cert_module='HAL')

    def setup_test(self):
        """Run the base setup, then wrap and reset both devices' HALs."""
        gd_base_test.GdBaseTestClass.setup_test(self)
        self.dut_hal = PyHal(self.dut)
        self.cert_hal = PyHal(self.cert)

        self.dut_hal.reset()
        self.cert_hal.reset()

    def teardown_test(self):
        """Close both HAL wrappers and run the base teardown.

        Every cleanup step is attempted even when an earlier one raises
        (including the AttributeError raised when ``setup_test`` failed before
        the wrappers were created), so a failing ``close()`` can no longer
        cause the remaining cleanup to be skipped. The exception still
        propagates to the caller afterwards.
        """
        try:
            self.dut_hal.close()
        finally:
            try:
                self.cert_hal.close()
            finally:
                gd_base_test.GdBaseTestClass.teardown_test(self)

    def test_stream_events(self):
        """Send LeAddDeviceToFilterAcceptList on the DUT and expect its
        command-complete event with status SUCCESS on the DUT event stream."""
        self.dut_hal.send_hci_command(
            hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
                                              address=bluetooth.Address('0C:05:04:03:02:01')))

        expected_event = hci.LeAddDeviceToFilterAcceptListComplete(num_hci_command_packets=1,
                                                                   status=hci.ErrorCode.SUCCESS)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.Exactly(expected_event))

    def test_loopback_hci_command(self):
        """Enable local loopback on the DUT, send a command, and expect the
        DUT event stream to emit the loopback of the serialized command."""
        self.dut_hal.send_hci_command(hci.WriteLoopbackMode(loopback_mode=hci.LoopbackMode.ENABLE_LOCAL))

        command = hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
                                                    address=bluetooth.Address('0C:05:04:03:02:01'))
        self.dut_hal.send_hci_command(command)

        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LoopbackOf(command.serialize()))

    def test_inquiry_from_dut(self):
        """Enable inquiry/page scan on the cert device, start an inquiry from
        the DUT, and expect a matching event on the DUT event stream."""
        self.cert_hal.send_hci_command(hci.WriteScanEnable(scan_enable=hci.ScanEnable.INQUIRY_AND_PAGE_SCAN))

        self.dut_hal.send_hci_command(hci.Inquiry(lap=hci.Lap(lap=0x33), inquiry_length=0x30, num_responses=0xff))

        # Expecting an HCI Event (code 0x02, length 0x0f)
        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'\x02\x0f' in packet.payload)

    def test_le_ad_scan_cert_advertises(self):
        """Scan on the DUT while the cert device advertises, and expect an
        event carrying the advertised data on the DUT event stream."""
        self.dut_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        self.dut_hal.unmask_le_event(hci.SubeventCode.EXTENDED_ADVERTISING_REPORT)
        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')

        self.dut_hal.set_scan_parameters()
        self.dut_hal.start_scanning()

        advertisement = self.cert_hal.create_advertisement(0,
                                                           '0C:05:04:03:02:01',
                                                           min_interval=512,
                                                           max_interval=768,
                                                           peer_address='A6:A5:A4:A3:A2:A1',
                                                           tx_power=0x7f,
                                                           sid=1)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)

        advertisement.stop()

        self.dut_hal.stop_scanning()

    def test_le_connection_dut_advertises(self):
        """Connect from the cert device to an advertising DUT, then exchange
        ACL data in both directions and check each side receives it."""
        # Cert initiates towards the address the DUT advertises with below.
        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        self.cert_hal.set_random_le_address('0C:05:04:03:02:01')
        self.cert_hal.initiate_le_connection('0D:05:04:03:02:01')

        # DUT Advertises
        self.dut_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        advertisement = self.dut_hal.create_advertisement(0, '0D:05:04:03:02:01')
        advertisement.set_data(b'Im_The_DUT')
        advertisement.set_scan_response(b'Im_The_D')
        advertisement.start()

        cert_acl = self.cert_hal.complete_le_connection()
        dut_acl = self.dut_hal.complete_le_connection()

        dut_acl.send_first(b'Just SomeAclData')
        cert_acl.send_first(b'Just SomeMoreAclData')

        # Data sent by one side must show up on the other side's ACL stream.
        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
        assertThat(self.dut_hal.get_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)

    def test_le_filter_accept_list_connection_cert_advertises(self):
        """Initiate a filter-accept-list connection from the DUT while the
        cert device advertises, and expect LE connection complete on both."""
        self.dut_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        self.dut_hal.set_random_le_address('0D:05:04:03:02:01')
        self.dut_hal.add_to_filter_accept_list('0C:05:04:03:02:01')
        # NOTE(review): the address passed here differs from the accept-listed
        # one; presumably it is ignored when connecting by filter accept list.
        # Confirm against PyHal.
        self.dut_hal.initiate_le_connection_by_filter_accept_list('BA:D5:A4:A3:A2:A1')

        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
        advertisement = self.cert_hal.create_advertisement(1,
                                                           '0C:05:04:03:02:01',
                                                           min_interval=512,
                                                           max_interval=768,
                                                           peer_address='A6:A5:A4:A3:A2:A1',
                                                           tx_power=0x7F,
                                                           sid=0)
        advertisement.set_data(b'Im_A_Cert')
        advertisement.start()

        assertThat(self.cert_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())
        assertThat(self.dut_hal.get_hci_event_stream()).emits(HciMatchers.LeConnectionComplete())


if __name__ == '__main__':
    test_runner.main()