1# Copyright (C) 2024 The Android Open Source Project
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11# See the License for the specific language governing permissions and
12# limitations under the License.
13
14import asyncio
15import logging
16from typing import Dict, Optional
17
18from bumble import core
19from bumble.device import Device
20from bumble.rfcomm import (
21    Server,
22    make_service_sdp_records,
23    DLC,
24)
25from bumble.pandora import utils
26import grpc
27from pandora_experimental.rfcomm_grpc_aio import RFCOMMServicer
28from pandora_experimental.rfcomm_pb2 import (
29    AcceptConnectionRequest,
30    AcceptConnectionResponse,
31    ConnectionRequest,
32    ConnectionResponse,
33    RfcommConnection,
34    RxRequest,
35    RxResponse,
36    ServerId,
37    StartServerRequest,
38    StartServerResponse,
39    StopServerRequest,
40    StopServerResponse,
41    TxRequest,
42    TxResponse,
43)
44
45
46class RFCOMMService(RFCOMMServicer):
47    #TODO Add support for multiple servers
48    device: Device
49    server_id: Optional[ServerId]
50    server: Optional[Server]
51
52    def __init__(self, device: Device) -> None:
53        super().__init__()
54        self.device = device
55        self.server_id = None
56        self.server = None
57        self.server_name = None
58        self.server_uuid = None
59        self.connections = {}  # key = id, value = dlc
60        self.next_server_id = 1
61        self.next_conn_id = 1
62        self.open_channel = None
63        self.wait_dlc = None
64        self.dlc = None
65        self.data_queue = asyncio.Queue()
66
67    @utils.rpc
68    async def StartServer(self, request: StartServerRequest, context: grpc.ServicerContext) -> StartServerResponse:
69        logging.info(f"StartServer")
70        if self.server_id:
71            logging.warning(f"Server already started, returning existing server")
72            return StartServerResponse(server=self.server_id)
73        else:
74            self.server_id = ServerId(id=self.next_server_id)
75            self.next_server_id += 1
76            self.server = Server(self.device)
77            self.server_name = request.name
78            self.server_uuid = core.UUID(request.uuid)
79        self.wait_dlc = asyncio.get_running_loop().create_future()
80        handle = 1
81        #TODO Add support for multiple clients
82        self.open_channel = self.server.listen(acceptor=self.wait_dlc.set_result, channel=2)
83        records = make_service_sdp_records(handle, self.open_channel, self.server_uuid)
84        self.device.sdp_service_records[handle] = records
85        return StartServerResponse(server=self.server_id)
86
87    @utils.rpc
88    async def AcceptConnection(self, request: AcceptConnectionRequest,
89                               context: grpc.ServicerContext) -> AcceptConnectionResponse:
90        logging.info(f"AcceptConnection")
91        assert self.server_id.id == request.server.id
92        self.dlc = await self.wait_dlc
93        self.dlc.sink = self.data_queue.put_nowait
94        new_conn = RfcommConnection(id=self.next_conn_id)
95        self.next_conn_id += 1
96        self.connections[new_conn.id] = self.dlc
97        return AcceptConnectionResponse(connection=new_conn)
98
99    @utils.rpc
100    async def StopServer(self, request: StopServerRequest, context: grpc.ServicerContext) -> StopServerResponse:
101        logging.info(f"StopServer")
102        assert self.server_id.id == request.server.id
103        self.server = None
104        self.server_id = None
105        self.server_name = None
106        self.server_uuid = None
107
108        return StopServerResponse()
109
110    @utils.rpc
111    async def Send(self, request: TxRequest, context: grpc.ServicerContext) -> TxResponse:
112        logging.info(f"Send")
113        dlc = self.connections[request.connection.id]
114        dlc.write(request.data)
115        return TxResponse()
116
117    @utils.rpc
118    async def Receive(self, request: RxRequest, context: grpc.ServicerContext) -> RxResponse:
119        logging.info(f"Receive")
120        received_data = await self.data_queue.get()
121        return RxResponse(data=received_data)
122