1# Copyright (C) 2024 The Android Open Source Project 2# Licensed under the Apache License, Version 2.0 (the "License"); 3# you may not use this file except in compliance with the License. 4# You may obtain a copy of the License at 5# 6# http://www.apache.org/licenses/LICENSE-2.0 7# 8# Unless required by applicable law or agreed to in writing, software 9# distributed under the License is distributed on an "AS IS" BASIS, 10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11# See the License for the specific language governing permissions and 12# limitations under the License. 13 14import asyncio 15import logging 16from typing import Dict, Optional 17 18from bumble import core 19from bumble.device import Device 20from bumble.rfcomm import ( 21 Server, 22 make_service_sdp_records, 23 DLC, 24) 25from bumble.pandora import utils 26import grpc 27from pandora_experimental.rfcomm_grpc_aio import RFCOMMServicer 28from pandora_experimental.rfcomm_pb2 import ( 29 AcceptConnectionRequest, 30 AcceptConnectionResponse, 31 ConnectionRequest, 32 ConnectionResponse, 33 RfcommConnection, 34 RxRequest, 35 RxResponse, 36 ServerId, 37 StartServerRequest, 38 StartServerResponse, 39 StopServerRequest, 40 StopServerResponse, 41 TxRequest, 42 TxResponse, 43) 44 45 46class RFCOMMService(RFCOMMServicer): 47 #TODO Add support for multiple servers 48 device: Device 49 server_id: Optional[ServerId] 50 server: Optional[Server] 51 52 def __init__(self, device: Device) -> None: 53 super().__init__() 54 self.device = device 55 self.server_id = None 56 self.server = None 57 self.server_name = None 58 self.server_uuid = None 59 self.connections = {} # key = id, value = dlc 60 self.next_server_id = 1 61 self.next_conn_id = 1 62 self.open_channel = None 63 self.wait_dlc = None 64 self.dlc = None 65 self.data_queue = asyncio.Queue() 66 67 @utils.rpc 68 async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse: 69 logging.info(f"StartServer") 70 if self.server_id: 71 logging.warning(f"Server already started, returning existing server") 72 return StartServerResponse(server=self.server_id) 73 else: 74 self.server_id = ServerId(id=self.next_server_id) 75 self.next_server_id += 1 76 self.server = Server(self.device) 77 self.server_name = request.name 78 self.server_uuid = core.UUID(request.uuid) 79 self.wait_dlc = asyncio.get_running_loop().create_future() 80 handle = 1 81 #TODO Add support for multiple clients 82 self.open_channel = self.server.listen(acceptor=self.wait_dlc.set_result, channel=2) 83 records = make_service_sdp_records(handle, self.open_channel, self.server_uuid) 84 self.device.sdp_service_records[handle] = records 85 return StartServerResponse(server=self.server_id) 86 87 @utils.rpc 88 async def AcceptConnection(self, request: AcceptConnectionRequest, 89 context: grpc.ServicerContext) -> AcceptConnectionResponse: 90 logging.info(f"AcceptConnection") 91 assert self.server_id.id == request.server.id 92 self.dlc = await self.wait_dlc 93 self.dlc.sink = self.data_queue.put_nowait 94 new_conn = RfcommConnection(id=self.next_conn_id) 95 self.next_conn_id += 1 96 self.connections[new_conn.id] = self.dlc 97 return AcceptConnectionResponse(connection=new_conn) 98 99 @utils.rpc 100 async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse: 101 logging.info(f"StopServer") 102 assert self.server_id.id == request.server.id 103 self.server = None 104 self.server_id = None 105 self.server_name = None 106 self.server_uuid = None 107 108 return StopServerResponse() 109 110 @utils.rpc 111 async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse: 112 logging.info(f"Send") 113 dlc = self.connections[request.connection.id] 114 dlc.write(request.data) 115 return TxResponse() 116 117 @utils.rpc 118 async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse: 119 logging.info(f"Receive") 120 received_data = await self.data_queue.get() 121 return RxResponse(data=received_data) 122