1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16from datetime import timedelta
17
18from google.protobuf import empty_pb2 as empty_proto
19from blueberry.tests.gd.cert.event_stream import EventStream
20from blueberry.tests.gd.cert.event_stream import FilteringEventStream
21from blueberry.tests.gd.cert.event_stream import IEventStream
22from blueberry.tests.gd.cert.closable import Closable
23from blueberry.tests.gd.cert.closable import safeClose
24from blueberry.tests.gd.cert.captures import HciCaptures
25from blueberry.tests.gd.cert.truth import assertThat
26from blueberry.facade.hci import hci_facade_pb2 as hci_facade
27from blueberry.facade import common_pb2 as common
28from blueberry.tests.gd.cert.matchers import HciMatchers
29import hci_packets as hci
30import blueberry.utils.bluetooth as bluetooth
31
32
33class PyHciAclConnection(IEventStream):
34
35    def __init__(self, handle, acl_stream, device):
36        self.handle = int(handle)
37        self.device = device
38        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
39        self.our_acl_stream = FilteringEventStream(acl_stream, None)
40
41    def send(self, pb_flag, b_flag, data: bytes):
42        assert isinstance(data, bytes)
43        acl = hci.Acl(handle=self.handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
44        self.device.hci.SendAcl(common.Data(payload=acl.serialize()))
45
46    def send_first(self, data: bytes):
47        self.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT, data)
48
49    def send_continuing(self, data):
50        self.send(hci.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci.BroadcastFlag.POINT_TO_POINT, data)
51
52    def get_event_queue(self):
53        return self.our_acl_stream.get_event_queue()
54
55
56class PyHciLeAclConnection(IEventStream):
57
58    def __init__(self, handle, acl_stream, device, peer, peer_type, peer_resolvable, local_resolvable):
59        self.handle = int(handle)
60        self.device = device
61        self.peer = peer
62        self.peer_type = peer_type
63        self.peer_resolvable = peer_resolvable
64        self.local_resolvable = local_resolvable
65        # todo, handle we got is 0, so doesn't match - fix before enabling filtering
66        self.our_acl_stream = FilteringEventStream(acl_stream, None)
67
68    def send(self, pb_flag, b_flag, data: bytes):
69        assert isinstance(data, bytes)
70        acl = hci.Acl(handle=self.handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
71        self.device.hci.SendAcl(common.Data(payload=acl.serialize()))
72
73    def send_first(self, data: bytes):
74        self.send(hci.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, hci.BroadcastFlag.POINT_TO_POINT, data)
75
76    def send_continuing(self, data: bytes):
77        self.send(hci.PacketBoundaryFlag.CONTINUING_FRAGMENT, hci.BroadcastFlag.POINT_TO_POINT, data)
78
79    def get_event_queue(self):
80        return self.our_acl_stream.get_event_queue()
81
82    def local_resolvable_address(self):
83        return self.local_resolvable
84
85    def peer_resolvable_address(self):
86        return self.peer_resolvable
87
88    def peer_address(self):
89        return self.peer
90
91
92class PyHciAdvertisement(object):
93
94    def __init__(self, handle, py_hci):
95        self.handle = handle
96        self.py_hci = py_hci
97
98    def set_data(self, complete_name):
99        self.py_hci.send_command(
100            hci.LeSetExtendedAdvertisingData(
101                advertising_handle=self.handle,
102                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
103                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
104                advertising_data=[hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME,
105                                              data=list(complete_name))]))
106
107    def set_scan_response(self, shortened_name):
108        self.py_hci.send_command(
109            hci.LeSetExtendedScanResponseData(advertising_handle=self.handle,
110                                              operation=hci.Operation.COMPLETE_ADVERTISEMENT,
111                                              fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
112                                              scan_response_data=[
113                                                  hci.GapData(data_type=hci.GapDataType.SHORTENED_LOCAL_NAME,
114                                                              data=list(shortened_name))
115                                              ]))
116
117    def start(self):
118        self.py_hci.send_command(
119            hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.ENABLED,
120                                               enabled_sets=[
121                                                   hci.EnabledSet(advertising_handle=self.handle,
122                                                                  duration=0,
123                                                                  max_extended_advertising_events=0)
124                                               ]))
125        assertThat(self.py_hci.get_event_stream()).emits(
126            HciMatchers.CommandComplete(hci.OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
127
128
129class PyHci(Closable):
130
131    event_stream = None
132    le_event_stream = None
133    acl_stream = None
134
135    def __init__(self, device, acl_streaming=False):
136        """
137            If you are planning on personally using the ACL data stream
138            coming from HCI, specify acl_streaming=True. You probably only
139            want this if you are testing HCI itself.
140        """
141        self.device = device
142        self.event_stream = EventStream(self.device.hci.StreamEvents(empty_proto.Empty()))
143        self.le_event_stream = EventStream(self.device.hci.StreamLeSubevents(empty_proto.Empty()))
144        if acl_streaming:
145            self.register_for_events(hci.EventCode.ROLE_CHANGE, hci.EventCode.CONNECTION_REQUEST,
146                                     hci.EventCode.CONNECTION_COMPLETE, hci.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
147            self.register_for_le_events(hci.SubeventCode.ENHANCED_CONNECTION_COMPLETE)
148            self.acl_stream = EventStream(self.device.hci.StreamAcl(empty_proto.Empty()))
149
150    def close(self):
151        safeClose(self.event_stream)
152        safeClose(self.le_event_stream)
153        safeClose(self.acl_stream)
154
155    def get_event_stream(self):
156        return self.event_stream
157
158    def get_le_event_stream(self):
159        return self.le_event_stream
160
161    def get_raw_acl_stream(self):
162        if self.acl_stream is None:
163            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
164        return self.acl_stream
165
166    def register_for_events(self, *event_codes):
167        for event_code in event_codes:
168            self.device.hci.RequestEvent(hci_facade.EventRequest(code=int(event_code)))
169
170    def register_for_le_events(self, *event_codes):
171        for event_code in event_codes:
172            self.device.hci.RequestLeSubevent(hci_facade.EventRequest(code=int(event_code)))
173
174    def send_command(self, command: hci.Packet):
175        self.device.hci.SendCommand(common.Data(payload=command.serialize()))
176
177    def enable_inquiry_and_page_scan(self):
178        self.send_command(hci.WriteScanEnable(scan_enable=hci.ScanEnable.INQUIRY_AND_PAGE_SCAN))
179
180    def read_own_address(self) -> bluetooth.Address:
181        self.send_command(hci.ReadBdAddr())
182        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
183        assertThat(self.event_stream).emits(read_bd_addr)
184        return read_bd_addr.get().bd_addr
185
186    def initiate_connection(self, remote_addr):
187        self.send_command(
188            hci.CreateConnection(bd_addr=bluetooth.Address(remote_addr),
189                                 packet_type=0xcc18,
190                                 page_scan_repetition_mode=hci.PageScanRepetitionMode.R1,
191                                 clock_offset=0x0,
192                                 clock_offset_valid=hci.ClockOffsetValid.INVALID,
193                                 allow_role_switch=hci.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
194
195    def accept_connection(self):
196        connection_request = HciCaptures.ConnectionRequestCapture()
197        assertThat(self.event_stream).emits(connection_request)
198
199        self.send_command(
200            hci.AcceptConnectionRequest(bd_addr=bluetooth.Address(connection_request.get().bd_addr),
201                                        role=hci.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
202        return self.complete_connection()
203
204    def complete_connection(self):
205        connection_complete = HciCaptures.ConnectionCompleteCapture()
206        assertThat(self.event_stream).emits(connection_complete)
207
208        handle = connection_complete.get().connection_handle
209        if self.acl_stream is None:
210            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
211        return PyHciAclConnection(handle, self.acl_stream, self.device)
212
213    def set_random_le_address(self, addr):
214        self.send_command(hci.LeSetRandomAddress(random_address=bluetooth.Address(addr)))
215        assertThat(self.event_stream).emits(HciMatchers.CommandComplete(hci.OpCode.LE_SET_RANDOM_ADDRESS))
216
217    def initiate_le_connection(self, remote_addr):
218        self.send_command(
219            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS,
220                                           own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
221                                           peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
222                                           peer_address=bluetooth.Address(remote_addr),
223                                           initiating_phys=1,
224                                           phy_scan_parameters=[
225                                               hci.LeCreateConnPhyScanParameters(scan_interval=0x60,
226                                                                                 scan_window=0x30,
227                                                                                 conn_interval_min=0x18,
228                                                                                 conn_interval_max=0x28,
229                                                                                 conn_latency=0,
230                                                                                 supervision_timeout=0x1f4,
231                                                                                 min_ce_length=0,
232                                                                                 max_ce_length=0)
233                                           ]))
234        assertThat(self.event_stream).emits(HciMatchers.CommandStatus(hci.OpCode.LE_EXTENDED_CREATE_CONNECTION))
235
236    def incoming_le_connection(self):
237        connection_complete = HciCaptures.LeConnectionCompleteCapture()
238        assertThat(self.le_event_stream).emits(connection_complete)
239
240        handle = connection_complete.get().connection_handle
241        peer = connection_complete.get().peer_address
242        peer_type = connection_complete.get().peer_address_type
243        local_resolvable = connection_complete.get().local_resolvable_private_address
244        peer_resolvable = connection_complete.get().peer_resolvable_private_address
245        if self.acl_stream is None:
246            raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
247        return PyHciLeAclConnection(handle, self.acl_stream, self.device, repr(peer), peer_type, repr(peer_resolvable),
248                                    repr(local_resolvable))
249
250    def incoming_le_connection_fails(self):
251        connection_complete = HciCaptures.LeConnectionCompleteCapture()
252        assertThat(self.le_event_stream).emitsNone(connection_complete, timeout=timedelta(seconds=5))
253
254    def add_device_to_resolving_list(self, peer_address_type, peer_address, peer_irk, local_irk):
255        self.send_command(
256            hci.LeAddDeviceToResolvingList(peer_identity_address_type=peer_address_type,
257                                           peer_identity_address=bluetooth.Address(peer_address),
258                                           peer_irk=peer_irk,
259                                           local_irk=local_irk))
260
261    def create_advertisement(self,
262                             handle,
263                             own_address: str,
264                             properties=hci.LegacyAdvertisingEventProperties.ADV_IND,
265                             min_interval=400,
266                             max_interval=450,
267                             channel_map=7,
268                             own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
269                             peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
270                             peer_address='00:00:00:00:00:00',
271                             filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES,
272                             tx_power=0xF8,
273                             sid=1,
274                             scan_request_notification=hci.Enable.DISABLED):
275
276        self.send_command(
277            hci.LeSetExtendedAdvertisingParametersLegacy(advertising_handle=handle,
278                                                         legacy_advertising_event_properties=properties,
279                                                         primary_advertising_interval_min=min_interval,
280                                                         primary_advertising_interval_max=max_interval,
281                                                         primary_advertising_channel_map=channel_map,
282                                                         own_address_type=own_address_type,
283                                                         peer_address_type=peer_address_type,
284                                                         peer_address=bluetooth.Address(peer_address),
285                                                         advertising_filter_policy=filter_policy,
286                                                         advertising_tx_power=tx_power,
287                                                         advertising_sid=sid,
288                                                         scan_request_notification_enable=scan_request_notification))
289
290        self.send_command(
291            hci.LeSetAdvertisingSetRandomAddress(advertising_handle=handle,
292                                                 random_address=bluetooth.Address(own_address)))
293
294        return PyHciAdvertisement(handle, self)
295