• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
18from cert.event_callback_stream import EventCallbackStream
19from cert.event_asserts import EventAsserts
20from google.protobuf import empty_pb2 as empty_proto
21from facade import rootservice_pb2 as facade_rootservice
22from facade import common_pb2 as common
23from hci.facade import le_acl_manager_facade_pb2 as le_acl_manager_facade
24from hci.facade import le_advertising_manager_facade_pb2 as le_advertising_facade
25from hci.facade import facade_pb2 as hci_facade
26import bluetooth_packets_python3 as bt_packets
27from bluetooth_packets_python3 import hci_packets
28
29
class LeAclManagerTest(GdFacadeOnlyBaseTestClass):
    """Facade-only tests exercising the DUT's LE ACL manager against a cert HCI.

    The device under test (DUT) runs the ``HCI_INTERFACES`` module and is
    driven through its ``hci_le_acl_manager`` facade; the cert device runs the
    bare ``HCI`` module and is driven with raw HCI commands and ACL packets.
    """

    # Random device addresses used by the cert and the DUT in every test.
    _CERT_ADDRESS = '0C:05:04:03:02:01'
    _DUT_ADDRESS = '0D:05:04:03:02:01'

    # Placeholder stored until a connection complete event has been parsed.
    _UNSET_HANDLE = 0xfff

    def setup_test(self):
        """Start the stacks on both devices and wait for their channels."""
        self.device_under_test.rootservice.StartStack(
            facade_rootservice.StartStackRequest(
                module_under_test=facade_rootservice.BluetoothModule.Value(
                    'HCI_INTERFACES'),))
        self.cert_device.rootservice.StartStack(
            facade_rootservice.StartStackRequest(
                module_under_test=facade_rootservice.BluetoothModule.Value(
                    'HCI'),))

        self.device_under_test.wait_channel_ready()
        self.cert_device.wait_channel_ready()

    def teardown_test(self):
        """Stop the stacks on both devices."""
        self.device_under_test.rootservice.StopStack(
            facade_rootservice.StopStackRequest())
        self.cert_device.rootservice.StopStack(
            facade_rootservice.StopStackRequest())

    def register_for_event(self, event_code):
        """Register the cert's HCI facade for the given HCI event code."""
        msg = hci_facade.EventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterEventHandler(msg)

    def register_for_le_event(self, event_code):
        """Register the cert's HCI facade for the given LE subevent code."""
        msg = hci_facade.LeSubeventCodeMsg(code=int(event_code))
        self.cert_device.hci.RegisterLeEventHandler(msg)

    def enqueue_hci_command(self, command, expect_complete):
        """Serialize `command` and enqueue it on the cert's HCI facade.

        Args:
            command: packet builder exposing ``Serialize()``.
            expect_complete: when truthy the command is enqueued with
                ``EnqueueCommandWithComplete``; otherwise with
                ``EnqueueCommandWithStatus``.
        """
        cmd_bytes = bytes(command.Serialize())
        cmd = hci_facade.CommandMsg(command=cmd_bytes)
        if expect_complete:
            self.cert_device.hci.EnqueueCommandWithComplete(cmd)
        else:
            self.cert_device.hci.EnqueueCommandWithStatus(cmd)

    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
        """Send raw ACL bytes from the cert on the given connection handle.

        Args:
            handle: connection handle (converted with ``int``).
            pb_flag: packet boundary flag (converted with ``int``).
            b_flag: broadcast flag (converted with ``int``).
            acl: ACL payload bytes.
        """
        acl_msg = hci_facade.AclMsg(
            handle=int(handle),
            packet_boundary_flag=int(pb_flag),
            broadcast_flag=int(b_flag),
            data=acl)
        self.cert_device.hci.SendAclData(acl_msg)

    def _start_cert_advertising(self):
        """Configure and enable extended advertising set 0 on the cert.

        Issues, in order: legacy parameters, the random address, advertising
        data (complete local name), scan response (shortened local name) and
        finally the enable command. Every command is enqueued expecting a
        command complete.
        """
        advertising_handle = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
                advertising_handle,
                hci_packets.LegacyAdvertisingProperties.ADV_IND,
                # presumably interval min / interval max / channel map --
                # TODO confirm against the builder's signature
                400,
                450,
                7,
                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                '00:00:00:00:00:00',
                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
                0xF8,
                1,  # SID
                hci_packets.Enable.DISABLED  # Scan request notification
            ),
            True)

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
                advertising_handle, self._CERT_ADDRESS), True)

        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(b'Im_A_Cert'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingDataBuilder(
                advertising_handle,
                hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                [gap_name]), True)

        gap_short_name = hci_packets.GapData()
        gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
        gap_short_name.data = list(bytes(b'Im_A_C'))

        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
                advertising_handle,
                hci_packets.Operation.COMPLETE_ADVERTISEMENT,
                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
                [gap_short_name]), True)

        enabled_set = hci_packets.EnabledSet()
        enabled_set.advertising_handle = advertising_handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.enqueue_hci_command(
            hci_packets.LeSetExtendedAdvertisingEnableBuilder(
                hci_packets.Enable.ENABLED, [enabled_set]), True)

    def _dut_create_connection(self):
        """Ask the DUT to connect to the cert's random address.

        Returns:
            Whatever ``hci_le_acl_manager.CreateConnection`` returns; callers
            wrap it in an ``EventCallbackStream``.
        """
        return self.device_under_test.hci_le_acl_manager.CreateConnection(
            le_acl_manager_facade.LeConnectionMsg(
                address_type=int(
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS),
                address=bytes(self._CERT_ADDRESS, 'utf8')))

    def _wait_for_connection_handle(self, event_asserts):
        """Assert that a connection complete event occurs; return its handle.

        Args:
            event_asserts: an ``EventAsserts`` whose events expose the raw
                event bytes as ``packet.event``.

        Returns:
            The connection handle parsed from the matching event, or
            ``_UNSET_HANDLE`` if the predicate never matched.
        """
        handle = self._UNSET_HANDLE

        def get_handle(packet):
            nonlocal handle
            packet_bytes = packet.event
            # The byte patterns are matched anywhere in the event, exactly as
            # the tests always did.
            if b'\x3e\x13\x01\x00' in packet_bytes:
                view_type = hci_packets.LeConnectionCompleteView
            # NOTE(review): this pattern reuses length byte 0x13; an LE
            # Enhanced Connection Complete event is presumably longer --
            # confirm against the HCI specification.
            elif b'\x3e\x13\x0A\x00' in packet_bytes:
                view_type = hci_packets.LeEnhancedConnectionCompleteView
            else:
                return False
            cc_view = view_type(
                hci_packets.LeMetaEventView(
                    hci_packets.EventPacketView(
                        bt_packets.PacketViewLittleEndian(
                            list(packet_bytes)))))
            handle = cc_view.GetConnectionHandle()
            return True

        event_asserts.assert_event_occurs(get_handle)
        return handle

    def _exchange_acl_data(self, cert_handle, dut_connection_asserts,
                           cert_acl_data_asserts, acl_data_asserts):
        """Send ACL data in both directions and assert each side receives it.

        The cert sends first, before the DUT's connection event is awaited,
        which is the order the tests have always used.

        Args:
            cert_handle: connection handle on the cert side.
            dut_connection_asserts: ``EventAsserts`` over the DUT's connection
                events; used to obtain the DUT-side handle.
            cert_acl_data_asserts: ``EventAsserts`` over ACL packets fetched
                from the cert.
            acl_data_asserts: ``EventAsserts`` over ACL data fetched from the
                DUT.
        """
        self.enqueue_acl_data(
            cert_handle,
            hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
            hci_packets.BroadcastFlag.POINT_TO_POINT,
            bytes(b'\x19\x00\x07\x00SomeAclData from the Cert'))

        # DUT gets a connection complete event and sends and receives
        dut_handle = self._wait_for_connection_handle(dut_connection_asserts)

        self.device_under_test.hci_le_acl_manager.SendAclData(
            le_acl_manager_facade.LeAclData(
                handle=dut_handle,
                payload=bytes(
                    b'\x1C\x00\x07\x00SomeMoreAclData from the DUT')))

        cert_acl_data_asserts.assert_event_occurs(
            lambda packet: b'SomeMoreAclData' in packet.data)
        acl_data_asserts.assert_event_occurs(
            lambda packet: b'SomeAclData' in packet.payload)

    def test_dut_connects(self):
        """Cert advertises, DUT initiates, then ACL data flows both ways."""
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.cert_device.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
                EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
                EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            cert_hci_le_event_asserts = EventAsserts(cert_hci_le_event_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # Cert Advertises
            self._start_cert_advertising()

            with EventCallbackStream(
                    self._dut_create_connection()) as connection_event_stream:

                connection_event_asserts = EventAsserts(connection_event_stream)

                # Cert gets ConnectionComplete with a handle and sends ACL data
                cert_handle = self._wait_for_connection_handle(
                    cert_hci_le_event_asserts)

                self._exchange_acl_data(cert_handle, connection_event_asserts,
                                        cert_acl_data_asserts,
                                        acl_data_asserts)

    def test_cert_connects(self):
        """DUT advertises, cert initiates, then ACL data flows both ways."""
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.cert_device.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
                EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
                EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
                EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            cert_hci_le_event_asserts = EventAsserts(cert_hci_le_event_stream)
            incoming_connection_asserts = EventAsserts(
                incoming_connection_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # DUT Advertises
            gap_name = hci_packets.GapData()
            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
            gap_name.data = list(bytes(b'Im_The_DUT'))
            gap_data = le_advertising_facade.GapDataMsg(
                data=bytes(gap_name.Serialize()))
            config = le_advertising_facade.AdvertisingConfig(
                advertisement=[gap_data],
                random_address=common.BluetoothAddress(
                    address=bytes(self._DUT_ADDRESS, 'utf8')),
                interval_min=512,
                interval_max=768,
                event_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
                address_type=common.RANDOM_DEVICE_ADDRESS,
                peer_address_type=common.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                peer_address=common.BluetoothAddress(
                    address=bytes(b'A6:A5:A4:A3:A2:A1')),
                channel_map=7,
                filter_policy=le_advertising_facade.AdvertisingFilterPolicy.
                ALL_DEVICES)
            request = le_advertising_facade.CreateAdvertiserRequest(
                config=config)

            # The response is not inspected by this test.
            self.device_under_test.hci_le_advertising_manager.CreateAdvertiser(
                request)

            # Cert Connects
            self.enqueue_hci_command(
                hci_packets.LeSetRandomAddressBuilder(self._CERT_ADDRESS),
                True)
            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
            phy_scan_params.scan_interval = 0x60
            phy_scan_params.scan_window = 0x30
            phy_scan_params.conn_interval_min = 0x18
            phy_scan_params.conn_interval_max = 0x28
            phy_scan_params.conn_latency = 0
            phy_scan_params.supervision_timeout = 0x1f4
            phy_scan_params.min_ce_length = 0
            phy_scan_params.max_ce_length = 0
            # Enqueued expecting a command status rather than a complete.
            self.enqueue_hci_command(
                hci_packets.LeExtendedCreateConnectionBuilder(
                    hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
                    self._DUT_ADDRESS, 1, [phy_scan_params]), False)

            # Cert gets ConnectionComplete with a handle and sends ACL data
            cert_handle = self._wait_for_connection_handle(
                cert_hci_le_event_asserts)

            self._exchange_acl_data(cert_handle, incoming_connection_asserts,
                                    cert_acl_data_asserts, acl_data_asserts)

    def test_recombination_l2cap_packet(self):
        """Cert sends one L2CAP packet in two ACL fragments; DUT sees it whole."""
        self.register_for_le_event(hci_packets.SubeventCode.CONNECTION_COMPLETE)
        with EventCallbackStream(self.cert_device.hci.FetchLeSubevents(empty_proto.Empty())) as cert_hci_le_event_stream, \
                EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
                EventCallbackStream(self.device_under_test.hci_le_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:

            cert_hci_le_event_asserts = EventAsserts(cert_hci_le_event_stream)
            acl_data_asserts = EventAsserts(acl_data_stream)
            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)

            # Cert Advertises
            self._start_cert_advertising()

            with EventCallbackStream(
                    self._dut_create_connection()) as connection_event_stream:

                connection_event_asserts = EventAsserts(connection_event_stream)

                # Cert gets ConnectionComplete with a handle and sends ACL data
                cert_handle = self._wait_for_connection_handle(
                    cert_hci_le_event_asserts)

                # DUT gets a connection complete event; its handle is not
                # needed because only the cert sends in this test.
                self._wait_for_connection_handle(connection_event_asserts)

                self.enqueue_acl_data(
                    cert_handle,
                    hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
                    hci_packets.BroadcastFlag.POINT_TO_POINT,
                    bytes(b'\x06\x00\x07\x00Hello'))
                self.enqueue_acl_data(
                    cert_handle,
                    hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
                    hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))

                acl_data_asserts.assert_event_occurs(
                    lambda packet: b'Hello!' in packet.payload)
420