1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import timedelta
18import os
19import sys
20import logging
21
22from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
23from cert.event_callback_stream import EventCallbackStream
24from cert.event_asserts import EventAsserts
25from google.protobuf import empty_pb2 as empty_proto
26from facade import rootservice_pb2 as facade_rootservice
27from hal import facade_pb2 as hal_facade
28from hci.facade import facade_pb2 as hci_facade
29from bluetooth_packets_python3 import hci_packets
30import bluetooth_packets_python3 as bt_packets
31
32
33class DirectHciTest(GdFacadeOnlyBaseTestClass):
34
35    def setup_test(self):
36        self.device_under_test.rootservice.StartStack(
37            facade_rootservice.StartStackRequest(
38                module_under_test=facade_rootservice.BluetoothModule.Value(
39                    'HCI'),))
40        self.cert_device.rootservice.StartStack(
41            facade_rootservice.StartStackRequest(
42                module_under_test=facade_rootservice.BluetoothModule.Value(
43                    'HAL'),))
44
45        self.device_under_test.wait_channel_ready()
46        self.cert_device.wait_channel_ready()
47
48        self.cert_device.hal.SendHciCommand(
49            hal_facade.HciCommandPacket(
50                payload=bytes(hci_packets.ResetBuilder().Serialize())))
51
52    def teardown_test(self):
53        self.device_under_test.rootservice.StopStack(
54            facade_rootservice.StopStackRequest())
55        self.cert_device.rootservice.StopStack(
56            facade_rootservice.StopStackRequest())
57
58    def register_for_event(self, event_code):
59        msg = hci_facade.EventCodeMsg(code=int(event_code))
60        self.device_under_test.hci.RegisterEventHandler(msg)
61
62    def register_for_le_event(self, event_code):
63        msg = hci_facade.LeSubeventCodeMsg(code=int(event_code))
64        self.device_under_test.hci.RegisterLeEventHandler(msg)
65
66    def enqueue_hci_command(self, command, expect_complete):
67        cmd_bytes = bytes(command.Serialize())
68        cmd = hci_facade.CommandMsg(command=cmd_bytes)
69        if (expect_complete):
70            self.device_under_test.hci.EnqueueCommandWithComplete(cmd)
71        else:
72            self.device_under_test.hci.EnqueueCommandWithStatus(cmd)
73
74    def send_hal_hci_command(self, command):
75        self.cert_device.hal.SendHciCommand(
76            hal_facade.HciCommandPacket(payload=bytes(command.Serialize())))
77
78    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
79        acl_msg = hci_facade.AclMsg(
80            handle=int(handle),
81            packet_boundary_flag=int(pb_flag),
82            broadcast_flag=int(b_flag),
83            data=acl)
84        self.device_under_test.hci.SendAclData(acl_msg)
85
86    def send_hal_acl_data(self, handle, pb_flag, b_flag, acl):
87        lower = handle & 0xff
88        upper = (handle >> 8) & 0xf
89        upper = upper | int(pb_flag) & 0x3
90        upper = upper | ((int(b_flag) & 0x3) << 2)
91        lower_length = len(acl) & 0xff
92        upper_length = (len(acl) & 0xff00) >> 8
93        concatenated = bytes([lower, upper, lower_length, upper_length] +
94                             list(acl))
95        self.cert_device.hal.SendHciAcl(
96            hal_facade.HciAclPacket(payload=concatenated))
97
98    def test_local_hci_cmd_and_event(self):
99        # Loopback mode responds with ACL and SCO connection complete
100        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
101        self.register_for_event(hci_packets.EventCode.LOOPBACK_COMMAND)
102        self.register_for_event(
103            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
104        with EventCallbackStream(
105                self.device_under_test.hci.FetchEvents(
106                    empty_proto.Empty())) as hci_event_stream:
107            hci_event_asserts = EventAsserts(hci_event_stream)
108
109            self.enqueue_hci_command(
110                hci_packets.WriteLoopbackModeBuilder(
111                    hci_packets.LoopbackMode.ENABLE_LOCAL), True)
112
113            cmd2loop = hci_packets.ReadLocalNameBuilder()
114            self.enqueue_hci_command(cmd2loop, True)
115
116            looped_bytes = bytes(cmd2loop.Serialize())
117            hci_event_asserts.assert_event_occurs(
118                lambda packet: looped_bytes in packet.event)
119
120    def test_inquiry_from_dut(self):
121        self.register_for_event(hci_packets.EventCode.INQUIRY_RESULT)
122        with EventCallbackStream(
123                self.device_under_test.hci.FetchEvents(
124                    empty_proto.Empty())) as hci_event_stream:
125
126            hci_event_asserts = EventAsserts(hci_event_stream)
127
128            self.send_hal_hci_command(
129                hci_packets.WriteScanEnableBuilder(
130                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
131            lap = hci_packets.Lap()
132            lap.lap = 0x33
133            self.enqueue_hci_command(
134                hci_packets.InquiryBuilder(lap, 0x30, 0xff), False)
135            hci_event_asserts.assert_event_occurs(
136                lambda packet: b'\x02\x0f' in packet.event
137                # Expecting an HCI Event (code 0x02, length 0x0f)
138            )
139
140    def test_le_ad_scan_cert_advertises(self):
141        self.register_for_le_event(
142            hci_packets.SubeventCode.EXTENDED_ADVERTISING_REPORT)
143        self.register_for_le_event(hci_packets.SubeventCode.ADVERTISING_REPORT)
144        with EventCallbackStream(
145                self.device_under_test.hci.FetchLeSubevents(
146                    empty_proto.Empty())) as hci_le_event_stream:
147
148            hci_event_asserts = EventAsserts(hci_le_event_stream)
149
150            # DUT Scans
151            self.enqueue_hci_command(
152                hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'),
153                True)
154            phy_scan_params = hci_packets.PhyScanParameters()
155            phy_scan_params.le_scan_interval = 6553
156            phy_scan_params.le_scan_window = 6553
157            phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE
158
159            self.enqueue_hci_command(
160                hci_packets.LeSetExtendedScanParametersBuilder(
161                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
162                    hci_packets.LeSetScanningFilterPolicy.ACCEPT_ALL, 1,
163                    [phy_scan_params]), True)
164            self.enqueue_hci_command(
165                hci_packets.LeSetExtendedScanEnableBuilder(
166                    hci_packets.Enable.ENABLED,
167                    hci_packets.FilterDuplicates.DISABLED, 0, 0), True)
168
169            # CERT Advertises
170            advertising_handle = 0
171            self.send_hal_hci_command(
172                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
173                    advertising_handle,
174                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
175                    512,
176                    768,
177                    7,
178                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
179                    hci_packets.PeerAddressType.
180                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
181                    'A6:A5:A4:A3:A2:A1',
182                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
183                    0xF7,
184                    1,  # SID
185                    hci_packets.Enable.DISABLED  # Scan request notification
186                ))
187
188            self.send_hal_hci_command(
189                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
190                    advertising_handle, '0C:05:04:03:02:01'))
191            gap_name = hci_packets.GapData()
192            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
193            gap_name.data = list(bytes(b'Im_A_Cert!'))  # TODO: Fix and remove !
194
195            self.send_hal_hci_command(
196                hci_packets.LeSetExtendedAdvertisingDataBuilder(
197                    advertising_handle,
198                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
199                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
200                    [gap_name]))
201
202            gap_short_name = hci_packets.GapData()
203            gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
204            gap_short_name.data = list(bytes(b'Im_A_C'))
205
206            self.send_hal_hci_command(
207                hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
208                    advertising_handle,
209                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
210                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
211                    [gap_short_name]))
212
213            enabled_set = hci_packets.EnabledSet()
214            enabled_set.advertising_handle = 0
215            enabled_set.duration = 0
216            enabled_set.max_extended_advertising_events = 0
217            self.send_hal_hci_command(
218                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
219                    hci_packets.Enable.ENABLED, [enabled_set]))
220
221            hci_event_asserts.assert_event_occurs(
222                lambda packet: b'Im_A_Cert' in packet.event)
223
224            self.send_hal_hci_command(
225                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
226                    hci_packets.Enable.DISABLED, [enabled_set]))
227            self.enqueue_hci_command(
228                hci_packets.LeSetExtendedScanEnableBuilder(
229                    hci_packets.Enable.DISABLED,
230                    hci_packets.FilterDuplicates.DISABLED, 0, 0), True)
231
232    def test_le_connection_dut_advertises(self):
233        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
234        with EventCallbackStream(self.device_under_test.hci.FetchLeSubevents(empty_proto.Empty())) as le_event_stream, \
235            EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as event_stream, \
236            EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \
237            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \
238            EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream:
239
240            le_event_asserts = EventAsserts(le_event_stream)
241            event_asserts = EventAsserts(event_stream)
242            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
243            acl_data_asserts = EventAsserts(acl_data_stream)
244            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
245
246            # Cert Connects
247            self.send_hal_hci_command(
248                hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
249            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
250            phy_scan_params.scan_interval = 0x60
251            phy_scan_params.scan_window = 0x30
252            phy_scan_params.conn_interval_min = 0x18
253            phy_scan_params.conn_interval_max = 0x28
254            phy_scan_params.conn_latency = 0
255            phy_scan_params.supervision_timeout = 0x1f4
256            phy_scan_params.min_ce_length = 0
257            phy_scan_params.max_ce_length = 0
258            self.send_hal_hci_command(
259                hci_packets.LeExtendedCreateConnectionBuilder(
260                    hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
261                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
262                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
263                    '0D:05:04:03:02:01', 1, [phy_scan_params]))
264
265            # DUT Advertises
266            advertising_handle = 0
267            self.enqueue_hci_command(
268                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
269                    advertising_handle,
270                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
271                    400,
272                    450,
273                    7,
274                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
275                    hci_packets.PeerAddressType.
276                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
277                    '00:00:00:00:00:00',
278                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
279                    0xF8,
280                    1,  #SID
281                    hci_packets.Enable.DISABLED  # Scan request notification
282                ),
283                True)
284
285            self.enqueue_hci_command(
286                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
287                    advertising_handle, '0D:05:04:03:02:01'), True)
288
289            gap_name = hci_packets.GapData()
290            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
291            gap_name.data = list(
292                bytes(b'Im_The_DUT!'))  # TODO: Fix and remove !
293
294            self.enqueue_hci_command(
295                hci_packets.LeSetExtendedAdvertisingDataBuilder(
296                    advertising_handle,
297                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
298                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
299                    [gap_name]), True)
300
301            gap_short_name = hci_packets.GapData()
302            gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
303            gap_short_name.data = list(bytes(b'Im_The_D'))
304
305            self.enqueue_hci_command(
306                hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
307                    advertising_handle,
308                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
309                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
310                    [gap_short_name]), True)
311
312            enabled_set = hci_packets.EnabledSet()
313            enabled_set.advertising_handle = advertising_handle
314            enabled_set.duration = 0
315            enabled_set.max_extended_advertising_events = 0
316            self.enqueue_hci_command(
317                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
318                    hci_packets.Enable.ENABLED, [enabled_set]), True)
319
320            # Check for success of Enable
321            event_asserts.assert_event_occurs(
322                lambda packet: b'\x0e\x04\x01\x39\x20\x00' in packet.event)
323
324            conn_handle = 0xfff
325
326            def event_handle(packet):
327                packet_bytes = packet.event
328                if b'\x3e\x13\x01\x00' in packet_bytes:
329                    nonlocal conn_handle
330                    cc_view = hci_packets.LeConnectionCompleteView(
331                        hci_packets.LeMetaEventView(
332                            hci_packets.EventPacketView(
333                                bt_packets.PacketViewLittleEndian(
334                                    list(packet_bytes)))))
335                    conn_handle = cc_view.GetConnectionHandle()
336                    return True
337                return False
338
339            def payload_handle(packet):
340                packet_bytes = packet.payload
341                if b'\x3e\x13\x01\x00' in packet_bytes:
342                    nonlocal conn_handle
343                    cc_view = hci_packets.LeConnectionCompleteView(
344                        hci_packets.LeMetaEventView(
345                            hci_packets.EventPacketView(
346                                bt_packets.PacketViewLittleEndian(
347                                    list(packet_bytes)))))
348                    conn_handle = cc_view.GetConnectionHandle()
349                    return True
350                return False
351
352            cert_hci_event_asserts.assert_event_occurs(payload_handle)
353            cert_handle = conn_handle
354            conn_handle = 0xfff
355            le_event_asserts.assert_event_occurs(event_handle)
356            dut_handle = conn_handle
357            if dut_handle == 0xfff:
358                logging.warning("Failed to get the DUT handle")
359                return False
360            if cert_handle == 0xfff:
361                logging.warning("Failed to get the CERT handle")
362                return False
363
364            # Send ACL Data
365            self.enqueue_acl_data(
366                dut_handle, hci_packets.PacketBoundaryFlag.
367                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
368                hci_packets.BroadcastFlag.POINT_TO_POINT,
369                bytes(b'Just SomeAclData'))
370            self.send_hal_acl_data(
371                cert_handle, hci_packets.PacketBoundaryFlag.
372                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
373                hci_packets.BroadcastFlag.POINT_TO_POINT,
374                bytes(b'Just SomeMoreAclData'))
375
376            cert_acl_data_asserts.assert_event_occurs(
377                lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload
378            )
379            acl_data_asserts.assert_event_occurs(
380                lambda packet: logging.debug(packet.data) or b'SomeMoreAclData' in packet.data
381            )
382
383    def test_le_white_list_connection_cert_advertises(self):
384        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
385        with EventCallbackStream(self.device_under_test.hci.FetchLeSubevents(empty_proto.Empty())) as le_event_stream, \
386                EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream:
387            le_event_asserts = EventAsserts(le_event_stream)
388            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
389
390            # DUT Connects
391            self.enqueue_hci_command(
392                hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'),
393                True)
394            self.enqueue_hci_command(
395                hci_packets.LeAddDeviceToWhiteListBuilder(
396                    hci_packets.WhiteListAddressType.RANDOM,
397                    '0C:05:04:03:02:01'), True)
398            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
399            phy_scan_params.scan_interval = 0x60
400            phy_scan_params.scan_window = 0x30
401            phy_scan_params.conn_interval_min = 0x18
402            phy_scan_params.conn_interval_max = 0x28
403            phy_scan_params.conn_latency = 0
404            phy_scan_params.supervision_timeout = 0x1f4
405            phy_scan_params.min_ce_length = 0
406            phy_scan_params.max_ce_length = 0
407            self.enqueue_hci_command(
408                hci_packets.LeExtendedCreateConnectionBuilder(
409                    hci_packets.InitiatorFilterPolicy.USE_WHITE_LIST,
410                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
411                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
412                    'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]), False)
413
414            # CERT Advertises
415            advertising_handle = 1
416            self.send_hal_hci_command(
417                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
418                    advertising_handle,
419                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
420                    512,
421                    768,
422                    7,
423                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
424                    hci_packets.PeerAddressType.
425                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
426                    'A6:A5:A4:A3:A2:A1',
427                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
428                    0x7F,
429                    0,  # SID
430                    hci_packets.Enable.DISABLED  # Scan request notification
431                ))
432
433            self.send_hal_hci_command(
434                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
435                    advertising_handle, '0C:05:04:03:02:01'))
436
437            gap_name = hci_packets.GapData()
438            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
439            gap_name.data = list(bytes(b'Im_A_Cert!'))  # TODO: Fix and remove !
440
441            self.send_hal_hci_command(
442                hci_packets.LeSetExtendedAdvertisingDataBuilder(
443                    advertising_handle,
444                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
445                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
446                    [gap_name]))
447            enabled_set = hci_packets.EnabledSet()
448            enabled_set.advertising_handle = 1
449            enabled_set.duration = 0
450            enabled_set.max_extended_advertising_events = 0
451            self.send_hal_hci_command(
452                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
453                    hci_packets.Enable.ENABLED, [enabled_set]))
454
455            # LeConnectionComplete
456            cert_hci_event_asserts.assert_event_occurs(
457                lambda packet: b'\x3e\x13\x01\x00' in packet.payload,
458                timeout=timedelta(seconds=20))
459            le_event_asserts.assert_event_occurs(
460                lambda packet: b'\x3e\x13\x01\x00' in packet.event)
461
462    def test_connection_dut_connects(self):
463        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
464        self.register_for_event(
465            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
466        self.enqueue_hci_command(
467            hci_packets.WritePageTimeoutBuilder(0x4000), True)
468        with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \
469            EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \
470            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \
471            EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream:
472
473            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
474            hci_event_asserts = EventAsserts(hci_event_stream)
475            acl_data_asserts = EventAsserts(acl_data_stream)
476            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
477
478            address = hci_packets.Address()
479
480            def get_address_from_complete(packet):
481                packet_bytes = packet.payload
482                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
483                    nonlocal address
484                    addr_view = hci_packets.ReadBdAddrCompleteView(
485                        hci_packets.CommandCompleteView(
486                            hci_packets.EventPacketView(
487                                bt_packets.PacketViewLittleEndian(
488                                    list(packet_bytes)))))
489                    address = addr_view.GetBdAddr()
490                    return True
491                return False
492
493            # CERT Enables scans and gets its address
494            self.send_hal_hci_command(hci_packets.ReadBdAddrBuilder())
495
496            cert_hci_event_asserts.assert_event_occurs(
497                get_address_from_complete)
498            self.send_hal_hci_command(
499                hci_packets.WriteScanEnableBuilder(
500                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
501
502            # DUT Connects
503            self.enqueue_hci_command(
504                hci_packets.CreateConnectionBuilder(
505                    address,
506                    0xcc18,  # Packet Type
507                    hci_packets.PageScanRepetitionMode.R0,
508                    0,
509                    hci_packets.ClockOffsetValid.INVALID,
510                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
511                False)
512
513            # Cert Accepts
514            connection_request = None
515
516            def get_connect_request(packet):
517                if b'\x04\x0a' in packet.payload:
518                    nonlocal connection_request
519                    connection_request = hci_packets.ConnectionRequestView(
520                        hci_packets.EventPacketView(
521                            bt_packets.PacketViewLittleEndian(
522                                list(packet.payload))))
523                    return True
524                return False
525
526            # Cert Accepts
527            cert_hci_event_asserts.assert_event_occurs(
528                get_connect_request, timeout=timedelta(seconds=20))
529            self.send_hal_hci_command(
530                hci_packets.AcceptConnectionRequestBuilder(
531                    connection_request.GetBdAddr(),
532                    hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
533
534            conn_handle = 0xfff
535
536            def get_handle(packet_bytes):
537                if b'\x03\x0b\x00' in packet_bytes:
538                    nonlocal conn_handle
539                    cc_view = hci_packets.ConnectionCompleteView(
540                        hci_packets.EventPacketView(
541                            bt_packets.PacketViewLittleEndian(
542                                list(packet_bytes))))
543                    conn_handle = cc_view.GetConnectionHandle()
544                    return True
545                return False
546
547            def event_handle(packet):
548                packet_bytes = packet.event
549                return get_handle(packet_bytes)
550
551            def payload_handle(packet):
552                packet_bytes = packet.payload
553                return get_handle(packet_bytes)
554
555            cert_hci_event_asserts.assert_event_occurs(payload_handle)
556            cert_handle = conn_handle
557            conn_handle = 0xfff
558            hci_event_asserts.assert_event_occurs(event_handle)
559            dut_handle = conn_handle
560            if dut_handle == 0xfff:
561                logging.warning("Failed to get the DUT handle")
562                return False
563            if cert_handle == 0xfff:
564                logging.warning("Failed to get the CERT handle")
565                return False
566
567            # Send ACL Data
568            self.enqueue_acl_data(
569                dut_handle, hci_packets.PacketBoundaryFlag.
570                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
571                hci_packets.BroadcastFlag.POINT_TO_POINT,
572                bytes(b'Just SomeAclData'))
573            self.send_hal_acl_data(
574                cert_handle, hci_packets.PacketBoundaryFlag.
575                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
576                hci_packets.BroadcastFlag.POINT_TO_POINT,
577                bytes(b'Just SomeMoreAclData'))
578
579            cert_acl_data_asserts.assert_event_occurs(
580                lambda packet: b'SomeAclData' in packet.payload)
581            acl_data_asserts.assert_event_occurs(
582                lambda packet: b'SomeMoreAclData' in packet.data)
583
584    def test_connection_cert_connects(self):
585        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
586        self.register_for_event(
587            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
588        self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST)
589        self.send_hal_hci_command(hci_packets.WritePageTimeoutBuilder(0x4000))
590        with EventCallbackStream(self.device_under_test.hci.FetchEvents(empty_proto.Empty())) as hci_event_stream, \
591            EventCallbackStream(self.device_under_test.hci.FetchAclPackets(empty_proto.Empty())) as acl_data_stream, \
592            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_proto.Empty())) as cert_hci_event_stream, \
593            EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_proto.Empty())) as cert_acl_data_stream:
594
595            hci_event_asserts = EventAsserts(hci_event_stream)
596            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
597            acl_data_asserts = EventAsserts(acl_data_stream)
598            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
599
600            address = hci_packets.Address()
601
602            def get_address_from_complete(packet):
603                packet_bytes = packet.event
604                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
605                    nonlocal address
606                    addr_view = hci_packets.ReadBdAddrCompleteView(
607                        hci_packets.CommandCompleteView(
608                            hci_packets.EventPacketView(
609                                bt_packets.PacketViewLittleEndian(
610                                    list(packet_bytes)))))
611                    address = addr_view.GetBdAddr()
612                    return True
613                return False
614
615            # DUT Enables scans and gets its address
616            self.enqueue_hci_command(
617                hci_packets.WriteScanEnableBuilder(
618                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
619            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
620
621            hci_event_asserts.assert_event_occurs(get_address_from_complete)
622
623            # Cert Connects
624            self.send_hal_hci_command(
625                hci_packets.CreateConnectionBuilder(
626                    address,
627                    0xcc18,  # Packet Type
628                    hci_packets.PageScanRepetitionMode.R0,
629                    0,
630                    hci_packets.ClockOffsetValid.INVALID,
631                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
632
633            # DUT Accepts
634            connection_request = None
635
636            def get_connect_request(packet):
637                if b'\x04\x0a' in packet.event:
638                    nonlocal connection_request
639                    connection_request = hci_packets.ConnectionRequestView(
640                        hci_packets.EventPacketView(
641                            bt_packets.PacketViewLittleEndian(
642                                list(packet.event))))
643                    return True
644                return False
645
646            hci_event_asserts.assert_event_occurs(
647                get_connect_request, timeout=timedelta(seconds=20))
648            self.enqueue_hci_command(
649                hci_packets.AcceptConnectionRequestBuilder(
650                    connection_request.GetBdAddr(),
651                    hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
652                False)
653
654            conn_handle = 0xfff
655
656            def get_handle(packet_bytes):
657                if b'\x03\x0b\x00' in packet_bytes:
658                    nonlocal conn_handle
659                    cc_view = hci_packets.ConnectionCompleteView(
660                        hci_packets.EventPacketView(
661                            bt_packets.PacketViewLittleEndian(
662                                list(packet_bytes))))
663                    conn_handle = cc_view.GetConnectionHandle()
664                    return True
665                return False
666
667            def event_handle(packet):
668                packet_bytes = packet.event
669                return get_handle(packet_bytes)
670
671            def payload_handle(packet):
672                packet_bytes = packet.payload
673                return get_handle(packet_bytes)
674
675            cert_hci_event_asserts.assert_event_occurs(payload_handle)
676            cert_handle = conn_handle
677            conn_handle = 0xfff
678            hci_event_asserts.assert_event_occurs(event_handle)
679            dut_handle = conn_handle
680            if dut_handle == 0xfff:
681                logging.warning("Failed to get the DUT handle")
682                return False
683            if cert_handle == 0xfff:
684                logging.warning("Failed to get the CERT handle")
685                return False
686
687            # Send ACL Data
688            self.enqueue_acl_data(
689                dut_handle, hci_packets.PacketBoundaryFlag.
690                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
691                hci_packets.BroadcastFlag.POINT_TO_POINT,
692                bytes(b'This is just SomeAclData'))
693            self.send_hal_acl_data(
694                cert_handle, hci_packets.PacketBoundaryFlag.
695                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
696                hci_packets.BroadcastFlag.POINT_TO_POINT,
697                bytes(b'This is just SomeMoreAclData'))
698
699            cert_acl_data_asserts.assert_event_occurs(
700                lambda packet: b'SomeAclData' in packet.payload)
701            acl_data_asserts.assert_event_occurs(
702                lambda packet: b'SomeMoreAclData' in packet.data)
703