# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RFCOMM grpc interface."""

import asyncio
import logging
import os
import socket as socket_module
import uuid as uuid_module

from floss.pandora.floss import floss_enums
from floss.pandora.floss import socket_manager
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
from google.protobuf import empty_pb2
import grpc
from pandora_experimental import rfcomm_grpc_aio
from pandora_experimental import rfcomm_pb2


def _resolve_threadsafe(future: asyncio.Future, value) -> None:
    """Schedules `future.set_result(value)` on the loop that owns `future`.

    The result is only set if the future is still pending, so a callback that
    fires after `asyncio.wait_for` has already cancelled the future (or that
    fires more than once) does not raise `InvalidStateError` inside the loop.

    Args:
        future: The future to resolve.
        value: The result to hand to whoever awaits the future.
    """

    def _set_if_pending():
        if not future.done():
            future.set_result(value)

    future.get_loop().call_soon_threadsafe(_set_if_pending)


def _dup_fd_from_list(optional_fd, dbus_unix_fd_list, caller: str):
    """Extracts a duplicated file descriptor from a D-Bus fd list.

    Args:
        optional_fd: Mapping whose 'optional_value' entry is the index of the
            descriptor inside `dbus_unix_fd_list`; may be empty/None.
        dbus_unix_fd_list: Object exposing `get_length()` and `get(index)`.
        caller: Callback name, used as the prefix of error log messages.

    Returns:
        A descriptor obtained with `os.dup`, owned by the caller, or None if no
        valid descriptor is available.
    """
    if not optional_fd:
        return None

    if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
        logging.error('%s: Empty fd list', caller)
        return None

    fd_handle = optional_fd['optional_value']
    # Valid indices are 0 .. length - 1; an index equal to the length is out of range.
    if not 0 <= fd_handle < dbus_unix_fd_list.get_length():
        logging.error('%s: Invalid fd handle', caller)
        return None

    # NOTE(review): if dbus_unix_fd_list is a Gio.UnixFDList, get() already returns a
    # duplicate that the caller must close, which would make this dup redundant - confirm.
    return os.dup(dbus_unix_fd_list.get(fd_handle))


class RFCOMMService(rfcomm_grpc_aio.RFCOMMServicer):
    """Service to trigger Bluetooth RFCOMM procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora_experimental/rfcomm.proto
    """

    # Size of the buffer for data transactions.
    BUFFER_SIZE = 512

    # Seconds to wait for a socket manager callback before giving up.
    CALLBACK_TIMEOUT_SEC = 5

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        self.bluetooth = bluetooth

        # Used by new_stream_id() to generate IDs for the RPC client to specify the stream.
        self.current_stream_id = 0x12FC0

        # key = stream_id, val = stream
        self.streams = dict()

    def new_stream_id(self) -> int:
        """Returns the next unused stream ID and advances the counter."""
        stream_id = self.current_stream_id
        self.current_stream_id += 1
        return stream_id

    def _register_stream(self, fd: int) -> int:
        """Wraps `fd` in a socket object, stores it and returns its new stream ID.

        `socket.fromfd` duplicates the descriptor it is given, so `fd` is closed
        here once the socket object exists; otherwise every connection would
        leak one descriptor.
        """
        try:
            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
        finally:
            os.close(fd)

        stream_id = self.new_stream_id()
        self.streams[stream_id] = stream
        return stream_id

    async def ConnectToServer(self, request: rfcomm_pb2.ConnectionRequest,
                              context: grpc.ServicerContext) -> rfcomm_pb2.ConnectionResponse:
        """Opens an insecure RFCOMM connection to `request.address` / `request.uuid`.

        Aborts the RPC with INTERNAL if the socket cannot be created or no file
        descriptor is delivered; `asyncio.TimeoutError` propagates if the
        connection result callback does not arrive in time.

        Returns:
            A ConnectionResponse carrying the ID of the newly stored stream.
        """

        class CreateRFCOMMObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the created RFCOMM connection state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_outgoing_connection_result(self, connecting_id, result, socket, *, dbus_unix_fd_list=None):
                if connecting_id != self.task['connecting_id']:
                    return

                future = self.task['create_rfcomm_channel']
                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to create the RFCOMM channel with connecting_id: %s. Status: %s',
                                  connecting_id, result)
                    _resolve_threadsafe(future, None)
                    return

                if not socket:
                    _resolve_threadsafe(future, None)
                    return

                fd = _dup_fd_from_list(socket['optional_value']['fd'], dbus_unix_fd_list,
                                       'on_outgoing_connection_result')
                _resolve_threadsafe(future, fd)

        address = utils.address_from(request.address)
        uuid = list(uuid_module.UUID(request.uuid).bytes)

        # Everything up to the observer registration happens outside the try block so the
        # finally clause never references an observer that was not registered.
        socket_result = self.bluetooth.create_insecure_rfcomm_socket_to_service_record(address, uuid)
        if socket_result is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                'Failed to call create_insecure_rfcomm_socket_to_service_record.')

        connecting_id = socket_result['id']
        rfcomm_channel_creation = {
            'create_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'connecting_id': connecting_id
        }
        observer = CreateRFCOMMObserver(rfcomm_channel_creation)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            fd = await asyncio.wait_for(rfcomm_channel_creation['create_rfcomm_channel'],
                                        timeout=self.CALLBACK_TIMEOUT_SEC)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        if fd is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to get the fd from RFCOMM socket with connecting_id: {connecting_id}')

        stream_id = self._register_stream(fd)
        return rfcomm_pb2.ConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Disconnect(self, request: rfcomm_pb2.DisconnectionRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.DisconnectionResponse:
        """Shuts down and forgets the stream identified by `request.connection.id`.

        An unknown stream ID is logged and otherwise ignored. A failing
        `shutdown` is logged, but the socket is still closed and removed so the
        descriptor is not leaked.
        """
        stream_id = request.connection.id
        stream = self.streams.pop(stream_id, None)
        if stream is None:
            logging.error('No stream found with ID %s', stream_id)
            return empty_pb2.Empty()

        try:
            stream.shutdown(socket_module.SHUT_RDWR)
        except OSError as e:
            # E.g. the peer already went away; closing below is still required.
            logging.error('Disconnect: failed to shut down stream %s: %s', stream_id, e)
        finally:
            stream.close()

        return empty_pb2.Empty()

    async def StopServer(self, request: rfcomm_pb2.StopServerRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.StopServerResponse:
        """Closes the listening socket identified by `request.server.id`.

        Aborts the RPC with INTERNAL if `close_socket` fails or the close
        callback reports a non-success status; `asyncio.TimeoutError` propagates
        if the callback does not arrive in time.
        """

        class StopRFCOMMSocket(socket_manager.SocketManagerCallbacks):
            """Observer to observe stop state of RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_closed(self, listener_id, reason):
                if listener_id != self.task['listener_id']:
                    return

                if reason is None or floss_enums.BtStatus(reason) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to stop RFCOMM channel with listener_id: %s. Status: %s', listener_id, reason)

                _resolve_threadsafe(self.task['stop_rfcomm_channel'], reason)

        listener_id = request.server.id
        rfcomm_channel_stop = {
            'stop_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'listener_id': listener_id
        }
        observer = StopRFCOMMSocket(rfcomm_channel_stop)
        name = utils.create_observer_name(observer)
        # Register before closing so the close notification cannot be missed.
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            if not self.bluetooth.close_socket(listener_id):
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to call close_socket.')

            status = await asyncio.wait_for(rfcomm_channel_stop['stop_rfcomm_channel'],
                                            timeout=self.CALLBACK_TIMEOUT_SEC)
            if status != floss_enums.BtStatus.SUCCESS:
                await context.abort(grpc.StatusCode.INTERNAL,
                                    f'Failed to stop RFCOMM channel with listener_id: {listener_id}. Status: {status}')
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return empty_pb2.Empty()

    async def StartServer(self, request: rfcomm_pb2.StartServerRequest,
                          context: grpc.ServicerContext) -> rfcomm_pb2.StartServerResponse:
        """Starts an insecure RFCOMM server for `request.name` / `request.uuid`.

        Aborts the RPC with INTERNAL if the listen call fails or the ready
        callback reports a non-success status; `asyncio.TimeoutError` propagates
        if the callback does not arrive in time.

        Returns:
            A StartServerResponse carrying the listener ID of the server socket.
        """

        class StartServerObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the RFCOMM server start."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_ready(self, socket, status):
                if not socket or 'id' not in socket:
                    return

                listener_id = socket['id']
                if listener_id != self.task['socket_id']:
                    return

                future = self.task['start_rfcomm_server']
                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed listening to RFCOMM channel with socket_id: %s. Status: %s', listener_id,
                                  status)
                    _resolve_threadsafe(future, None)
                else:
                    _resolve_threadsafe(future, listener_id)

        uuid = list(uuid_module.UUID(request.uuid).bytes)
        socket_result = self.bluetooth.listen_using_insecure_rfcomm_with_service_record(request.name, uuid)
        if socket_result is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                'Failed to call listen_using_insecure_rfcomm_with_service_record.')

        socket_id = socket_result['id']
        rfcomm_channel_listener = {
            'start_rfcomm_server': asyncio.get_running_loop().create_future(),
            'socket_id': socket_id
        }
        observer = StartServerObserver(rfcomm_channel_listener)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            listener_id = await asyncio.wait_for(rfcomm_channel_listener['start_rfcomm_server'],
                                                 timeout=self.CALLBACK_TIMEOUT_SEC)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        # The observer resolves to None when the stack reports a failure; surface that as an
        # error like the other RPCs do instead of returning a response without a usable ID.
        if listener_id is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to start the RFCOMM server with socket_id: {socket_id}')

        return rfcomm_pb2.StartServerResponse(server=rfcomm_pb2.ServerId(id=listener_id))

    async def AcceptConnection(self, request: rfcomm_pb2.AcceptConnectionRequest,
                               context: grpc.ServicerContext) -> rfcomm_pb2.AcceptConnectionResponse:
        """Accepts one incoming connection on the server `request.server.id`.

        Aborts the RPC with INTERNAL if `accept_socket` fails or no file
        descriptor is delivered. If the incoming-connection callback does not
        arrive in time, the timeout is logged and a response built with
        `id=None` is returned.

        Returns:
            An AcceptConnectionResponse carrying the ID of the newly stored stream.
        """

        class AcceptConnectionObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the accepted RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_handle_incoming_connection(self, listener_id, connection, *, dbus_unix_fd_list=None):
                if listener_id != self.task['listener_id']:
                    return

                future = self.task['accept_rfcomm_channel']
                if not connection:
                    _resolve_threadsafe(future, None)
                    return

                fd = _dup_fd_from_list(connection['fd'], dbus_unix_fd_list, 'on_handle_incoming_connection')
                _resolve_threadsafe(future, fd)

        listener_id = request.server.id
        rfcomm_channel_acceptance = {
            'accept_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'listener_id': listener_id
        }
        observer = AcceptConnectionObserver(rfcomm_channel_acceptance)
        name = utils.create_observer_name(observer)
        self.bluetooth.socket_manager.register_callback_observer(name, observer)
        try:
            # NOTE(review): timeout_ms=5 is 5 milliseconds while the wait below is 5 seconds;
            # this may have been meant to be 5000 - confirm against accept_socket().
            accept_socket_status = self.bluetooth.accept_socket(listener_id, timeout_ms=5)
            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL, f'Failed to accept the RFCOMM socket with listener_id: {listener_id}. '
                    f'Status: {accept_socket_status}.')

            fd = await asyncio.wait_for(rfcomm_channel_acceptance['accept_rfcomm_channel'],
                                        timeout=self.CALLBACK_TIMEOUT_SEC)
        except asyncio.TimeoutError as e:
            logging.error('AcceptConnection: asyncio.TimeoutError %s', e)
            return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=None))
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        if fd is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to get the fd from RFCOMM socket with listener_id: {listener_id}')

        stream_id = self._register_stream(fd)
        return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Send(self, request: rfcomm_pb2.TxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.TxResponse:
        """Writes `request.data` to the stream identified by `request.connection.id`.

        Failures (unknown stream, socket errors) are logged and not reported to
        the RPC client.
        """
        stream_id = request.connection.id
        output_stream = self.streams.get(stream_id)
        if output_stream is None:
            logging.error('Output stream: %s not found for the stream_id: %s', output_stream, stream_id)
            return empty_pb2.Empty()

        try:
            # sendall() keeps writing until the whole payload is out; send() may stop early.
            output_stream.sendall(request.data)
        except Exception as e:
            logging.error('Exception during writing to output stream %s', e)

        return empty_pb2.Empty()

    async def Receive(self, request: rfcomm_pb2.RxRequest, context: grpc.ServicerContext) -> rfcomm_pb2.RxResponse:
        """Reads up to BUFFER_SIZE bytes from the stream `request.connection.id`.

        Returns:
            An RxResponse with the received bytes, or with empty data if the
            stream is unknown, nothing was received, or reading failed.
        """
        stream_id = request.connection.id
        input_stream = self.streams.get(stream_id)
        if input_stream is None:
            logging.error('Input stream: %s not found for the stream_id: %s', input_stream, stream_id)
            return rfcomm_pb2.RxResponse(data=b'')

        try:
            data = input_stream.recv(self.BUFFER_SIZE)
            if data:
                return rfcomm_pb2.RxResponse(data=bytes(data))
        except Exception as e:
            logging.error('Exception during reading from input stream %s', e)

        # Return an empty byte array.
        return rfcomm_pb2.RxResponse(data=b'')