1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""RFCOMM grpc interface."""
15
16import asyncio
17import logging
18import os
19import socket as socket_module
20import uuid as uuid_module
21
22from floss.pandora.floss import floss_enums
23from floss.pandora.floss import socket_manager
24from floss.pandora.floss import utils
25from floss.pandora.server import bluetooth as bluetooth_module
26from google.protobuf import empty_pb2
27import grpc
28from pandora_experimental import rfcomm_grpc_aio
29from pandora_experimental import rfcomm_pb2
30
31
32class RFCOMMService(rfcomm_grpc_aio.RFCOMMServicer):
33    """Service to trigger Bluetooth RFCOMM procedures.
34
35    This class implements the Pandora bluetooth test interfaces,
36    where the meta class definition is automatically generated by the protobuf.
37    The interface definition can be found in:
38    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/rfcomm.proto
39    """
40
41    # Size of the buffer for data transactions.
42    BUFFER_SIZE = 512
43
44    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
45        self.bluetooth = bluetooth
46
47        # Used by new_stream_id() to generate IDs for the RPC client to specify the stream.
48        self.current_stream_id = 0x12FC0
49
50        # key = stream_id, val = stream
51        self.streams = dict()
52
53    def new_stream_id(self) -> int:
54        id = self.current_stream_id
55        self.current_stream_id += 1
56        return id
57
58    async def ConnectToServer(self, request: rfcomm_pb2.ConnectionRequest,
59                              context: grpc.ServicerContext) -> rfcomm_pb2.ConnectionResponse:
60
61        class CreateRFCOMMObserver(socket_manager.SocketManagerCallbacks):
62            """Observer to observe the created RFCOMM connection state."""
63
64            def __init__(self, task):
65                self.task = task
66
67            @utils.glib_callback()
68            def on_outgoing_connection_result(self, connecting_id, result, socket, *, dbus_unix_fd_list=None):
69                if connecting_id != self.task['connecting_id']:
70                    return
71
72                future = self.task['create_rfcomm_channel']
73                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
74                    logging.error('Failed to create the RFCOMM channel with connecting_id: %s. Status: %s',
75                                  connecting_id, result)
76                    future.get_loop().call_soon_threadsafe(future.set_result, None)
77                    return
78
79                if not socket:
80                    future.get_loop().call_soon_threadsafe(future.set_result, None)
81                    return
82
83                optional_fd = socket['optional_value']['fd']
84                if not optional_fd:
85                    future.get_loop().call_soon_threadsafe(future.set_result, None)
86                    return
87
88                if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
89                    logging.error('on_outgoing_connection_result: Empty fd list')
90                    future.get_loop().call_soon_threadsafe(future.set_result, None)
91                    return
92
93                fd_handle = optional_fd['optional_value']
94                if fd_handle > dbus_unix_fd_list.get_length():
95                    logging.error('on_outgoing_connection_result: Invalid fd handle')
96                    future.get_loop().call_soon_threadsafe(future.set_result, None)
97                    return
98
99                fd = dbus_unix_fd_list.get(fd_handle)
100                fd_dup = os.dup(fd)
101                future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)
102
103        address = utils.address_from(request.address)
104        uuid = list(uuid_module.UUID(request.uuid).bytes)
105        try:
106            socket_result = self.bluetooth.create_insecure_rfcomm_socket_to_service_record(address, uuid)
107            if socket_result is None:
108                await context.abort(grpc.StatusCode.INTERNAL,
109                                    'Failed to call create_insecure_rfcomm_socket_to_service_record.')
110
111            connecting_id = socket_result['id']
112            rfcomm_channel_creation = {
113                'create_rfcomm_channel': asyncio.get_running_loop().create_future(),
114                'connecting_id': connecting_id
115            }
116            observer = CreateRFCOMMObserver(rfcomm_channel_creation)
117            name = utils.create_observer_name(observer)
118            self.bluetooth.socket_manager.register_callback_observer(name, observer)
119            fd = await asyncio.wait_for(rfcomm_channel_creation['create_rfcomm_channel'], timeout=5)
120            if fd is None:
121                await context.abort(grpc.StatusCode.INTERNAL,
122                                    f'Failed to get the fd from RFCOMM socket with connecting_id: {connecting_id}')
123
124            stream_id = self.new_stream_id()
125            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
126            self.streams[stream_id] = stream
127        finally:
128            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
129
130        return rfcomm_pb2.ConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))
131
132    async def Disconnect(self, request: rfcomm_pb2.DisconnectionRequest,
133                         context: grpc.ServicerContext) -> rfcomm_pb2.DisconnectionResponse:
134
135        stream_id = request.connection.id
136        if stream_id in self.streams:
137            stream = self.streams[stream_id]
138            try:
139                stream.shutdown(socket_module.SHUT_RDWR)
140                stream.close()
141                del self.streams[stream_id]
142            except asyncio.TimeoutError as e:
143                logging.error('Disconnect: asyncio.TimeoutError %s', e)
144        else:
145            logging.error('No stream found with ID %s', stream_id)
146
147        return empty_pb2.Empty()
148
149    async def StopServer(self, request: rfcomm_pb2.StopServerRequest,
150                         context: grpc.ServicerContext) -> rfcomm_pb2.StopServerResponse:
151
152        class StopRFCOMMSocket(socket_manager.SocketManagerCallbacks):
153            """Observer to observe stop state of RFCOMM connection."""
154
155            def __init__(self, task):
156                self.task = task
157
158            @utils.glib_callback()
159            def on_incoming_socket_closed(self, listener_id, reason):
160                if listener_id != self.task['listener_id']:
161                    return
162
163                if reason is None or floss_enums.BtStatus(reason) != floss_enums.BtStatus.SUCCESS:
164                    logging.error('Failed to stop RFCOMM channel with listener_id: %s. Status: %s', listener_id, reason)
165
166                future = self.task['stop_rfcomm_channel']
167                future.get_loop().call_soon_threadsafe(future.set_result, reason)
168
169        try:
170            listener_id = request.server.id
171            rfcomm_channel_stop = {
172                'stop_rfcomm_channel': asyncio.get_running_loop().create_future(),
173                'listener_id': listener_id
174            }
175            observer = StopRFCOMMSocket(rfcomm_channel_stop)
176            name = utils.create_observer_name(observer)
177            self.bluetooth.socket_manager.register_callback_observer(name, observer)
178            if not self.bluetooth.close_socket(listener_id):
179                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to call close_socket.')
180
181            status = await asyncio.wait_for(rfcomm_channel_stop['stop_rfcomm_channel'], timeout=5)
182            if status != floss_enums.BtStatus.SUCCESS:
183                await context.abort(grpc.StatusCode.INTERNAL,
184                                    f'Failed to stop RFCOMM channel with listener_id: {listener_id}. Status: {status}')
185        finally:
186            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
187
188        return empty_pb2.Empty()
189
190    async def StartServer(self, request: rfcomm_pb2.StartServerRequest,
191                          context: grpc.ServicerContext) -> rfcomm_pb2.StartServerResponse:
192
193        class StartServerObserver(socket_manager.SocketManagerCallbacks):
194            """Observer to observe the RFCOMM server start."""
195
196            def __init__(self, task):
197                self.task = task
198
199            @utils.glib_callback()
200            def on_incoming_socket_ready(self, socket, status):
201                if not socket or 'id' not in socket:
202                    return
203
204                listener_id = socket['id']
205                if listener_id != self.task['socket_id']:
206                    return
207
208                future = self.task['start_rfcomm_server']
209                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
210                    logging.error('Failed listening to RFCOMM channel with socket_id: %s. Status: %s', listener_id,
211                                  status)
212                    future.get_loop().call_soon_threadsafe(future.set_result, None)
213                else:
214                    future.get_loop().call_soon_threadsafe(future.set_result, listener_id)
215
216        try:
217            uuid = list(uuid_module.UUID(request.uuid).bytes)
218            socket_result = self.bluetooth.listen_using_insecure_rfcomm_with_service_record(request.name, uuid)
219            if socket_result is None:
220                await context.abort(grpc.StatusCode.INTERNAL,
221                                    'Failed to call listen_using_insecure_rfcomm_with_service_record.')
222
223            rfcomm_channel_listener = {
224                'start_rfcomm_server': asyncio.get_running_loop().create_future(),
225                'socket_id': socket_result['id']
226            }
227            observer = StartServerObserver(rfcomm_channel_listener)
228            name = utils.create_observer_name(observer)
229            self.bluetooth.socket_manager.register_callback_observer(name, observer)
230            listener_id = await asyncio.wait_for(rfcomm_channel_listener['start_rfcomm_server'], timeout=5)
231        finally:
232            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
233
234        return rfcomm_pb2.StartServerResponse(server=rfcomm_pb2.ServerId(id=listener_id))
235
236    async def AcceptConnection(self, request: rfcomm_pb2.AcceptConnectionRequest,
237                               context: grpc.ServicerContext) -> rfcomm_pb2.AcceptConnectionResponse:
238
239        class AcceptConnectionObserver(socket_manager.SocketManagerCallbacks):
240            """Observer to observe the accepted RFCOMM connection."""
241
242            def __init__(self, task):
243                self.task = task
244
245            @utils.glib_callback()
246            def on_handle_incoming_connection(self, listener_id, connection, *, dbus_unix_fd_list=None):
247                if listener_id != self.task['listener_id']:
248                    return
249
250                future = self.task['accept_rfcomm_channel']
251                if not connection:
252                    future.get_loop().call_soon_threadsafe(future.set_result, None)
253                    return
254
255                optional_fd = connection['fd']
256                if not optional_fd:
257                    future.get_loop().call_soon_threadsafe(future.set_result, None)
258                    return
259
260                if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
261                    logging.error('on_handle_incoming_connection: Empty fd list')
262                    future.get_loop().call_soon_threadsafe(future.set_result, None)
263                    return
264
265                fd_handle = optional_fd['optional_value']
266                if fd_handle > dbus_unix_fd_list.get_length():
267                    logging.error('on_handle_incoming_connection: Invalid fd handle')
268                    future.get_loop().call_soon_threadsafe(future.set_result, None)
269                    return
270
271                fd = dbus_unix_fd_list.get(fd_handle)
272                fd_dup = os.dup(fd)
273                future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)
274
275        try:
276            listener_id = request.server.id
277            rfcomm_channel_acceptance = {
278                'accept_rfcomm_channel': asyncio.get_running_loop().create_future(),
279                'listener_id': listener_id
280            }
281            observer = AcceptConnectionObserver(rfcomm_channel_acceptance)
282            name = utils.create_observer_name(observer)
283            self.bluetooth.socket_manager.register_callback_observer(name, observer)
284            accept_socket_status = self.bluetooth.accept_socket(listener_id, timeout_ms=5)
285            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
286                await context.abort(
287                    grpc.StatusCode.INTERNAL, f'Failed to accept the RFCOMM socket with listener_id: {listener_id}. '
288                    f'Status: {accept_socket_status}.')
289
290            fd = await asyncio.wait_for(rfcomm_channel_acceptance['accept_rfcomm_channel'], timeout=5)
291            if fd is None:
292                await context.abort(grpc.StatusCode.INTERNAL,
293                                    f'Failed to get the fd from RFCOMM socket with listener_id: {listener_id}')
294
295            stream_id = self.new_stream_id()
296            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
297            self.streams[stream_id] = stream
298
299        except asyncio.TimeoutError as e:
300            logging.error('AcceptConnection: asyncio.TimeoutError %s', e)
301            return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=None))
302        finally:
303            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
304
305        return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))
306
307    async def Send(self, request: rfcomm_pb2.TxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.TxResponse:
308        stream_id = request.connection.id
309        output_stream = self.streams.get(stream_id)
310        if output_stream:
311            try:
312                output_stream.send(request.data)
313
314            except Exception as e:
315                logging.error('Exception during writing to output stream %s', e)
316        else:
317            logging.error('Output stream: %s not found for the stream_id: %s', output_stream, stream_id)
318
319        return empty_pb2.Empty()
320
321    async def Receive(self, request: rfcomm_pb2.RxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.RxResponse:
322        stream_id = request.connection.id
323        input_stream = self.streams.get(stream_id)
324        if input_stream:
325            try:
326                data = input_stream.recv(self.BUFFER_SIZE)
327                if data:
328                    return rfcomm_pb2.RxResponse(data=bytes(data))
329            except Exception as e:
330                logging.error('Exception during reading from input stream %s', e)
331        else:
332            logging.error('Input stream: %s not found for the stream_id: %s', input_stream, stream_id)
333
334        # Return an empty byte array.
335        return rfcomm_pb2.RxResponse(data=b'')
336