1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from datetime import timedelta 18import os 19import sys 20import logging 21 22from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass 23from cert.event_callback_stream import EventCallbackStream 24from cert.event_asserts import EventAsserts 25from google.protobuf import empty_pb2 as empty_proto 26from facade import rootservice_pb2 as facade_rootservice 27from hal import facade_pb2 as hal_facade 28from hci.facade import facade_pb2 as hci_facade 29from bluetooth_packets_python3 import hci_packets 30import bluetooth_packets_python3 as bt_packets 31 32 33class DirectHciTest(GdFacadeOnlyBaseTestClass): 34 35 def setup_test(self): 36 self.device_under_test.rootservice.StartStack( 37 facade_rootservice.StartStackRequest( 38 module_under_test=facade_rootservice.BluetoothModule.Value( 39 'HCI'),)) 40 self.cert_device.rootservice.StartStack( 41 facade_rootservice.StartStackRequest( 42 module_under_test=facade_rootservice.BluetoothModule.Value( 43 'HAL'),)) 44 45 self.device_under_test.wait_channel_ready() 46 self.cert_device.wait_channel_ready() 47 48 self.cert_device.hal.SendHciCommand( 49 hal_facade.HciCommandPacket( 50 payload=bytes(hci_packets.ResetBuilder().Serialize()))) 51 52 def teardown_test(self): 53 self.device_under_test.rootservice.StopStack( 54 facade_rootservice.StopStackRequest()) 55 self.cert_device.rootservice.StopStack( 56 facade_rootservice.StopStackRequest()) 57 58 def register_for_event(self, event_code): 59 msg = hci_facade.EventCodeMsg(code=int(event_code)) 60 self.device_under_test.hci.RegisterEventHandler(msg) 61 62 def register_for_le_event(self, event_code): 63 msg = hci_facade.LeSubeventCodeMsg(code=int(event_code)) 64 self.device_under_test.hci.RegisterLeEventHandler(msg) 65 66 def enqueue_hci_command(self, command, expect_complete): 67 cmd_bytes = bytes(command.Serialize()) 68 cmd = hci_facade.CommandMsg(command=cmd_bytes) 69 if (expect_complete): 70 self.device_under_test.hci.EnqueueCommandWithComplete(cmd) 71 else: 72 self.device_under_test.hci.EnqueueCommandWithStatus(cmd) 73 74 def send_hal_hci_command(self, command): 75 self.cert_device.hal.SendHciCommand( 76 hal_facade.HciCommandPacket(payload=bytes(command.Serialize()))) 77 78 def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): 79 acl_msg = hci_facade.AclMsg( 80 handle=int(handle), 81 packet_boundary_flag=int(pb_flag), 82 broadcast_flag=int(b_flag), 83 data=acl) 84 self.device_under_test.hci.SendAclData(acl_msg) 85 86 def send_hal_acl_data(self, handle, pb_flag, b_flag, acl): 87 lower = handle & 0xff 88 upper = (handle >> 8) & 0xf 89 upper = upper | int(pb_flag) & 0x3 90 upper = upper | ((int(b_flag) & 0x3) << 2) 91 lower_length = len(acl) & 0xff 92 upper_length = (len(acl) & 0xff00) >> 8 93 concatenated = bytes([lower, upper, lower_length, upper_length] + 94 list(acl)) 95 self.cert_device.hal.SendHciAcl( 96 hal_facade.HciAclPacket(payload=concatenated)) 97 98 def test_local_hci_cmd_and_event(self): 99 # Loopback mode responds with ACL and SCO connection complete 100 self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) 101 self.register_for_event(hci_packets.EventCode.LOOPBACK_COMMAND) 102 self.register_for_event( 103 hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 104 with EventCallbackStream( 105 self.device_under_test.hci.FetchEvents( 106 empty_proto.Empty())) as hci_event_stream: 107 hci_event_asserts = EventAsserts(hci_event_stream) 108 109 self.enqueue_hci_command( 110 hci_packets.WriteLoopbackModeBuilder( 111 hci_packets.LoopbackMode.ENABLE_LOCAL), True) 112 113 cmd2loop = hci_packets.ReadLocalNameBuilder() 114 self.enqueue_hci_command(cmd2loop, True) 115 116 looped_bytes = bytes(cmd2loop.Serialize()) 117 hci_event_asserts.assert_event_occurs( 118 lambda packet: looped_bytes in packet.event) 119 120 def test_inquiry_from_dut(self): 121 self.register_for_event(hci_packets.EventCode.INQUIRY_RESULT) 122 with EventCallbackStream( 123 self.device_under_test.hci.FetchEvents( 124 empty_proto.Empty())) as hci_event_stream: 125 126 hci_event_asserts = EventAsserts(hci_event_stream) 127 128 self.send_hal_hci_command( 129 hci_packets.WriteScanEnableBuilder( 130 hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) 131 lap = hci_packets.Lap() 132 lap.lap = 0x33 133 self.enqueue_hci_command( 134 hci_packets.InquiryBuilder(lap, 0x30, 0xff), False) 135 hci_event_asserts.assert_event_occurs( 136 lambda packet: b'\x02\x0f' in packet.event 137 # Expecting an HCI Event (code 0x02, length 0x0f) 138 ) 139 140 def test_le_ad_scan_cert_advertises(self): 141 self.register_for_le_event( 142 hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT) 143 self.register_for_le_event(hci_packets.SubeventCode.ADVERTISING_REPORT) 144 with EventCallbackStream( 145 self.device_under_test.hci.FetchLeSubevents( 146 empty_proto.Empty())) as hci_le_event_stream: 147 148 hci_event_asserts = EventAsserts(hci_le_event_stream) 149 150 # DUT Scans 151 self.enqueue_hci_command( 152 hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'), 153 True) 154 phy_scan_params = hci_packets.PhyScanParameters() 155 phy_scan_params.le_scan_interval = 6553 156 phy_scan_params.le_scan_window = 6553 157 phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE 158 159 self.enqueue_hci_command( 160 hci_packets.LeSetExtendedScanParametersBuilder( 161 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 162 hci_packets.LeSetScanningFilterPolicy.ACCEPT_ALL, 1, 163 [phy_scan_params]), True) 164 self.enqueue_hci_command( 165 hci_packets.LeSetExtendedScanEnableBuilder( 166 hci_packets.Enable.ENABLED, 167 hci_packets.FilterDuplicates.DISABLED, 0, 0), True) 168 169 # CERT Advertises 170 advertising_handle = 0 171 self.send_hal_hci_command( 172 hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( 173 advertising_handle, 174 hci_packets.LegacyAdvertisingProperties.ADV_IND, 175 512, 176 768, 177 7, 178 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 179 hci_packets.PeerAddressType. 180 PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 181 'A6:A5:A4:A3:A2:A1', 182 hci_packets.AdvertisingFilterPolicy.ALL_DEVICES, 183 0xF7, 184 1, # SID 185 hci_packets.Enable.DISABLED # Scan request notification 186 )) 187 188 self.send_hal_hci_command( 189 hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder( 190 advertising_handle, '0C:05:04:03:02:01')) 191 gap_name = hci_packets.GapData() 192 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 193 gap_name.data = list(bytes(b'Im_A_Cert!')) # TODO: Fix and remove ! 194 195 self.send_hal_hci_command( 196 hci_packets.LeSetExtendedAdvertisingDataBuilder( 197 advertising_handle, 198 hci_packets.Operation.COMPLETE_ADVERTISEMENT, 199 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, 200 [gap_name])) 201 202 gap_short_name = hci_packets.GapData() 203 gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME 204 gap_short_name.data = list(bytes(b'Im_A_C')) 205 206 self.send_hal_hci_command( 207 hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( 208 advertising_handle, 209 hci_packets.Operation.COMPLETE_ADVERTISEMENT, 210 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, 211 [gap_short_name])) 212 213 enabled_set = hci_packets.EnabledSet() 214 enabled_set.advertising_handle = 0 215 enabled_set.duration = 0 216 enabled_set.max_extended_advertising_events = 0 217 self.send_hal_hci_command( 218 hci_packets.LeSetExtendedAdvertisingEnableBuilder( 219 hci_packets.Enable.ENABLED, [enabled_set])) 220 221 hci_event_asserts.assert_event_occurs( 222 lambda packet: b'Im_A_Cert' in packet.event) 223 224 self.send_hal_hci_command( 225 hci_packets.LeSetExtendedAdvertisingEnableBuilder( 226 hci_packets.Enable.DISABLED, [enabled_set])) 227 self.enqueue_hci_command( 228 hci_packets.LeSetExtendedScanEnableBuilder( 229 hci_packets.Enable.DISABLED, 230 hci_packets.FilterDuplicates.DISABLED, 0, 0), True) 231 232 def test_le_connection_dut_advertises(self): 233 self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) 234 with EventCallbackStream(self.device_under_test.hci.FetchLeSubevents(empty_proto.Empty())) as le_event_stream, \ 235 EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as event_stream, \ 236 EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ 237 EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ 238 EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: 239 240 le_event_asserts = EventAsserts(le_event_stream) 241 event_asserts = EventAsserts(event_stream) 242 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 243 acl_data_asserts = EventAsserts(acl_data_stream) 244 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 245 246 # Cert Connects 247 self.send_hal_hci_command( 248 hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01')) 249 phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() 250 phy_scan_params.scan_interval = 0x60 251 phy_scan_params.scan_window = 0x30 252 phy_scan_params.conn_interval_min = 0x18 253 phy_scan_params.conn_interval_max = 0x28 254 phy_scan_params.conn_latency = 0 255 phy_scan_params.supervision_timeout = 0x1f4 256 phy_scan_params.min_ce_length = 0 257 phy_scan_params.max_ce_length = 0 258 self.send_hal_hci_command( 259 hci_packets.LeExtendedCreateConnectionBuilder( 260 hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, 261 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 262 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 263 '0D:05:04:03:02:01', 1, [phy_scan_params])) 264 265 # DUT Advertises 266 advertising_handle = 0 267 self.enqueue_hci_command( 268 hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( 269 advertising_handle, 270 hci_packets.LegacyAdvertisingProperties.ADV_IND, 271 400, 272 450, 273 7, 274 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 275 hci_packets.PeerAddressType. 276 PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 277 '00:00:00:00:00:00', 278 hci_packets.AdvertisingFilterPolicy.ALL_DEVICES, 279 0xF8, 280 1, #SID 281 hci_packets.Enable.DISABLED # Scan request notification 282 ), 283 True) 284 285 self.enqueue_hci_command( 286 hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder( 287 advertising_handle, '0D:05:04:03:02:01'), True) 288 289 gap_name = hci_packets.GapData() 290 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 291 gap_name.data = list( 292 bytes(b'Im_The_DUT!')) # TODO: Fix and remove ! 293 294 self.enqueue_hci_command( 295 hci_packets.LeSetExtendedAdvertisingDataBuilder( 296 advertising_handle, 297 hci_packets.Operation.COMPLETE_ADVERTISEMENT, 298 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, 299 [gap_name]), True) 300 301 gap_short_name = hci_packets.GapData() 302 gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME 303 gap_short_name.data = list(bytes(b'Im_The_D')) 304 305 self.enqueue_hci_command( 306 hci_packets.LeSetExtendedAdvertisingScanResponseBuilder( 307 advertising_handle, 308 hci_packets.Operation.COMPLETE_ADVERTISEMENT, 309 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, 310 [gap_short_name]), True) 311 312 enabled_set = hci_packets.EnabledSet() 313 enabled_set.advertising_handle = advertising_handle 314 enabled_set.duration = 0 315 enabled_set.max_extended_advertising_events = 0 316 self.enqueue_hci_command( 317 hci_packets.LeSetExtendedAdvertisingEnableBuilder( 318 hci_packets.Enable.ENABLED, [enabled_set]), True) 319 320 # Check for success of Enable 321 event_asserts.assert_event_occurs( 322 lambda packet: b'\x0e\x04\x01\x39\x20\x00' in packet.event) 323 324 conn_handle = 0xfff 325 326 def event_handle(packet): 327 packet_bytes = packet.event 328 if b'\x3e\x13\x01\x00' in packet_bytes: 329 nonlocal conn_handle 330 cc_view = hci_packets.LeConnectionCompleteView( 331 hci_packets.LeMetaEventView( 332 hci_packets.EventPacketView( 333 bt_packets.PacketViewLittleEndian( 334 list(packet_bytes))))) 335 conn_handle = cc_view.GetConnectionHandle() 336 return True 337 return False 338 339 def payload_handle(packet): 340 packet_bytes = packet.payload 341 if b'\x3e\x13\x01\x00' in packet_bytes: 342 nonlocal conn_handle 343 cc_view = hci_packets.LeConnectionCompleteView( 344 hci_packets.LeMetaEventView( 345 hci_packets.EventPacketView( 346 bt_packets.PacketViewLittleEndian( 347 list(packet_bytes))))) 348 conn_handle = cc_view.GetConnectionHandle() 349 return True 350 return False 351 352 cert_hci_event_asserts.assert_event_occurs(payload_handle) 353 cert_handle = conn_handle 354 conn_handle = 0xfff 355 le_event_asserts.assert_event_occurs(event_handle) 356 dut_handle = conn_handle 357 if dut_handle == 0xfff: 358 logging.warning("Failed to get the DUT handle") 359 return False 360 if cert_handle == 0xfff: 361 logging.warning("Failed to get the CERT handle") 362 return False 363 364 # Send ACL Data 365 self.enqueue_acl_data( 366 dut_handle, hci_packets.PacketBoundaryFlag. 367 FIRST_NON_AUTOMATICALLY_FLUSHABLE, 368 hci_packets.BroadcastFlag.POINT_TO_POINT, 369 bytes(b'Just SomeAclData')) 370 self.send_hal_acl_data( 371 cert_handle, hci_packets.PacketBoundaryFlag. 372 FIRST_NON_AUTOMATICALLY_FLUSHABLE, 373 hci_packets.BroadcastFlag.POINT_TO_POINT, 374 bytes(b'Just SomeMoreAclData')) 375 376 cert_acl_data_asserts.assert_event_occurs( 377 lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload 378 ) 379 acl_data_asserts.assert_event_occurs( 380 lambda packet: logging.debug(packet.data) or b'SomeMoreAclData' in packet.data 381 ) 382 383 def test_le_white_list_connection_cert_advertises(self): 384 self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE) 385 with EventCallbackStream(self.device_under_test.hci.FetchLeSubevents(empty_proto.Empty())) as le_event_stream, \ 386 EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream: 387 le_event_asserts = EventAsserts(le_event_stream) 388 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 389 390 # DUT Connects 391 self.enqueue_hci_command( 392 hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'), 393 True) 394 self.enqueue_hci_command( 395 hci_packets.LeAddDeviceToWhiteListBuilder( 396 hci_packets.WhiteListAddressType.RANDOM, 397 '0C:05:04:03:02:01'), True) 398 phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() 399 phy_scan_params.scan_interval = 0x60 400 phy_scan_params.scan_window = 0x30 401 phy_scan_params.conn_interval_min = 0x18 402 phy_scan_params.conn_interval_max = 0x28 403 phy_scan_params.conn_latency = 0 404 phy_scan_params.supervision_timeout = 0x1f4 405 phy_scan_params.min_ce_length = 0 406 phy_scan_params.max_ce_length = 0 407 self.enqueue_hci_command( 408 hci_packets.LeExtendedCreateConnectionBuilder( 409 hci_packets.InitiatorFilterPolicy.USE_WHITE_LIST, 410 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 411 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, 412 'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]), False) 413 414 # CERT Advertises 415 advertising_handle = 1 416 self.send_hal_hci_command( 417 hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder( 418 advertising_handle, 419 hci_packets.LegacyAdvertisingProperties.ADV_IND, 420 512, 421 768, 422 7, 423 hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 424 hci_packets.PeerAddressType. 425 PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 426 'A6:A5:A4:A3:A2:A1', 427 hci_packets.AdvertisingFilterPolicy.ALL_DEVICES, 428 0x7F, 429 0, # SID 430 hci_packets.Enable.DISABLED # Scan request notification 431 )) 432 433 self.send_hal_hci_command( 434 hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder( 435 advertising_handle, '0C:05:04:03:02:01')) 436 437 gap_name = hci_packets.GapData() 438 gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME 439 gap_name.data = list(bytes(b'Im_A_Cert!')) # TODO: Fix and remove ! 440 441 self.send_hal_hci_command( 442 hci_packets.LeSetExtendedAdvertisingDataBuilder( 443 advertising_handle, 444 hci_packets.Operation.COMPLETE_ADVERTISEMENT, 445 hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, 446 [gap_name])) 447 enabled_set = hci_packets.EnabledSet() 448 enabled_set.advertising_handle = 1 449 enabled_set.duration = 0 450 enabled_set.max_extended_advertising_events = 0 451 self.send_hal_hci_command( 452 hci_packets.LeSetExtendedAdvertisingEnableBuilder( 453 hci_packets.Enable.ENABLED, [enabled_set])) 454 455 # LeConnectionComplete 456 cert_hci_event_asserts.assert_event_occurs( 457 lambda packet: b'\x3e\x13\x01\x00' in packet.payload, 458 timeout=timedelta(seconds=20)) 459 le_event_asserts.assert_event_occurs( 460 lambda packet: b'\x3e\x13\x01\x00' in packet.event) 461 462 def test_connection_dut_connects(self): 463 self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) 464 self.register_for_event( 465 hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 466 self.enqueue_hci_command( 467 hci_packets.WritePageTimeoutBuilder(0x4000), True) 468 with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \ 469 EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ 470 EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ 471 EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: 472 473 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 474 hci_event_asserts = EventAsserts(hci_event_stream) 475 acl_data_asserts = EventAsserts(acl_data_stream) 476 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 477 478 address = hci_packets.Address() 479 480 def get_address_from_complete(packet): 481 packet_bytes = packet.payload 482 if b'\x0e\x0a\x01\x09\x10' in packet_bytes: 483 nonlocal address 484 addr_view = hci_packets.ReadBdAddrCompleteView( 485 hci_packets.CommandCompleteView( 486 hci_packets.EventPacketView( 487 bt_packets.PacketViewLittleEndian( 488 list(packet_bytes))))) 489 address = addr_view.GetBdAddr() 490 return True 491 return False 492 493 # CERT Enables scans and gets its address 494 self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder()) 495 496 cert_hci_event_asserts.assert_event_occurs( 497 get_address_from_complete) 498 self.send_hal_hci_command( 499 hci_packets.WriteScanEnableBuilder( 500 hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) 501 502 # DUT Connects 503 self.enqueue_hci_command( 504 hci_packets.CreateConnectionBuilder( 505 address, 506 0xcc18, # Packet Type 507 hci_packets.PageScanRepetitionMode.R0, 508 0, 509 hci_packets.ClockOffsetValid.INVALID, 510 hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH), 511 False) 512 513 # Cert Accepts 514 connection_request = None 515 516 def get_connect_request(packet): 517 if b'\x04\x0a' in packet.payload: 518 nonlocal connection_request 519 connection_request = hci_packets.ConnectionRequestView( 520 hci_packets.EventPacketView( 521 bt_packets.PacketViewLittleEndian( 522 list(packet.payload)))) 523 return True 524 return False 525 526 # Cert Accepts 527 cert_hci_event_asserts.assert_event_occurs( 528 get_connect_request, timeout=timedelta(seconds=20)) 529 self.send_hal_hci_command( 530 hci_packets.AcceptConnectionRequestBuilder( 531 connection_request.GetBdAddr(), 532 hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) 533 534 conn_handle = 0xfff 535 536 def get_handle(packet_bytes): 537 if b'\x03\x0b\x00' in packet_bytes: 538 nonlocal conn_handle 539 cc_view = hci_packets.ConnectionCompleteView( 540 hci_packets.EventPacketView( 541 bt_packets.PacketViewLittleEndian( 542 list(packet_bytes)))) 543 conn_handle = cc_view.GetConnectionHandle() 544 return True 545 return False 546 547 def event_handle(packet): 548 packet_bytes = packet.event 549 return get_handle(packet_bytes) 550 551 def payload_handle(packet): 552 packet_bytes = packet.payload 553 return get_handle(packet_bytes) 554 555 cert_hci_event_asserts.assert_event_occurs(payload_handle) 556 cert_handle = conn_handle 557 conn_handle = 0xfff 558 hci_event_asserts.assert_event_occurs(event_handle) 559 dut_handle = conn_handle 560 if dut_handle == 0xfff: 561 logging.warning("Failed to get the DUT handle") 562 return False 563 if cert_handle == 0xfff: 564 logging.warning("Failed to get the CERT handle") 565 return False 566 567 # Send ACL Data 568 self.enqueue_acl_data( 569 dut_handle, hci_packets.PacketBoundaryFlag. 570 FIRST_NON_AUTOMATICALLY_FLUSHABLE, 571 hci_packets.BroadcastFlag.POINT_TO_POINT, 572 bytes(b'Just SomeAclData')) 573 self.send_hal_acl_data( 574 cert_handle, hci_packets.PacketBoundaryFlag. 575 FIRST_NON_AUTOMATICALLY_FLUSHABLE, 576 hci_packets.BroadcastFlag.POINT_TO_POINT, 577 bytes(b'Just SomeMoreAclData')) 578 579 cert_acl_data_asserts.assert_event_occurs( 580 lambda packet: b'SomeAclData' in packet.payload) 581 acl_data_asserts.assert_event_occurs( 582 lambda packet: b'SomeMoreAclData' in packet.data) 583 584 def test_connection_cert_connects(self): 585 self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE) 586 self.register_for_event( 587 hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) 588 self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST) 589 self.send_hal_hci_command(hci_packets.WritePageTimeoutBuilder(0x4000)) 590 with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \ 591 EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \ 592 EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \ 593 EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream: 594 595 hci_event_asserts = EventAsserts(hci_event_stream) 596 cert_hci_event_asserts = EventAsserts(cert_hci_event_stream) 597 acl_data_asserts = EventAsserts(acl_data_stream) 598 cert_acl_data_asserts = EventAsserts(cert_acl_data_stream) 599 600 address = hci_packets.Address() 601 602 def get_address_from_complete(packet): 603 packet_bytes = packet.event 604 if b'\x0e\x0a\x01\x09\x10' in packet_bytes: 605 nonlocal address 606 addr_view = hci_packets.ReadBdAddrCompleteView( 607 hci_packets.CommandCompleteView( 608 hci_packets.EventPacketView( 609 bt_packets.PacketViewLittleEndian( 610 list(packet_bytes))))) 611 address = addr_view.GetBdAddr() 612 return True 613 return False 614 615 # DUT Enables scans and gets its address 616 self.enqueue_hci_command( 617 hci_packets.WriteScanEnableBuilder( 618 hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True) 619 self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True) 620 621 hci_event_asserts.assert_event_occurs(get_address_from_complete) 622 623 # Cert Connects 624 self.send_hal_hci_command( 625 hci_packets.CreateConnectionBuilder( 626 address, 627 0xcc18, # Packet Type 628 hci_packets.PageScanRepetitionMode.R0, 629 0, 630 hci_packets.ClockOffsetValid.INVALID, 631 hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) 632 633 # DUT Accepts 634 connection_request = None 635 636 def get_connect_request(packet): 637 if b'\x04\x0a' in packet.event: 638 nonlocal connection_request 639 connection_request = hci_packets.ConnectionRequestView( 640 hci_packets.EventPacketView( 641 bt_packets.PacketViewLittleEndian( 642 list(packet.event)))) 643 return True 644 return False 645 646 hci_event_asserts.assert_event_occurs( 647 get_connect_request, timeout=timedelta(seconds=20)) 648 self.enqueue_hci_command( 649 hci_packets.AcceptConnectionRequestBuilder( 650 connection_request.GetBdAddr(), 651 hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE), 652 False) 653 654 conn_handle = 0xfff 655 656 def get_handle(packet_bytes): 657 if b'\x03\x0b\x00' in packet_bytes: 658 nonlocal conn_handle 659 cc_view = hci_packets.ConnectionCompleteView( 660 hci_packets.EventPacketView( 661 bt_packets.PacketViewLittleEndian( 662 list(packet_bytes)))) 663 conn_handle = cc_view.GetConnectionHandle() 664 return True 665 return False 666 667 def event_handle(packet): 668 packet_bytes = packet.event 669 return get_handle(packet_bytes) 670 671 def payload_handle(packet): 672 packet_bytes = packet.payload 673 return get_handle(packet_bytes) 674 675 cert_hci_event_asserts.assert_event_occurs(payload_handle) 676 cert_handle = conn_handle 677 conn_handle = 0xfff 678 hci_event_asserts.assert_event_occurs(event_handle) 679 dut_handle = conn_handle 680 if dut_handle == 0xfff: 681 logging.warning("Failed to get the DUT handle") 682 return False 683 if cert_handle == 0xfff: 684 logging.warning("Failed to get the CERT handle") 685 return False 686 687 # Send ACL Data 688 self.enqueue_acl_data( 689 dut_handle, hci_packets.PacketBoundaryFlag. 690 FIRST_NON_AUTOMATICALLY_FLUSHABLE, 691 hci_packets.BroadcastFlag.POINT_TO_POINT, 692 bytes(b'This is just SomeAclData')) 693 self.send_hal_acl_data( 694 cert_handle, hci_packets.PacketBoundaryFlag. 695 FIRST_NON_AUTOMATICALLY_FLUSHABLE, 696 hci_packets.BroadcastFlag.POINT_TO_POINT, 697 bytes(b'This is just SomeMoreAclData')) 698 699 cert_acl_data_asserts.assert_event_occurs( 700 lambda packet: b'SomeAclData' in packet.payload) 701 acl_data_asserts.assert_event_occurs( 702 lambda packet: b'SomeMoreAclData' in packet.data) 703