1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Security grpc interface."""
15
16import asyncio
17import logging
18from typing import AsyncGenerator
19from typing import AsyncIterator
20
21from floss.pandora.floss import adapter_client
22from floss.pandora.floss import floss_enums
23from floss.pandora.floss import utils
24from floss.pandora.server import bluetooth as bluetooth_module
25from google.protobuf import empty_pb2
26from google.protobuf import wrappers_pb2
27import grpc
28from pandora import security_grpc_aio
29from pandora import security_pb2
30
31
32class SecurityService(security_grpc_aio.SecurityServicer):
33    """Service to trigger Bluetooth Host security pairing procedures.
34
35    This class implements the Pandora bluetooth test interfaces,
36    where the meta class definition is automatically generated by the protobuf.
37    The interface definition can be found in:
38    https://cs.android.com/android/platform/superproject/+/main:external
39    /pandora/bt-test-interfaces/pandora/security.proto
40    """
41
42    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
43        self.bluetooth = bluetooth
44        self.manually_confirm = False
45        self.on_pairing_count = 0
46
47        class PairingObserver(adapter_client.BluetoothCallbacks):
48            """Observer to observe pairing events."""
49
50            def __init__(self, client: adapter_client, security: security_grpc_aio.SecurityServicer):
51                self.client = client
52                self.security = security
53
54            @utils.glib_callback()
55            def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
56                if self.security.manually_confirm:
57                    return
58
59                logging.info("Security: on_ssp_request variant: %s passkey: %s", variant, passkey)
60                address, _ = remote_device
61
62                if variant in (floss_enums.PairingVariant.CONSENT, floss_enums.PairingVariant.PASSKEY_CONFIRMATION):
63                    self.client.set_pairing_confirmation(address,
64                                                         True,
65                                                         method_callback=self.on_set_pairing_confirmation)
66
67            @utils.glib_callback()
68            def on_set_pairing_confirmation(self, err, result):
69                if err or not result:
70                    logging.info('Security: on_set_pairing_confirmation failed. err: %s result: %s', err, result)
71
72        observer = PairingObserver(self.bluetooth.adapter_client, self)
73        name = utils.create_observer_name(observer)
74        self.bluetooth.adapter_client.register_callback_observer(name, observer)
75        self.pairing_observer = observer
76
77    async def wait_le_security_level(self, level, address):
78
79        class BondingObserver(adapter_client.BluetoothCallbacks):
80            """Observer to observe the bond state."""
81
82            def __init__(self, task):
83                self.task = task
84
85            @utils.glib_callback()
86            def on_bond_state_changed(self, status, address, state):
87                if address != self.task['address']:
88                    return
89
90                future = self.task['wait_bond']
91                if status != floss_enums.BtStatus.SUCCESS:
92                    future.get_loop().call_soon_threadsafe(future.set_result, (False, f'Status: {status}'))
93                    return
94
95                if state == floss_enums.BondState.BONDED:
96                    future.get_loop().call_soon_threadsafe(future.set_result, (True, None))
97                elif state == floss_enums.BondState.NOT_BONDED:
98                    future.get_loop().call_soon_threadsafe(future.set_result,
99                                                           (False, f'Status: {status}, State: {state}'))
100
101        if level == security_pb2.LE_LEVEL1:
102            return True
103        if level == security_pb2.LE_LEVEL4:
104            logging.error('wait_le_security_level: Low-energy level 4 not supported.')
105            return False
106
107        if self.bluetooth.is_bonded(address):
108            is_bonded = True
109        else:
110            try:
111                wait_bond = asyncio.get_running_loop().create_future()
112                observer = BondingObserver({'wait_bond': wait_bond, 'address': address})
113                name = utils.create_observer_name(observer)
114                self.bluetooth.adapter_client.register_callback_observer(name, observer)
115                is_bonded, reason = await wait_bond
116                if not is_bonded:
117                    logging.error('Failed to bond to the address: %s, reason: %s', address, reason)
118            finally:
119                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)
120
121        is_encrypted = self.bluetooth.is_encrypted(address)
122        if level == security_pb2.LE_LEVEL2:
123            return is_encrypted
124        if level == security_pb2.LE_LEVEL3:
125            return is_encrypted and is_bonded
126
127        logging.error('wait_le_security_level: Invalid security level %s.', level)
128        return False
129
130    async def wait_classic_security_level(self, level, address):
131
132        class BondingObserver(adapter_client.BluetoothCallbacks):
133            """Observer to observe the bond state"""
134
135            def __init__(self, task):
136                self.task = task
137
138            @utils.glib_callback()
139            def on_bond_state_changed(self, status, address, state):
140                if address != self.task['address']:
141                    return
142
143                future = self.task['wait_bond']
144                if status != floss_enums.BtStatus.SUCCESS:
145                    future.get_loop().call_soon_threadsafe(future.set_result, (False, f'Status: {status}'))
146                    return
147
148                if state == floss_enums.BondState.BONDED:
149                    future.get_loop().call_soon_threadsafe(future.set_result, (True, None))
150                elif state == floss_enums.BondState.NOT_BONDED:
151                    future.get_loop().call_soon_threadsafe(future.set_result,
152                                                           (False, f'Status: {status}, State: {state}'))
153
154        if level == security_pb2.LEVEL0:
155            return True
156        if level == security_pb2.LEVEL3:
157            logging.error('wait_classic_security_level: Classic level 3 not supported')
158            return False
159
160        if self.bluetooth.is_bonded(address):
161            is_bonded = True
162        else:
163            try:
164                wait_bond = asyncio.get_running_loop().create_future()
165                observer = BondingObserver({'wait_bond': wait_bond, 'address': address})
166                name = utils.create_observer_name(observer)
167                self.bluetooth.adapter_client.register_callback_observer(name, observer)
168                is_bonded, reason = await wait_bond
169                if not is_bonded:
170                    logging.error('Failed to bond to the address: %s, reason: %s', address, reason)
171            finally:
172                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)
173
174        is_encrypted = self.bluetooth.is_encrypted(address)
175        if level == security_pb2.LEVEL1:
176            return not is_encrypted or is_bonded
177        if level == security_pb2.LEVEL2:
178            return is_encrypted and is_bonded
179        return False
180
181    async def OnPairing(self, request: AsyncIterator[security_pb2.PairingEventAnswer],
182                        context: grpc.ServicerContext) -> AsyncGenerator[security_pb2.PairingEvent, None]:
183        logging.info('OnPairing')
184        on_pairing_id = self.on_pairing_count
185        self.on_pairing_count = self.on_pairing_count + 1
186
187        class PairingObserver(adapter_client.BluetoothCallbacks):
188            """Observer to observe all pairing events."""
189
190            def __init__(self, loop: asyncio.AbstractEventLoop, task):
191                self.loop = loop
192                self.task = task
193
194            @utils.glib_callback()
195            def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
196                address, name = remote_device
197
198                result = (address, name, variant, passkey)
199                asyncio.run_coroutine_threadsafe(self.task['pairing_events'].put(result), self.loop)
200
201            @utils.glib_callback()
202            def on_pin_request(self, remote_device, cod, min_16_digit):
203                address, name = remote_device
204
205                if min_16_digit:
206                    variant = floss_enums.PairingVariant.PIN_16_DIGITS_ENTRY
207                else:
208                    variant = floss_enums.PairingVariant.PIN_ENTRY
209                result = (address, name, variant, min_16_digit)
210                asyncio.run_coroutine_threadsafe(self.task['pairing_events'].put(result), self.loop)
211
212            @utils.glib_callback()
213            def on_pin_display(self, remote_device, pincode):
214                address, name = remote_device
215
216                variant = floss_enums.PairingVariant.PIN_NOTIFICATION
217                result = (address, name, variant, pincode)
218                asyncio.run_coroutine_threadsafe(self.task['pairing_events'].put(result), self.loop)
219
220        pairing_answers = request
221
222        async def streaming_answers(self):
223            while True:
224                nonlocal pairing_answers
225                nonlocal on_pairing_id
226
227                logging.info('OnPairing[%s]: Wait for pairing answer...', on_pairing_id)
228                pairing_answer = await utils.anext(pairing_answers)
229
230                answer = pairing_answer.WhichOneof('answer')
231                address = utils.address_from(pairing_answer.event.address)
232                logging.info('OnPairing[%s]: Pairing answer: %s address: %s', on_pairing_id, answer, address)
233
234                if answer == 'confirm':
235                    self.bluetooth.set_pairing_confirmation(address, True)
236                elif answer == 'passkey':
237                    self.bluetooth.set_pin(address, True, list(str(answer.passkey).zfill(6).encode()))
238                elif answer == 'pin':
239                    self.bluetooth.set_pin(address, True, list(answer.pin))
240
241        observers = []
242        try:
243            self.manually_confirm = True
244
245            pairing_events = asyncio.Queue()
246            observer = PairingObserver(asyncio.get_running_loop(), {'pairing_events': pairing_events})
247            name = utils.create_observer_name(observer)
248            self.bluetooth.adapter_client.register_callback_observer(name, observer)
249            observers.append((name, observer))
250
251            streaming_answers_task = asyncio.create_task(streaming_answers(self))
252
253            while True:
254                logging.info('OnPairing[%s]: Wait for pairing events...', on_pairing_id)
255                address, name, variant, *variables = await pairing_events.get()
256                logging.info('OnPairing[%s]: Pairing event: address: %s, name: %s, variant: %s, variables: %s',
257                             on_pairing_id, address, name, variant, variables)
258
259                event = security_pb2.PairingEvent()
260                event.address = utils.address_to(address)
261
262                # SSP
263                if variant == floss_enums.PairingVariant.PASSKEY_CONFIRMATION:
264                    [passkey] = variables
265                    event.numeric_comparison = passkey
266                elif variant == floss_enums.PairingVariant.PASSKEY_ENTRY:
267                    event.passkey_entry_request.CopyFrom(empty_pb2.Empty())
268                elif variant == floss_enums.PairingVariant.CONSENT:
269                    event.just_works.CopyFrom(empty_pb2.Empty())
270                elif variant == floss_enums.PairingVariant.PASSKEY_NOTIFICATION:
271                    [passkey] = variables
272                    event.passkey_entry_notification = passkey
273                # Legacy
274                elif variant == floss_enums.PairingVariant.PIN_ENTRY:
275                    transport = self.bluetooth.get_remote_type(address)
276
277                    if transport == floss_enums.BtTransport.BREDR:
278                        event.pin_code_request.CopyFrom(empty_pb2.Empty())
279                    elif transport == floss_enums.BtTransport.LE:
280                        event.passkey_entry_request.CopyFrom(empty_pb2.Empty())
281                    else:
282                        logging.error('Cannot determine pairing variant from unknown transport.')
283                        continue
284                elif variant == floss_enums.PairingVariant.PIN_16_DIGITS_ENTRY:
285                    event.pin_code_request.CopyFrom(empty_pb2.Empty())
286                elif variant == floss_enums.PairingVarint.PIN_NOTIFICATION:
287                    transport = self.bluetooth.get_remote_type(address)
288                    [pincode] = variables
289
290                    if transport == floss_enums.BtTransport.BREDR:
291                        event.pin_code_notification = pincode.encode()
292                    elif transport == floss_enums.BtTransport.LE:
293                        event.passkey_entry_notification = int(pincode)
294                    else:
295                        logging.error('Cannot determine pairing variant from unknown transport.')
296                        continue
297                else:
298                    logging.error('Unknown pairing variant: %s', variant)
299                    continue
300
301                yield event
302        finally:
303            streaming_answers_task.cancel()
304            for name, observer in observers:
305                self.bluetooth.adapter_client.unregister_callback_observer(name, observer)
306
307            pairing_events = None
308            pairing_answers = None
309
310    async def Secure(self, request: security_pb2.SecureRequest,
311                     context: grpc.ServicerContext) -> security_pb2.SecureResponse:
312        connection = utils.connection_from(request.connection)
313        address = connection.address
314        transport = connection.transport
315
316        if transport == floss_enums.BtTransport.LE:
317            if not request.HasField('le'):
318                await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Request le field must be set.')
319            if request.le == security_pb2.LE_LEVEL1:
320                security_level_reached = True
321            elif request.le == security_pb2.LE_LEVEL4:
322                await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Low-energy security level 4 is not supported.')
323            else:
324                if not self.bluetooth.is_bonded(address):
325                    self.bluetooth.create_bond(address, transport)
326                security_level_reached = await self.wait_le_security_level(request.le, address)
327        elif transport == floss_enums.BtTransport.BREDR:
328            if not request.HasField('classic'):
329                await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Request classic field must be set.')
330            if request.classic == security_pb2.LEVEL0:
331                security_level_reached = True
332            elif request.classic >= security_pb2.LEVEL3:
333                await context.abort(grpc.StatusCode.INVALID_ARGUMENT,
334                                    'Classic security level up to 3 is not supported.')
335            else:
336                if not self.bluetooth.is_bonded(address):
337                    self.bluetooth.create_bond(address, transport)
338                security_level_reached = await self.wait_classic_security_level(request.classic, address)
339        else:
340            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid bluetooth transport type: {transport}.')
341
342        secure_response = security_pb2.SecureResponse()
343        if security_level_reached:
344            secure_response.success.CopyFrom(empty_pb2.Empty())
345        else:
346            secure_response.not_reached.CopyFrom(empty_pb2.Empty())
347        return secure_response
348
349    async def WaitSecurity(self, request: security_pb2.WaitSecurityRequest,
350                           context: grpc.ServicerContext) -> security_pb2.WaitSecurityResponse:
351        address = utils.connection_from(request.connection).address
352        transport = floss_enums.BtTransport.BREDR if request.HasField('classic') else floss_enums.BtTransport.LE
353
354        if transport == floss_enums.BtTransport.LE:
355            security_level_reached = await self.wait_le_security_level(request.le, address)
356        elif transport == floss_enums.BtTransport.BREDR:
357            security_level_reached = await self.wait_classic_security_level(request.classic, address)
358        else:
359            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid bluetooth transport type: {transport}.')
360
361        wait_security_response = security_pb2.WaitSecurityResponse()
362        if security_level_reached:
363            wait_security_response.success.CopyFrom(empty_pb2.Empty())
364        else:
365            wait_security_response.pairing_failure.CopyFrom(empty_pb2.Empty())
366        return wait_security_response
367
368
369class SecurityStorageService(security_grpc_aio.SecurityStorageServicer):
370    """Service to trigger Bluetooth Host security persistent storage procedures.
371
372    This class implements the Pandora bluetooth test interfaces,
373    where the meta class definition is automatically generated by the protobuf.
374    The interface definition can be found in:
375    https://cs.android.com/android/platform/superproject/+/main:external
376    /pandora/bt-test-interfaces/pandora/security.proto
377    """
378
379    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
380        self.bluetooth = bluetooth
381
382    async def IsBonded(self, request: security_pb2.IsBondedRequest,
383                       context: grpc.ServicerContext) -> wrappers_pb2.BoolValue:
384
385        address = utils.address_from(request.address)
386        is_bonded = self.bluetooth.is_bonded(address)
387        return wrappers_pb2.BoolValue(value=is_bonded)
388
389    async def DeleteBond(self, request: security_pb2.DeleteBondRequest,
390                         context: grpc.ServicerContext) -> empty_pb2.Empty:
391
392        class BondingObserver(adapter_client.BluetoothCallbacks):
393            """Observer to observe the bond state"""
394
395            def __init__(self, task):
396                self.task = task
397
398            @utils.glib_callback()
399            def on_bond_state_changed(self, status, address, state):
400                if address != self.task['address']:
401                    return
402
403                future = self.task['remove_bond']
404                if status != 0:
405                    future.get_loop().call_soon_threadsafe(future.set_result,
406                                                           (False, f'{address} failed to remove bond. Status: {status},'
407                                                            f' State: {state}'))
408                    return
409
410                if state == floss_enums.BondState.NOT_BONDED:
411                    future.get_loop().call_soon_threadsafe(future.set_result, (True, None))
412                else:
413                    future.get_loop().call_soon_threadsafe(
414                        future.set_result, (False, f'{address} failed on remove_bond, got bond state {state},'
415                                            f' want {floss_enums.BondState.NOT_BONDED}'))
416
417        address = utils.address_from(request.address)
418        if not self.bluetooth.is_bonded(address):
419            return empty_pb2.Empty()
420        try:
421            remove_bond = asyncio.get_running_loop().create_future()
422            observer = BondingObserver({'remove_bond': remove_bond, 'address': address})
423            name = utils.create_observer_name(observer)
424            self.bluetooth.adapter_client.register_callback_observer(name, observer)
425            self.bluetooth.remove_bond(address)
426            success, reason = await remove_bond
427            if not success:
428                await context.abort(grpc.StatusCode.INVALID_ARGUMENT,
429                                    f'Failed to remove bond of address: {address}. Reason: {reason}.')
430        finally:
431            self.bluetooth.adapter_client.unregister_callback_observer(name, observer)
432        return empty_pb2.Empty()
433