#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_callback_stream import EventCallbackStream
from cert.event_asserts import EventAsserts
from google.protobuf import empty_pb2 as empty_proto
from facade import rootservice_pb2 as facade_rootservice
from facade import common_pb2 as common
from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade
from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
from hci.facade import facade_pb2 as hci_facade
import bluetooth_packets_python3 as bt_packets
from bluetooth_packets_python3 import hci_packets

# Random device addresses used by the two sides of every test in this file.
_CERT_RANDOM_ADDRESS = '0C:05:04:03:02:01'
_DUT_RANDOM_ADDRESS = '0D:05:04:03:02:01'

# Placeholder the tests start from before a connection handle is captured.
_UNSET_HANDLE = 0xfff

# Byte patterns searched for inside a serialized LE meta event. They are
# matched as substrings, exactly as the tests have always done.
_LE_CONNECTION_COMPLETE_PATTERN = b'\x3e\x13\x01\x00'
# NOTE(review): this pattern reuses the 0x13 length byte of the legacy event;
# the enhanced event is presumably longer, so this may never match a
# well-formed packet -- confirm against the HCI spec before relying on it.
_LE_ENHANCED_CONNECTION_COMPLETE_PATTERN = b'\x3e\x13\x0A\x00'


def _extract_connection_handle(packet_bytes):
    """Return the connection handle carried by a connection-complete event.

    Args:
        packet_bytes: serialized HCI event bytes.

    Returns:
        The value of GetConnectionHandle() from the matching view, or None
        when neither connection-complete pattern occurs in packet_bytes.
    """
    if _LE_CONNECTION_COMPLETE_PATTERN in packet_bytes:
        view_type = hci_packets.LeConnectionCompleteView
    elif _LE_ENHANCED_CONNECTION_COMPLETE_PATTERN in packet_bytes:
        view_type = hci_packets.LeEnhancedConnectionCompleteView
    else:
        return None
    cc_view = view_type(
        hci_packets.LeMetaEventView(
            hci_packets.EventPacketView(
                bt_packets.PacketViewLittleEndian(list(packet_bytes)))))
    return cc_view.GetConnectionHandle()


class LeAclManagerTest(GdFacadeOnlyBaseTestClass):
    """Exercises the DUT's hci_le_acl_manager facade against a cert device
    that is driven through its raw hci facade."""

    def setup_test(self):
        """Start the stacks on both devices and wait for their channels."""
        self.device_under_test.rootservice.StartStack(
            facade_rootservice.StartStackRequest(
                module_under_test=facade_rootservice.BluetoothModule.Value(
                    'HCI_INTERFACES'),))
        self.cert_device.rootservice.StartStack(
            facade_rootservice.StartStackRequest(
                module_under_test=facade_rootservice.BluetoothModule.Value(
                    'HCI'),))

        self.device_under_test.wait_channel_ready()
        self.cert_device.wait_channel_ready()

    def teardown_test(self):
        """Stop the stacks on both devices."""
        self.device_under_test.rootservice.StopStack(
            facade_rootservice.StopStackRequest())
        self.cert_device.rootservice.StopStack(
            facade_rootservice.StopStackRequest())

    def register_for_event(self, event_code):
        """Register the cert device's HCI facade for an event code."""
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterEventHandler(msg)

    def register_for_le_event(self, event_code):
        """Register the cert device's HCI facade for an LE subevent code."""
        msg = hci_facade.LeSubeventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterLeEventHandler(msg)

    def enqueue_hci_command(self, command, expect_complete):
        """Serialize an HCI command builder and enqueue it on the cert device.

        Args:
            command: packet builder exposing Serialize().
            expect_complete: when truthy the command is enqueued with
                EnqueueCommandWithComplete, otherwise with
                EnqueueCommandWithStatus.
        """
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if expect_complete:
            self.cert_device.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert_device.hci.EnqueueCommandWithStatus(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        """Send ACL data from the cert device.

        Args:
            handle: connection handle to send on.
            pb_flag: packet boundary flag (converted with int()).
            b_flag: broadcast flag (converted with int()).
            acl: raw payload bytes.
        """
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
            packet_boundary_flag=int(pb_flag),
            broadcast_flag=int(b_flag),
            data=acl)
        self.cert_device.hci.SendAclData(acl_msg)

    def _cert_start_advertising(self):
        """Configure and enable extended advertising set 0 on the cert device.

        Sets legacy ADV_IND parameters, the cert random address, the
        advertising data and scan response names, then enables the set.
        """
        advertising_handle = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
                400,
                450,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  # SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ),
            True)

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
                advertising_handle, _CERT_RANDOM_ADDRESS), True)

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle,
                hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                [gap_name]), True)

        gap_short_name = hci_packets.GapData()
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_A_C'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle,
                hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                [gap_short_name]), True)

        enabled_set = hci_packets.EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(
                hci_packets.Enable.ENABLED, [enabled_set]), True)

    def _dut_create_connection_to_cert(self):
        """Ask the DUT to connect to the cert's random address.

        Returns:
            Whatever hci_le_acl_manager.CreateConnection returns; callers
            wrap it in an EventCallbackStream.
        """
        return self.device_under_test.hci_le_acl_manager.CreateConnection(
            le_acl_manager_facade.LeConnectionMsg(
                address_type=int(
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS),
                address=bytes(_CERT_RANDOM_ADDRESS, 'utf8')))

    def _wait_for_connection_handle(self, event_asserts):
        """Assert that a connection-complete event occurs and return its handle.

        Args:
            event_asserts: EventAsserts over a stream whose packets expose
                the serialized event as `.event`.

        Returns:
            The connection handle parsed from the matching event.
        """
        handle = _UNSET_HANDLE

        def capture_handle(packet):
            nonlocal handle
            parsed = _extract_connection_handle(packet.event)
            if parsed is None:
                return False
            handle = parsed
            return True

        event_asserts.assert_event_occurs(capture_handle)
        return handle

    def test_dut_connects(self):
        """Cert advertises, DUT connects, then ACL data flows both ways."""
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.cert_device.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
            EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
            EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            cert_hci_le_event_asserts = EventAsserts(cert_hci_le_event_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # Cert Advertises
            self._cert_start_advertising()

            with EventCallbackStream(self._dut_create_connection_to_cert()
                                    ) as connection_event_stream:

                connection_event_asserts = EventAsserts(connection_event_stream)

                # Cert gets ConnectionComplete with a handle and sends ACL data
                cert_handle = self._wait_for_connection_handle(
                    cert_hci_le_event_asserts)

                self.enqueue_acl_data(
                    cert_handle,
                    hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                    hci_packets.BroadcastFlag.POINT_TO_POINT,
                    bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))

                # DUT gets a connection complete event and sends and receives
                dut_handle = self._wait_for_connection_handle(
                    connection_event_asserts)

                self.device_under_test.hci_le_acl_manager.SendAclData(
                    le_acl_manager_facade.LeAclData(
                        handle=dut_handle,
                        payload=bytes(
                            b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')))

                cert_acl_data_asserts.assert_event_occurs(
                    lambda packet: b'SomeMoreAclData' in packet.data)
                acl_data_asserts.assert_event_occurs(
                    lambda packet: b'SomeAclData' in packet.payload)

    def test_cert_connects(self):
        """DUT advertises, cert connects, then ACL data flows both ways."""
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.cert_device.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
            EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
            EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
            EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            cert_hci_le_event_asserts = EventAsserts(cert_hci_le_event_stream)
            incoming_connection_asserts = EventAsserts(
                incoming_connection_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # DUT Advertises
            gap_name = hci_packets.GapData()
            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
            gap_name.data = list(bytes(b'Im_The_DUT'))
            gap_data = le_advertising_facade.GapDataMsg(
                data=bytes(gap_name.Serialize()))
            config = le_advertising_facade.AdvertisingConfig(
                advertisement=[gap_data],
                random_address=common.BluetoothAddress(
                    address=bytes(_DUT_RANDOM_ADDRESS, 'utf8')),
                interval_min=512,
                interval_max=768,
                event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
                address_type=common.RANDOM_DEVICE_ADDRESS,
                peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                peer_address=common.BluetoothAddress(
                    address=bytes(b'A6:A5:A4:A3:A2:A1')),
                channel_map=7,
                filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
                ALL_DEVICES)
            request = le_advertising_facade.CreateAdvertiserRequest(
                config=config)

            # The response is not needed by this test; only the RPC matters.
            self.device_under_test.hci_le_advertising_manager.CreateAdvertiser(
                request)

            # Cert Connects
            self.enqueue_hci_command(
                hci_packets.LeSetRandomAddressBuilder(_CERT_RANDOM_ADDRESS),
                True)
            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
            phy_scan_params.scan_interval = 0x60
            phy_scan_params.scan_window = 0x30
            phy_scan_params.conn_interval_min = 0x18
            phy_scan_params.conn_interval_max = 0x28
            phy_scan_params.conn_latency = 0
            phy_scan_params.supervision_timeout = 0x1f4
            phy_scan_params.min_ce_length = 0
            phy_scan_params.max_ce_length = 0
            self.enqueue_hci_command(
                hci_packets.LeExtendedCreateConnectionBuilder(
                    hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                    _DUT_RANDOM_ADDRESS, 1, [phy_scan_params]), False)

            # Cert gets ConnectionComplete with a handle and sends ACL data
            cert_handle = self._wait_for_connection_handle(
                cert_hci_le_event_asserts)

            self.enqueue_acl_data(
                cert_handle,
                hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                hci_packets.BroadcastFlag.POINT_TO_POINT,
                bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))

            # DUT gets a connection complete event and sends and receives
            dut_handle = self._wait_for_connection_handle(
                incoming_connection_asserts)

            self.device_under_test.hci_le_acl_manager.SendAclData(
                le_acl_manager_facade.LeAclData(
                    handle=dut_handle,
                    payload=bytes(
                        b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')))

            cert_acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeMoreAclData' in packet.data)
            acl_data_asserts.assert_event_occurs(
                lambda packet: b'SomeAclData' in packet.payload)

    def test_recombination_l2cap_packet(self):
        """Cert sends one payload as two ACL fragments; DUT sees them joined."""
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.cert_device.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
            EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
            EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            cert_hci_le_event_asserts = EventAsserts(cert_hci_le_event_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # Cert Advertises
            self._cert_start_advertising()

            with EventCallbackStream(self._dut_create_connection_to_cert()
                                    ) as connection_event_stream:

                connection_event_asserts = EventAsserts(connection_event_stream)

                # Cert gets ConnectionComplete with a handle and sends ACL data
                cert_handle = self._wait_for_connection_handle(
                    cert_hci_le_event_asserts)

                # DUT gets a connection complete event
                self._wait_for_connection_handle(connection_event_asserts)

                self.enqueue_acl_data(
                    cert_handle,
                    hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                    hci_packets.BroadcastFlag.POINT_TO_POINT,
                    bytes(b'\x06\x00\x07\x00Hello'))
                self.enqueue_acl_data(
                    cert_handle,
                    hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
                    hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))

                acl_data_asserts.assert_event_occurs(
                    lambda packet: b'Hello!' in packet.payload)