1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A2DP grpc interface."""
15
16import asyncio
17import json
18import logging
19from typing import AsyncGenerator
20from typing import AsyncIterator
21
22from floss.pandora.floss import audio_utils
23from floss.pandora.floss import cras_utils
24from floss.pandora.floss import media_client
25from floss.pandora.floss import utils
26from floss.pandora.server import bluetooth as bluetooth_module
27from google.protobuf import wrappers_pb2, empty_pb2
28import grpc
29from pandora import a2dp_grpc_aio
30from pandora import a2dp_pb2
31
32
33class A2DPService(a2dp_grpc_aio.A2DPServicer):
34    """Service to trigger Bluetooth A2DP procedures.
35
36    This class implements the Pandora bluetooth test interfaces,
37    where the meta class definition is automatically generated by the protobuf.
38    The interface definition can be found in:
39    https://cs.android.com/android/platform/superproject/+/main:external/pandora/bt-test-interfaces/pandora/a2dp.proto
40    """
41
42    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
43        self.bluetooth = bluetooth
44        self._cras_test_client = cras_utils.CrasTestClient()
45        cras_utils.set_floss_enabled(True)
46
47    async def OpenSource(self, request: a2dp_pb2.OpenSourceRequest,
48                         context: grpc.ServicerContext) -> a2dp_pb2.OpenSourceResponse:
49
50        class ConnectionObserver(media_client.BluetoothMediaCallbacks):
51            """Observer to observe the A2DP profile connection state."""
52
53            def __init__(self, task):
54                self.task = task
55
56            @utils.glib_callback()
57            def on_bluetooth_audio_device_added(self, remote_device):
58                if remote_device['address'] != self.task['address']:
59                    return
60
61                future = self.task['open_source']
62                future.get_loop().call_soon_threadsafe(future.set_result, True)
63
64        connection = utils.connection_from(request.connection)
65        address = connection.address
66        connected_devices = self.bluetooth.get_connected_audio_devices()
67
68        if not self.bluetooth.is_connected(address) or address not in connected_devices:
69            try:
70                open_source = asyncio.get_running_loop().create_future()
71                observer = ConnectionObserver({'open_source': open_source, 'address': address})
72                name = utils.create_observer_name(observer)
73                self.bluetooth.media_client.register_callback_observer(name, observer)
74                self.bluetooth.connect_device(address)
75                success = await asyncio.wait_for(open_source, timeout=10)
76
77                if not success:
78                    await context.abort(grpc.StatusCode.UNKNOWN, f'Failed to connect to the address {address}.')
79            except asyncio.TimeoutError as e:
80                logging.error(f'OpenSource: timeout for waiting A2DP connection. {e}')
81            finally:
82                self.bluetooth.media_client.unregister_callback_observer(name, observer)
83
84        cookie = utils.address_to(address)
85        return a2dp_pb2.OpenSourceResponse(source=a2dp_pb2.Source(cookie=cookie))
86
87    async def OpenSink(self, request: a2dp_pb2.OpenSinkRequest,
88                       context: grpc.ServicerContext) -> a2dp_pb2.OpenSinkResponse:
89
90        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
91        context.set_details('Method not implemented!')  # type: ignore
92        raise NotImplementedError('Method not implemented!')
93
94    async def WaitSource(self, request: a2dp_pb2.WaitSourceRequest,
95                         context: grpc.ServicerContext) -> a2dp_pb2.WaitSourceResponse:
96
97        class ConnectionObserver(media_client.BluetoothMediaCallbacks):
98            """Observer to observe the A2DP profile connection state."""
99
100            def __init__(self, task):
101                self.task = task
102
103            @utils.glib_callback()
104            def on_bluetooth_audio_device_added(self, remote_device):
105                if remote_device['address'] != self.task['address']:
106                    return
107
108                future = self.task['wait_source']
109                future.get_loop().call_soon_threadsafe(future.set_result, address)
110
111        connection = utils.connection_from(request.connection)
112        address = connection.address
113        if not address:
114            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Request address field must be set.')
115
116        connected_devices = self.bluetooth.get_connected_audio_devices()
117        if not self.bluetooth.is_connected(address) or address not in connected_devices:
118            try:
119                wait_source = asyncio.get_running_loop().create_future()
120                observer = ConnectionObserver({'wait_source': wait_source, 'address': address})
121                name = utils.create_observer_name(observer)
122                self.bluetooth.media_client.register_callback_observer(name, observer)
123                await asyncio.wait_for(wait_source, timeout=10)
124            except asyncio.TimeoutError as e:
125                logging.error(f'WaitSource: timeout for waiting A2DP connection. {e}')
126            finally:
127                self.bluetooth.media_client.unregister_callback_observer(name, observer)
128
129        cookie = utils.address_to(address)
130        return a2dp_pb2.WaitSourceResponse(source=a2dp_pb2.Source(cookie=cookie))
131
132    async def WaitSink(self, request: a2dp_pb2.WaitSinkRequest,
133                       context: grpc.ServicerContext) -> a2dp_pb2.WaitSinkResponse:
134
135        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
136        context.set_details('Method not implemented!')  # type: ignore
137        raise NotImplementedError('Method not implemented!')
138
139    async def IsSuspended(self, request: a2dp_pb2.IsSuspendedRequest,
140                          context: grpc.ServicerContext) -> wrappers_pb2.BoolValue:
141
142        address = utils.address_from(request.target.cookie)
143        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
144        if address not in connected_audio_devices:
145            await context.abort(grpc.StatusCode.FAILED_PRECONDITION,
146                                'A2dp device is not connected, cannot get suspend state')
147
148        is_suspended = cras_utils.get_active_stream_count() == 0
149        return wrappers_pb2.BoolValue(value=is_suspended)
150
151    async def Start(self, request: a2dp_pb2.StartRequest, context: grpc.ServicerContext) -> a2dp_pb2.StartResponse:
152
153        target = request.WhichOneof('target')
154        address = utils.address_from(request.target.cookie)
155        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
156        if address not in connected_audio_devices:
157            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp device is not connected, cannot start')
158
159        audio_data = json.dumps(audio_utils.A2DP_TEST_DATA)
160        audio_data = json.loads(audio_data)
161        audio_utils.generate_playback_file(audio_data)
162
163        if not audio_utils.select_audio_output_node():
164            await context.abort(grpc.StatusCode.UNKNOWN, 'Failed to select audio output node')
165
166        if target == 'source':
167            self._cras_test_client.start_playing_subprocess(audio_data['file'],
168                                                            channels=audio_data['channels'],
169                                                            rate=audio_data['rate'],
170                                                            duration=audio_data['duration'])
171        else:
172            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid target type: {target}.')
173
174        return a2dp_pb2.StartResponse(started=empty_pb2.Empty())
175
176    async def Suspend(self, request: a2dp_pb2.SuspendRequest,
177                      context: grpc.ServicerContext) -> a2dp_pb2.SuspendResponse:
178
179        target = request.WhichOneof('target')
180        address = utils.address_from(request.target.cookie)
181        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
182        if address not in connected_audio_devices:
183            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp device is not connected, cannot suspend')
184
185        if cras_utils.get_active_stream_count() == 0:
186            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp Device is already suspended, cannot suspend')
187
188        if target == 'source':
189            self._cras_test_client.stop_playing_subprocess()
190        else:
191            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid target type: {target}.')
192
193        return a2dp_pb2.SuspendResponse(suspended=empty_pb2.Empty())
194
195    async def Close(self, request: a2dp_pb2.CloseRequest, context: grpc.ServicerContext) -> a2dp_pb2.CloseResponse:
196
197        class ConnectionObserver(media_client.BluetoothMediaCallbacks):
198            """Observer to observe the A2DP profile connection state."""
199
200            def __init__(self, task):
201                self.task = task
202
203            @utils.glib_callback()
204            def on_bluetooth_audio_device_removed(self, address):
205                if address != self.task['address']:
206                    return
207
208                future = self.task['close_stream']
209                future.get_loop().call_soon_threadsafe(future.set_result, address)
210
211        address = utils.address_from(request.target.cookie)
212        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
213        if address not in connected_audio_devices:
214            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp device is not connected, cannot close')
215
216        try:
217            close_stream = asyncio.get_running_loop().create_future()
218            observer = ConnectionObserver({'close_stream': close_stream, 'address': address})
219            name = utils.create_observer_name(observer)
220            self.bluetooth.media_client.register_callback_observer(name, observer)
221            self.bluetooth.disconnect_media(address)
222            await close_stream
223        finally:
224            self.bluetooth.media_client.unregister_callback_observer(name, observer)
225        return a2dp_pb2.CloseResponse()
226
227    async def GetAudioEncoding(self, request: a2dp_pb2.GetAudioEncodingRequest,
228                               context: grpc.ServicerContext) -> a2dp_pb2.GetAudioEncodingResponse:
229        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
230        context.set_details('Method not implemented!')  # type: ignore
231        raise NotImplementedError('Method not implemented!')
232
233    async def PlaybackAudio(self, request: AsyncIterator[a2dp_pb2.PlaybackAudioRequest],
234                            context: grpc.ServicerContext) -> a2dp_pb2.PlaybackAudioResponse:
235
236        audio_signals = request
237        logging.info('PlaybackAudio: Wait for audio signal...')
238
239        audio_signal = await utils.anext(audio_signals)
240        audio_data = audio_signal.data
241
242        if cras_utils.get_active_stream_count() == 0:
243            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'Audio track is not started')
244
245        audio_utils.generate_playback_file_from_binary_data(audio_data)
246        audio_file = audio_utils.A2DP_PLAYBACK_DATA['file']
247        self._cras_test_client.play(audio_file)
248        return a2dp_pb2.PlaybackAudioResponse()
249
250    async def CaptureAudio(self, request: a2dp_pb2.CaptureAudioRequest,
251                           context: grpc.ServicerContext) -> AsyncGenerator[a2dp_pb2.CaptureAudioResponse, None]:
252        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
253        context.set_details('Method not implemented!')  # type: ignore
254        raise NotImplementedError('Method not implemented!')
255