1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18from blueberry.tests.gd.cert.event_stream import EventStream
19from blueberry.tests.gd.cert.event_stream import FilteringEventStream
20from blueberry.tests.gd.cert.event_stream import IEventStream
21from blueberry.tests.gd.cert.closable import Closable
22from blueberry.tests.gd.cert.closable import safeClose
23from blueberry.tests.gd.cert.captures import HciCaptures
24from blueberry.tests.gd.cert.truth import assertThat
25from blueberry.tests.gd.cert.matchers import HciMatchers
26from blueberry.facade import common_pb2 as common
27import hci_packets as hci
28from blueberry.utils import bluetooth
29
30
class PyHalAclConnection(IEventStream):
    """One ACL connection, identified by its handle, on top of a device's HAL facade.

    Outgoing payloads are wrapped in an ``hci.Acl`` packet and handed to ``device.hal.SendAcl``.
    Incoming traffic is read from a ``FilteringEventStream`` layered on the ACL stream passed in.
    """

    def __init__(self, handle, acl_stream, device):
        """Remember the connection handle and device, and attach to the ACL stream.

        Args:
            handle: connection handle; converted with ``int()`` before being stored.
            acl_stream: stream the per-connection FilteringEventStream is built on.
            device: object whose ``hal.SendAcl`` is used for outgoing packets.
        """
        self.handle = int(handle)
        self.device = device
        # None is passed in the filter position, i.e. no filter function is supplied here.
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data: bytes):
        """Serialize ``data`` into an ACL packet for this handle and send it through the HAL.

        Args:
            pb_flag: value used as the packet boundary flag of the ACL packet.
            b_flag: value used as the broadcast flag of the ACL packet.
            data: payload; must be a ``bytes`` instance (checked with an assert).
        """
        assert isinstance(data, bytes)
        packet = hci.Acl(handle=self.handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
        wire_bytes = packet.serialize()
        self.device.hal.SendAcl(common.Data(payload=wire_bytes))

    def send_first(self, data: bytes):
        """Send ``data`` point-to-point with the FIRST_AUTOMATICALLY_FLUSHABLE boundary flag."""
        assert isinstance(data, bytes)
        boundary = hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE
        broadcast = hci.BroadcastFlag.POINT_TO_POINT
        self.send(boundary, broadcast, data)

    def get_event_queue(self):
        """Return the event queue of this connection's FilteringEventStream."""
        stream = self.our_acl_stream
        return stream.get_event_queue()
49
50
class PyHalAdvertisement(object):
    """Controls one advertisement, legacy or extended, by issuing HCI commands through a PyHal.

    Each public operation sends exactly one command with ``py_hal.send_hci_command`` and then calls
    ``py_hal.wait_for_complete`` with the opcode of that command. The ``legacy`` flag selects the
    legacy command; otherwise the extended variant is used and addressed with ``handle``.
    """

    def __init__(self, handle, py_hal, is_legacy):
        """Store the advertising handle, the PyHal used for sending, and the legacy flag."""
        self.handle, self.py_hal, self.legacy = handle, py_hal, is_legacy

    def set_data(self, complete_name):
        """Set the advertising data to a single COMPLETE_LOCAL_NAME GAP entry.

        Args:
            complete_name: iterable expanded with ``list()`` into the entry's data
                (presumably a bytes object; confirm against callers).
        """
        gap_entries = [hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(complete_name))]

        if not self.legacy:
            command = hci.LeSetExtendedAdvertisingData(
                advertising_handle=self.handle,
                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
                advertising_data=gap_entries)
            opcode = hci.OpCode.LE_SET_EXTENDED_ADVERTISING_DATA
        else:
            command = hci.LeSetAdvertisingData(advertising_data=gap_entries)
            opcode = hci.OpCode.LE_SET_ADVERTISING_DATA

        self.py_hal.send_hci_command(command)
        self.py_hal.wait_for_complete(opcode)

    def set_scan_response(self, shortened_name):
        """Set the scan response data to a single SHORTENED_LOCAL_NAME GAP entry.

        Args:
            shortened_name: iterable expanded with ``list()`` into the entry's data
                (presumably a bytes object; confirm against callers).
        """
        gap_entries = [hci.GapData(data_type=hci.GapDataType.SHORTENED_LOCAL_NAME, data=list(shortened_name))]

        if not self.legacy:
            command = hci.LeSetExtendedScanResponseData(
                advertising_handle=self.handle,
                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
                scan_response_data=gap_entries)
            opcode = hci.OpCode.LE_SET_EXTENDED_SCAN_RESPONSE_DATA
        else:
            command = hci.LeSetScanResponseData(advertising_data=gap_entries)
            opcode = hci.OpCode.LE_SET_SCAN_RESPONSE_DATA

        self.py_hal.send_hci_command(command)
        self.py_hal.wait_for_complete(opcode)

    def _set_enabled(self, state):
        """Send the legacy or extended advertising-enable command carrying ``state``, then wait."""
        if self.legacy:
            command = hci.LeSetAdvertisingEnable(advertising_enable=state)
            opcode = hci.OpCode.LE_SET_ADVERTISING_ENABLE
        else:
            # The extended command takes a list of sets; only this advertisement's handle is listed.
            this_set = hci.EnabledSet(advertising_handle=self.handle, duration=0, max_extended_advertising_events=0)
            command = hci.LeSetExtendedAdvertisingEnable(enable=state, enabled_sets=[this_set])
            opcode = hci.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE

        self.py_hal.send_hci_command(command)
        self.py_hal.wait_for_complete(opcode)

    def start(self):
        """Enable advertising and wait for the enable command to complete."""
        self._set_enabled(hci.Enable.ENABLED)

    def stop(self):
        """Disable advertising and wait for the enable command to complete."""
        self._set_enabled(hci.Enable.DISABLED)
113
114
class PyHal(Closable):
    """Drives a device's HAL facade with raw HCI commands and ACL packets.

    Construction opens the HAL event stream and ACL stream of ``device`` and wraps each one in an
    EventStream. The instance also caches the event mask and LE event mask it last requested, so
    that ``unmask_event`` / ``unmask_le_event`` can add bits on top of earlier calls.
    """

    def __init__(self, device):
        """Open the HCI event and ACL streams of ``device`` and initialise the cached event masks."""
        self.device = device

        hal = self.device.hal
        self.hci_event_stream = EventStream(hal.StreamEvents(empty_proto.Empty()))
        self.acl_stream = EventStream(hal.StreamAcl(empty_proto.Empty()))

        self.event_mask = 0x1FFF_FFFF_FFFF  # Default Event Mask (Core Vol 4 [E] 7.3.1)
        self.le_event_mask = 0x0000_0000_001F  # Default LE Event Mask (Core Vol 4 [E] 7.8.1)

        # We don't deal with SCO for now

    def close(self):
        """Close the HCI event stream, then the ACL stream, via ``safeClose``."""
        for stream in (self.hci_event_stream, self.acl_stream):
            safeClose(stream)

    def get_hci_event_stream(self):
        """Return the EventStream wrapping the HAL event stream."""
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        """Assert that the HCI event stream emits a CommandComplete matching ``opcode``."""
        matcher = HciMatchers.CommandComplete(opcode)
        assertThat(self.hci_event_stream).emits(matcher)

    def wait_for_status(self, opcode):
        """Assert that the HCI event stream emits a CommandStatus matching ``opcode``."""
        matcher = HciMatchers.CommandStatus(opcode)
        assertThat(self.hci_event_stream).emits(matcher)

    def get_acl_stream(self):
        """Return the EventStream wrapping the HAL ACL stream."""
        return self.acl_stream

    def send_hci_command(self, command: hci.Packet):
        """Serialize ``command`` and pass it to the HAL ``SendCommand`` call. Does not wait for a reply."""
        wire_bytes = command.serialize()
        self.device.hal.SendCommand(common.Data(payload=wire_bytes))

    def send_acl(self, handle, pb_flag, b_flag, data: bytes):
        """Wrap ``data`` in an ACL packet for ``handle`` with the given flags and send it via the HAL."""
        packet = hci.Acl(handle=handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
        wire_bytes = packet.serialize()
        self.device.hal.SendAcl(common.Data(payload=wire_bytes))

    def send_acl_first(self, handle, data: bytes):
        """Send ``data`` point-to-point on ``handle`` with the FIRST_NON_AUTOMATICALLY_FLUSHABLE flag."""
        boundary = hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE
        broadcast = hci.BroadcastFlag.POINT_TO_POINT
        self.send_acl(handle, boundary, broadcast, data)

    def read_own_address(self) -> bluetooth.Address:
        """Send ReadBdAddr and return the ``bd_addr`` field of the captured completion event."""
        self.send_hci_command(hci.ReadBdAddr())
        capture = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.hci_event_stream).emits(capture)
        return capture.get().bd_addr

    def set_random_le_address(self, addr):
        """Send LeSetRandomAddress with ``addr`` (wrapped in ``bluetooth.Address``) and wait for completion."""
        random_address = bluetooth.Address(addr)
        self.send_hci_command(hci.LeSetRandomAddress(random_address=random_address))
        self.wait_for_complete(hci.OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        """Send LeSetExtendedScanParameters with fixed values (active scan, accept all) and wait."""
        phy_parameters = hci.PhyScanParameters(le_scan_type=hci.LeScanType.ACTIVE,
                                               le_scan_interval=6553,
                                               le_scan_window=6553)
        command = hci.LeSetExtendedScanParameters(
            own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
            scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL,
            scanning_phys=1,
            parameters=[phy_parameters])
        self.send_hci_command(command)
        self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def unmask_event(self, *event_codes):
        """Add the given event codes to the cached event mask and send SetEventMask.

        Each code ``c`` sets bit ``int(c) - 1``. The command is sent even when no codes are given,
        and this method does not wait for a completion event.
        """
        for code in event_codes:
            bit_index = int(code) - 1
            self.event_mask = self.event_mask | (1 << bit_index)
        self.send_hci_command(hci.SetEventMask(event_mask=self.event_mask))

    def unmask_le_event(self, *subevent_codes):
        """Add the given LE subevent codes to the cached LE event mask and send LeSetEventMask.

        Each code ``c`` sets bit ``int(c) - 1``. The command is sent even when no codes are given,
        and this method does not wait for a completion event.
        """
        for code in subevent_codes:
            bit_index = int(code) - 1
            self.le_event_mask = self.le_event_mask | (1 << bit_index)
        self.send_hci_command(hci.LeSetEventMask(le_event_mask=self.le_event_mask))

    def _set_extended_scan_enable(self, state):
        """Send LeSetExtendedScanEnable carrying ``state`` (no duplicate filtering) and wait for completion."""
        command = hci.LeSetExtendedScanEnable(enable=state,
                                              filter_duplicates=hci.FilterDuplicates.DISABLED,
                                              duration=0,
                                              period=0)
        self.send_hci_command(command)
        self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def start_scanning(self):
        """Enable extended scanning and wait for the command to complete."""
        self._set_extended_scan_enable(hci.Enable.ENABLED)

    def stop_scanning(self):
        """Disable extended scanning and wait for the command to complete."""
        self._set_extended_scan_enable(hci.Enable.DISABLED)

    def reset(self):
        """Send Reset and wait for its completion event."""
        self.send_hci_command(hci.Reset())
        self.wait_for_complete(hci.OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        """Send WriteScanEnable with INQUIRY_AND_PAGE_SCAN. Does not wait for a completion event."""
        scan_mode = hci.ScanEnable.INQUIRY_AND_PAGE_SCAN
        self.send_hci_command(hci.WriteScanEnable(scan_enable=scan_mode))

    def initiate_connection(self, remote_addr):
        """Send CreateConnection towards ``remote_addr``. Does not wait for any event."""
        command = hci.CreateConnection(
            bd_addr=bluetooth.Address(remote_addr),
            packet_type=0xcc18,
            page_scan_repetition_mode=hci.PageScanRepetitionMode.R1,
            clock_offset=0x0,
            clock_offset_valid=hci.ClockOffsetValid.INVALID,
            allow_role_switch=hci.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)
        self.send_hci_command(command)

    def accept_connection(self):
        """Wait for a connection request, accept it as peripheral, and return the resulting connection.

        Returns:
            The PyHalAclConnection produced by ``complete_connection``.
        """
        request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.hci_event_stream).emits(request)

        requester = request.get().bd_addr
        self.send_hci_command(
            hci.AcceptConnectionRequest(bd_addr=requester, role=hci.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
        return self.complete_connection()

    def _connection_from_capture(self, capture):
        """Wait until ``capture`` matches on the HCI event stream and wrap its handle in a PyHalAclConnection."""
        assertThat(self.hci_event_stream).emits(capture)

        captured_handle = capture.get().connection_handle
        return PyHalAclConnection(captured_handle, self.acl_stream, self.device)

    def complete_connection(self):
        """Wait for a connection-complete event and return a PyHalAclConnection for its handle."""
        return self._connection_from_capture(HciCaptures.ConnectionCompleteCapture())

    def _send_le_extended_create_connection(self, filter_policy, peer_address):
        """Send LeExtendedCreateConnection with the fixed PHY parameters shared by both initiate methods."""
        command = hci.LeExtendedCreateConnection(
            initiator_filter_policy=filter_policy,
            own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
            peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
            peer_address=bluetooth.Address(peer_address),
            initiating_phys=1,
            phy_scan_parameters=[
                hci.LeCreateConnPhyScanParameters(scan_interval=0x60,
                                                  scan_window=0x30,
                                                  conn_interval_min=0x18,
                                                  conn_interval_max=0x28,
                                                  conn_latency=0,
                                                  supervision_timeout=0x1f4,
                                                  min_ce_length=0,
                                                  max_ce_length=0)
            ])
        self.send_hci_command(command)

    def initiate_le_connection(self, remote_addr):
        """Start an LE connection to ``remote_addr`` by peer address and wait for the command status."""
        self._send_le_extended_create_connection(hci.InitiatorFilterPolicy.USE_PEER_ADDRESS, remote_addr)
        self.wait_for_status(hci.OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_filter_accept_list(self, remote_addr):
        """Send LeAddDeviceToFilterAcceptList for ``remote_addr`` as a RANDOM address. Does not wait."""
        command = hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
                                                    address=bluetooth.Address(remote_addr))
        self.send_hci_command(command)

    def initiate_le_connection_by_filter_accept_list(self, remote_addr):
        """Start an LE connection using the filter accept list. Does not wait for the command status.

        ``remote_addr`` is accepted but not used: the peer address sent is all zeroes.
        """
        self._send_le_extended_create_connection(hci.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST,
                                                 '00:00:00:00:00:00')

    def complete_le_connection(self):
        """Wait for an LE connection-complete event and return a PyHalAclConnection for its handle."""
        return self._connection_from_capture(HciCaptures.LeConnectionCompleteCapture())

    def create_advertisement(self,
                             handle,
                             own_address: str,
                             properties=hci.LegacyAdvertisingEventProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=hci.Enable.DISABLED):
        """Configure an extended advertising set and return a non-legacy PyHalAdvertisement for it.

        Two commands are sent, each followed by a wait for its completion event: first
        LeSetExtendedAdvertisingParametersLegacy built from the arguments, then
        LeSetAdvertisingSetRandomAddress assigning ``own_address`` to the set.

        Args:
            handle: advertising handle of the set.
            own_address: random address assigned to the set (wrapped in ``bluetooth.Address``).
            properties, min_interval, max_interval, channel_map, own_address_type,
            peer_address_type, peer_address, filter_policy, tx_power, sid,
            scan_request_notification: forwarded to the matching fields of the parameters command.

        Returns:
            PyHalAdvertisement bound to ``handle`` and this PyHal, with the legacy flag False.
        """
        parameters_command = hci.LeSetExtendedAdvertisingParametersLegacy(
            advertising_handle=handle,
            legacy_advertising_event_properties=properties,
            primary_advertising_interval_min=min_interval,
            primary_advertising_interval_max=max_interval,
            primary_advertising_channel_map=channel_map,
            own_address_type=own_address_type,
            peer_address_type=peer_address_type,
            peer_address=bluetooth.Address(peer_address),
            advertising_filter_policy=filter_policy,
            advertising_tx_power=tx_power,
            advertising_sid=sid,
            scan_request_notification_enable=scan_request_notification)
        self.send_hci_command(parameters_command)
        self.wait_for_complete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        address_command = hci.LeSetAdvertisingSetRandomAddress(advertising_handle=handle,
                                                               random_address=bluetooth.Address(own_address))
        self.send_hci_command(address_command)
        self.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_SET_RANDOM_ADDRESS)

        return PyHalAdvertisement(handle, self, False)

    def create_legacy_advertisement(self,
                                    advertising_type=hci.AdvertisingType.ADV_IND,
                                    min_interval=400,
                                    max_interval=450,
                                    channel_map=7,
                                    own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                    peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                                    peer_address='00:00:00:00:00:00',
                                    filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES):
        """Configure legacy advertising parameters and return a legacy PyHalAdvertisement.

        Sends LeSetAdvertisingParameters built from the arguments and waits for its completion event.

        Returns:
            PyHalAdvertisement bound to this PyHal with handle None and the legacy flag True.
        """
        parameters_command = hci.LeSetAdvertisingParameters(
            advertising_interval_min=min_interval,
            advertising_interval_max=max_interval,
            advertising_type=advertising_type,
            own_address_type=own_address_type,
            peer_address_type=peer_address_type,
            peer_address=bluetooth.Address(peer_address),
            advertising_channel_map=channel_map,
            advertising_filter_policy=filter_policy)
        self.send_hci_command(parameters_command)
        self.wait_for_complete(hci.OpCode.LE_SET_ADVERTISING_PARAMETERS)

        return PyHalAdvertisement(None, self, True)
342