# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A2DP grpc interface."""

import asyncio
import json
import logging
from typing import AsyncGenerator
from typing import AsyncIterator

from floss.pandora.floss import audio_utils
from floss.pandora.floss import cras_utils
from floss.pandora.floss import media_client
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
from google.protobuf import wrappers_pb2, empty_pb2
import grpc
from pandora import a2dp_grpc_aio
from pandora import a2dp_pb2


class A2DPService(a2dp_grpc_aio.A2DPServicer):
    """Service to trigger Bluetooth A2DP procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/+/main:external/pandora/bt-test-interfaces/pandora/a2dp.proto
    """

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        """Stores the Bluetooth facade, creates a CRAS test client and enables Floss in CRAS.

        Args:
            bluetooth: Bluetooth facade used for connection state queries and media callbacks.
        """
        self.bluetooth = bluetooth
        self._cras_test_client = cras_utils.CrasTestClient()
        cras_utils.set_floss_enabled(True)

    async def OpenSource(self, request: a2dp_pb2.OpenSourceRequest,
                         context: grpc.ServicerContext) -> a2dp_pb2.OpenSourceResponse:
        """Connects to the requested device and returns a source handle for it.

        When the device is already connected and listed among the connected audio devices, no
        connection attempt is made. Otherwise `connect_device` is called and the method waits up
        to 10 seconds for the media client to report the audio device as added.

        Args:
            request: Request carrying the connection whose address should be opened.
            context: gRPC servicer context, used to abort the call on failure.

        Returns:
            A response whose source cookie is derived from the device address. A timeout while
            waiting is only logged; the response is still returned in that case.
        """

        class ConnectionObserver(media_client.BluetoothMediaCallbacks):
            """Observer to observe the A2DP profile connection state."""

            def __init__(self, task: dict):
                self.task = task

            @utils.glib_callback()
            def on_bluetooth_audio_device_added(self, remote_device):
                if remote_device['address'] != self.task['address']:
                    return

                # The callback is handed the future's own loop, so resolve it thread-safely.
                future = self.task['open_source']
                future.get_loop().call_soon_threadsafe(future.set_result, True)

        connection = utils.connection_from(request.connection)
        address = connection.address
        connected_devices = self.bluetooth.get_connected_audio_devices()

        if not self.bluetooth.is_connected(address) or address not in connected_devices:
            # Build everything the `finally` clause needs before entering `try`; otherwise a failure
            # during setup would surface as an UnboundLocalError and hide the real exception.
            open_source = asyncio.get_running_loop().create_future()
            observer = ConnectionObserver({'open_source': open_source, 'address': address})
            name = utils.create_observer_name(observer)
            try:
                self.bluetooth.media_client.register_callback_observer(name, observer)
                self.bluetooth.connect_device(address)
                success = await asyncio.wait_for(open_source, timeout=10)

                if not success:
                    await context.abort(grpc.StatusCode.UNKNOWN, f'Failed to connect to the address {address}.')
            except asyncio.TimeoutError as e:
                logging.error('OpenSource: timeout for waiting A2DP connection. %s', e)
            finally:
                self.bluetooth.media_client.unregister_callback_observer(name, observer)

        cookie = utils.address_to(address)
        return a2dp_pb2.OpenSourceResponse(source=a2dp_pb2.Source(cookie=cookie))

    async def OpenSink(self, request: a2dp_pb2.OpenSinkRequest,
                       context: grpc.ServicerContext) -> a2dp_pb2.OpenSinkResponse:
        """Not implemented; marks the call UNIMPLEMENTED and raises NotImplementedError."""

        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def WaitSource(self, request: a2dp_pb2.WaitSourceRequest,
                         context: grpc.ServicerContext) -> a2dp_pb2.WaitSourceResponse:
        """Waits for the requested device to show up as an audio device and returns a source handle.

        Unlike `OpenSource`, this method never initiates the connection itself; it only waits up to
        10 seconds for the media client to report the audio device as added.

        Args:
            request: Request carrying the connection whose address should be awaited.
            context: gRPC servicer context; the call is aborted with INVALID_ARGUMENT when the
                address is empty.

        Returns:
            A response whose source cookie is derived from the device address. A timeout while
            waiting is only logged; the response is still returned in that case.
        """

        class ConnectionObserver(media_client.BluetoothMediaCallbacks):
            """Observer to observe the A2DP profile connection state."""

            def __init__(self, task: dict):
                self.task = task

            @utils.glib_callback()
            def on_bluetooth_audio_device_added(self, remote_device):
                if remote_device['address'] != self.task['address']:
                    return

                # Resolve with the address this observer was created for.
                future = self.task['wait_source']
                future.get_loop().call_soon_threadsafe(future.set_result, self.task['address'])

        connection = utils.connection_from(request.connection)
        address = connection.address
        if not address:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, 'Request address field must be set.')

        connected_devices = self.bluetooth.get_connected_audio_devices()
        if not self.bluetooth.is_connected(address) or address not in connected_devices:
            # Build everything the `finally` clause needs before entering `try`; otherwise a failure
            # during setup would surface as an UnboundLocalError and hide the real exception.
            wait_source = asyncio.get_running_loop().create_future()
            observer = ConnectionObserver({'wait_source': wait_source, 'address': address})
            name = utils.create_observer_name(observer)
            try:
                self.bluetooth.media_client.register_callback_observer(name, observer)
                await asyncio.wait_for(wait_source, timeout=10)
            except asyncio.TimeoutError as e:
                # NOTE(review): the call still returns a normal response after a timeout - confirm
                # that callers expect this rather than an error status.
                logging.error('WaitSource: timeout for waiting A2DP connection. %s', e)
            finally:
                self.bluetooth.media_client.unregister_callback_observer(name, observer)

        cookie = utils.address_to(address)
        return a2dp_pb2.WaitSourceResponse(source=a2dp_pb2.Source(cookie=cookie))

    async def WaitSink(self, request: a2dp_pb2.WaitSinkRequest,
                       context: grpc.ServicerContext) -> a2dp_pb2.WaitSinkResponse:
        """Not implemented; marks the call UNIMPLEMENTED and raises NotImplementedError."""

        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def IsSuspended(self, request: a2dp_pb2.IsSuspendedRequest,
                          context: grpc.ServicerContext) -> wrappers_pb2.BoolValue:
        """Reports whether audio streaming is currently suspended.

        Args:
            request: Request whose target cookie identifies the device.
            context: gRPC servicer context; the call is aborted with FAILED_PRECONDITION when the
                device is not among the connected audio devices.

        Returns:
            A BoolValue that is true when CRAS reports zero active streams.
        """

        address = utils.address_from(request.target.cookie)
        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
        if address not in connected_audio_devices:
            await context.abort(grpc.StatusCode.FAILED_PRECONDITION,
                                'A2dp device is not connected, cannot get suspend state')

        is_suspended = cras_utils.get_active_stream_count() == 0
        return wrappers_pb2.BoolValue(value=is_suspended)

    async def Start(self, request: a2dp_pb2.StartRequest, context: grpc.ServicerContext) -> a2dp_pb2.StartResponse:
        """Starts audio playback towards the connected A2DP device.

        Args:
            request: Request identifying the target; only the 'source' target is supported.
            context: gRPC servicer context. The call is aborted with FAILED_PRECONDITION when the
                device is not connected, INVALID_ARGUMENT for any target other than 'source', and
                UNKNOWN when the audio output node cannot be selected.

        Returns:
            A response with the `started` field set.
        """

        # NOTE(review): `target` is used both as a oneof name and as a field here - confirm against
        # a2dp.proto that both accesses are valid for this message.
        target = request.WhichOneof('target')
        address = utils.address_from(request.target.cookie)
        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
        if address not in connected_audio_devices:
            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp device is not connected, cannot start')

        # Reject unsupported targets before touching any audio state, so an invalid request does not
        # generate a playback file or switch the output node as a side effect.
        if target != 'source':
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid target type: {target}.')

        # JSON round trip yields an independent copy of the test data; presumably this keeps the
        # shared constant from being mutated - verify against audio_utils.
        audio_data = json.dumps(audio_utils.A2DP_TEST_DATA)
        audio_data = json.loads(audio_data)
        audio_utils.generate_playback_file(audio_data)

        if not audio_utils.select_audio_output_node():
            await context.abort(grpc.StatusCode.UNKNOWN, 'Failed to select audio output node')

        self._cras_test_client.start_playing_subprocess(audio_data['file'],
                                                        channels=audio_data['channels'],
                                                        rate=audio_data['rate'],
                                                        duration=audio_data['duration'])

        return a2dp_pb2.StartResponse(started=empty_pb2.Empty())

    async def Suspend(self, request: a2dp_pb2.SuspendRequest,
                      context: grpc.ServicerContext) -> a2dp_pb2.SuspendResponse:
        """Stops the playback subprocess started by `Start`.

        Args:
            request: Request identifying the target; only the 'source' target is supported.
            context: gRPC servicer context. The call is aborted with FAILED_PRECONDITION when the
                device is not connected or CRAS reports no active stream, and INVALID_ARGUMENT for
                any target other than 'source'.

        Returns:
            A response with the `suspended` field set.
        """

        # NOTE(review): `target` is used both as a oneof name and as a field here - confirm against
        # a2dp.proto that both accesses are valid for this message.
        target = request.WhichOneof('target')
        address = utils.address_from(request.target.cookie)
        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
        if address not in connected_audio_devices:
            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp device is not connected, cannot suspend')

        if cras_utils.get_active_stream_count() == 0:
            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp Device is already suspended, cannot suspend')

        if target == 'source':
            self._cras_test_client.stop_playing_subprocess()
        else:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f'Invalid target type: {target}.')

        return a2dp_pb2.SuspendResponse(suspended=empty_pb2.Empty())

    async def Close(self, request: a2dp_pb2.CloseRequest, context: grpc.ServicerContext) -> a2dp_pb2.CloseResponse:
        """Disconnects media from the device and waits until the audio device is removed.

        Args:
            request: Request whose target cookie identifies the device.
            context: gRPC servicer context; the call is aborted with FAILED_PRECONDITION when the
                device is not among the connected audio devices.

        Returns:
            An empty CloseResponse once the removal callback has fired.
        """

        class ConnectionObserver(media_client.BluetoothMediaCallbacks):
            """Observer to observe the A2DP profile connection state."""

            def __init__(self, task: dict):
                self.task = task

            @utils.glib_callback()
            def on_bluetooth_audio_device_removed(self, address):
                if address != self.task['address']:
                    return

                future = self.task['close_stream']
                future.get_loop().call_soon_threadsafe(future.set_result, address)

        address = utils.address_from(request.target.cookie)
        connected_audio_devices = self.bluetooth.get_connected_audio_devices()
        if address not in connected_audio_devices:
            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'A2dp device is not connected, cannot close')

        # Build everything the `finally` clause needs before entering `try`; otherwise a failure
        # during setup would surface as an UnboundLocalError and hide the real exception.
        close_stream = asyncio.get_running_loop().create_future()
        observer = ConnectionObserver({'close_stream': close_stream, 'address': address})
        name = utils.create_observer_name(observer)
        try:
            self.bluetooth.media_client.register_callback_observer(name, observer)
            self.bluetooth.disconnect_media(address)
            # NOTE(review): unlike the waits in OpenSource/WaitSource this one has no timeout, so the
            # call blocks until the removal callback arrives - confirm this is intended.
            await close_stream
        finally:
            self.bluetooth.media_client.unregister_callback_observer(name, observer)
        return a2dp_pb2.CloseResponse()

    async def GetAudioEncoding(self, request: a2dp_pb2.GetAudioEncodingRequest,
                               context: grpc.ServicerContext) -> a2dp_pb2.GetAudioEncodingResponse:
        """Not implemented; marks the call UNIMPLEMENTED and raises NotImplementedError."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')

    async def PlaybackAudio(self, request: AsyncIterator[a2dp_pb2.PlaybackAudioRequest],
                            context: grpc.ServicerContext) -> a2dp_pb2.PlaybackAudioResponse:
        """Plays the audio data carried by the first message of the request stream.

        Only one message is read from the stream. Its data is written to the playback file
        described by `audio_utils.A2DP_PLAYBACK_DATA`, which is then played through CRAS.

        Args:
            request: Stream of playback requests; only the first one is consumed.
            context: gRPC servicer context; the call is aborted with FAILED_PRECONDITION when CRAS
                reports no active stream.

        Returns:
            An empty PlaybackAudioResponse.
        """

        audio_signals = request
        logging.info('PlaybackAudio: Wait for audio signal...')

        audio_signal = await utils.anext(audio_signals)
        audio_data = audio_signal.data

        if cras_utils.get_active_stream_count() == 0:
            await context.abort(grpc.StatusCode.FAILED_PRECONDITION, 'Audio track is not started')

        audio_utils.generate_playback_file_from_binary_data(audio_data)
        audio_file = audio_utils.A2DP_PLAYBACK_DATA['file']
        self._cras_test_client.play(audio_file)
        return a2dp_pb2.PlaybackAudioResponse()

    async def CaptureAudio(self, request: a2dp_pb2.CaptureAudioRequest,
                           context: grpc.ServicerContext) -> AsyncGenerator[a2dp_pb2.CaptureAudioResponse, None]:
        """Not implemented; marks the call UNIMPLEMENTED and raises NotImplementedError."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
        context.set_details('Method not implemented!')  # type: ignore
        raise NotImplementedError('Method not implemented!')