1 /*
<lambda>null2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.pandora
18
19 import android.bluetooth.BluetoothAdapter
20 import android.bluetooth.BluetoothDevice
21 import android.bluetooth.BluetoothManager
22 import android.bluetooth.BluetoothProfile
23 import android.content.BroadcastReceiver
24 import android.content.Context
25 import android.content.Intent
26 import android.content.IntentFilter
27 import android.media.*
28 import android.net.MacAddress
29 import android.os.ParcelFileDescriptor
30 import android.util.Log
31 import androidx.test.platform.app.InstrumentationRegistry
32 import com.google.protobuf.Any
33 import com.google.protobuf.ByteString
34 import io.grpc.Status
35 import io.grpc.stub.ServerCallStreamObserver
36 import io.grpc.stub.StreamObserver
37 import java.io.BufferedReader
38 import java.io.InputStreamReader
39 import java.io.PrintWriter
40 import java.io.StringWriter
41 import java.util.concurrent.CancellationException
42 import java.util.stream.Collectors
43 import kotlinx.coroutines.CoroutineScope
44 import kotlinx.coroutines.Job
45 import kotlinx.coroutines.channels.Channel
46 import kotlinx.coroutines.channels.awaitClose
47 import kotlinx.coroutines.channels.trySendBlocking
48 import kotlinx.coroutines.flow.Flow
49 import kotlinx.coroutines.flow.callbackFlow
50 import kotlinx.coroutines.flow.catch
51 import kotlinx.coroutines.flow.consumeAsFlow
52 import kotlinx.coroutines.flow.first
53 import kotlinx.coroutines.flow.launchIn
54 import kotlinx.coroutines.flow.onCompletion
55 import kotlinx.coroutines.flow.onEach
56 import kotlinx.coroutines.launch
57 import kotlinx.coroutines.runBlocking
58 import kotlinx.coroutines.withTimeout
59 import kotlinx.coroutines.withTimeoutOrNull
60 import pandora.HostProto.Connection
61 import pandora.OsProto.InternalConnectionRef
62
/** Tag attached to the log messages emitted by the helpers in this file. */
private const val TAG = "PandoraUtils"

/** Character pool (upper-case letters, lower-case letters, then digits) used for random strings. */
private val alphanumeric: List<Char> = ('A'..'Z') + ('a'..'z') + ('0'..'9')

// Shared bookkeeping of remote devices. These are plain hash sets: no synchronization is applied
// here, so callers are presumably expected to coordinate access themselves (TODO confirm).

/** Devices with a locally initiated connection (presumably; entries are added outside this file). */
val initiatedConnection: HashSet<BluetoothDevice> = hashSetOf()

/** Devices for which an ACL connection was awaited (presumably; entries are added outside this file). */
val waitedAclConnection: HashSet<BluetoothDevice> = hashSetOf()

/** Devices whose ACL disconnection has been observed and not yet followed by a new connection. */
val waitedAclDisconnection: HashSet<BluetoothDevice> = hashSetOf()
69
/**
 * Executes a shell command through the instrumentation's UiAutomation and collects what it writes
 * to the returned file descriptor.
 *
 * @param cmd the command line to execute.
 * @return the text read from the command, with its lines joined by `\n` (no trailing newline).
 */
fun shell(cmd: String): String {
    val fd = InstrumentationRegistry.getInstrumentation().getUiAutomation().executeShellCommand(cmd)
    val inputStream = ParcelFileDescriptor.AutoCloseInputStream(fd)
    // `use` closes the reader even when reading fails; closing the reader closes the wrapped
    // AutoCloseInputStream, which in turn releases the underlying file descriptor.
    return BufferedReader(InputStreamReader(inputStream)).use { reader ->
        reader.lines().collect(Collectors.joining("\n"))
    }
}
75
/**
 * Builds a cold flow emitting every broadcast intent that matches an intent filter. If used
 * multiple times in a same class, this flow should be transformed into a shared flow.
 *
 * Besides forwarding intents, the receiver keeps the file-level device sets up to date:
 * - on `ACTION_ACL_CONNECTED` the device is removed from [waitedAclDisconnection];
 * - on `ACTION_ACL_DISCONNECTED` the device is removed from [initiatedConnection] and
 *   [waitedAclConnection], and added to [waitedAclDisconnection].
 *
 * The receiver is registered when the flow is collected and unregistered when the flow is closed
 * or its collection is cancelled.
 *
 * @param context context on which to register the broadcast receiver.
 * @param intentFilter intent filter selecting the broadcasts to forward.
 * @param scope coroutine scope in which each received intent is handed over to the flow.
 * @return cold flow of the received intents.
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun intentFlow(context: Context, intentFilter: IntentFilter, scope: CoroutineScope) = callbackFlow {
    val receiver =
        object : BroadcastReceiver() {
            override fun onReceive(context: Context, intent: Intent) {
                // Update the ACL bookkeeping before the intent is forwarded to collectors.
                when (intent.action) {
                    BluetoothDevice.ACTION_ACL_CONNECTED -> {
                        waitedAclDisconnection.remove(intent.getBluetoothDeviceExtra())
                    }
                    BluetoothDevice.ACTION_ACL_DISCONNECTED -> {
                        val device = intent.getBluetoothDeviceExtra()
                        initiatedConnection.remove(device)
                        waitedAclConnection.remove(device)
                        waitedAclDisconnection.add(device)
                    }
                }
                // The hand-over runs inside `scope` rather than on the thread delivering the broadcast.
                scope.launch { trySendBlocking(intent) }
            }
        }

    context.registerReceiver(receiver, intentFilter)

    awaitClose { context.unregisterReceiver(receiver) }
}
104
/**
 * Launches, in the given coroutine scope, a coroutine that runs a suspending function producing a
 * single gRPC response and delivers that response to the given gRPC stream observer.
 *
 * Anything thrown while producing or sending the response (including the timeout) is printed and
 * reported to the observer as an `UNKNOWN` status whose description is the stack trace.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the response.
 * @param timeout the duration in seconds after which the coroutine is automatically cancelled and
 *   returns a timeout error. Default: 60s.
 * @param block the suspended function to execute to get the response.
 * @return reference to the coroutine as a Job.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *     request: TypeOfRequest,
 *     responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *     grpcUnary(scope, responseObserver) {
 *         block
 *     }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcUnary(
    scope: CoroutineScope,
    responseObserver: StreamObserver<T>,
    timeout: Long = 60,
    block: suspend () -> T
): Job =
    scope.launch {
        try {
            // `timeout` is expressed in seconds while withTimeout expects milliseconds.
            val timeoutMillis = timeout * 1000
            val reply = withTimeout(timeoutMillis) { block() }
            responseObserver.onNext(reply)
            responseObserver.onCompleted()
        } catch (failure: Throwable) {
            failure.printStackTrace()
            // The full stack trace travels back to the client as the status description.
            val status =
                Status.UNKNOWN.withCause(failure).withDescription(failure.stackTraceToString())
            responseObserver.onError(status.asException())
        }
    }
151
/**
 * Creates a gRPC coroutine in a given coroutine scope which executes a given function taking in a
 * Flow of gRPC requests and returning a Flow of gRPC responses, and sends those responses on a
 * given gRPC stream observer.
 *
 * Incoming requests are handed to the request Flow through a channel created with the default
 * capacity; when a request cannot be handed over immediately the call is cancelled (see the note
 * in `onNext`). Cancelling the call from the client side cancels the coroutine.
 *
 * @param T the type of gRPC request.
 * @param U the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the responses. It must be a
 *   [ServerCallStreamObserver], otherwise the cast below throws.
 * @param block the function transforming the request Flow to the response Flow.
 * @return a StreamObserver for the incoming requests.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *     responseObserver: StreamObserver<TypeOfResponse>
 * ): StreamObserver<TypeOfRequest> {
 *     return grpcBidirectionalStream(scope, responseObserver) {
 *         block
 *     }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T, U> grpcBidirectionalStream(
    scope: CoroutineScope,
    responseObserver: StreamObserver<U>,
    block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

    val inputChannel = Channel<T>()
    // The observer carries responses, so its type argument is U (the response type), not T.
    val serverCallStreamObserver = responseObserver as ServerCallStreamObserver<U>

    val job =
        scope.launch {
            block(inputChannel.consumeAsFlow())
                .onEach { responseObserver.onNext(it) }
                .onCompletion { error ->
                    // Only a normal completion closes the call; failures are reported below.
                    if (error == null) {
                        responseObserver.onCompleted()
                    }
                }
                .catch { cause ->
                    cause.printStackTrace()
                    responseObserver.onError(
                        Status.UNKNOWN.withCause(cause)
                            .withDescription(cause.stackTraceToString())
                            .asException()
                    )
                }
                .launchIn(this)
        }

    serverCallStreamObserver.setOnCancelHandler { job.cancel() }

    return object : StreamObserver<T> {
        override fun onNext(req: T) {
            // Note: this should be made a blocking call, and the handler should run in a separate
            // thread so we get flow control - but for now we can live with this
            if (inputChannel.trySend(req).isFailure) {
                job.cancel(CancellationException("too many incoming requests, buffer exceeded"))
                responseObserver.onError(
                    CancellationException("too many incoming requests, buffer exceeded")
                )
            }
        }

        override fun onCompleted() {
            // stop the input flow, but keep the job running
            inputChannel.close()
        }

        override fun onError(e: Throwable) {
            job.cancel()
            e.printStackTrace()
            responseObserver.onError(
                Status.UNKNOWN.withCause(e).withDescription(e.stackTraceToString()).asException()
            )
        }
    }
}
235
/**
 * Launches, in the given coroutine scope, a coroutine that collects a Flow of gRPC responses and
 * forwards each of them to the given gRPC stream observer.
 *
 * The call is completed when the Flow ends normally. A failure of the Flow is printed and
 * reported to the observer as an `UNKNOWN` status whose description is the stack trace.
 * Cancelling the call from the client side cancels the coroutine.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the responses. It must be a
 *   [ServerCallStreamObserver], otherwise the cast below throws.
 * @param block the function producing the response Flow.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *     request: TypeOfRequest,
 *     responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *     grpcServerStream(scope, responseObserver) {
 *         block
 *     }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcServerStream(
    scope: CoroutineScope,
    responseObserver: StreamObserver<T>,
    block: CoroutineScope.() -> Flow<T>
) {
    // Cast up front so an unsupported observer fails before any coroutine is started.
    val cancellableObserver = responseObserver as ServerCallStreamObserver<T>

    val streamingJob =
        scope.launch {
            val responses = block()
            responses
                .onEach { response -> responseObserver.onNext(response) }
                .onCompletion { failure ->
                    // A null cause means the Flow ended normally.
                    if (failure == null) {
                        responseObserver.onCompleted()
                    }
                }
                .catch { cause ->
                    cause.printStackTrace()
                    val status =
                        Status.UNKNOWN.withCause(cause).withDescription(cause.stackTraceToString())
                    responseObserver.onError(status.asException())
                }
                .launchIn(this)
        }

    cancellableObserver.setOnCancelHandler { streamingJob.cancel() }
}
289
/**
 * Synchronous method to get a Bluetooth profile proxy.
 *
 * Blocks the calling thread until the profile service reports a connection or 5 seconds have
 * elapsed. When no proxy arrives in time a warning is logged and the non-null assertion on the
 * result throws.
 *
 * @param T the type of profile proxy (e.g. BluetoothA2dp)
 * @param context context used to look up the Bluetooth manager and to request the proxy
 * @param profile identifier of the Bluetooth profile (e.g. BluetoothProfile#A2DP)
 * @return T the desired profile proxy
 */
@Suppress("UNCHECKED_CAST")
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> getProfileProxy(context: Context, profile: Int): T {
    val proxy: BluetoothProfile? = runBlocking {
        val adapter = context.getSystemService(BluetoothManager::class.java)!!.adapter

        // Bridge the listener callback into a flow so the first proxy can be awaited.
        val connectedProxies = callbackFlow {
            val listener =
                object : BluetoothProfile.ServiceListener {
                    override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
                        trySendBlocking(proxy)
                    }

                    override fun onServiceDisconnected(profile: Int) {}
                }

            adapter.getProfileProxy(context, listener, profile)

            // Nothing to release here: the proxy is handed to the caller.
            awaitClose {}
        }

        withTimeoutOrNull(5_000) { connectedProxies.first() }
    }

    if (proxy == null) {
        Log.w(TAG, "profile proxy $profile is null")
    }
    return proxy!! as T
}
327
/**
 * Reads the [BluetoothDevice] stored in this intent under [BluetoothDevice.EXTRA_DEVICE].
 *
 * @return the device carried by the intent; the non-null assertion throws when the extra is absent.
 */
fun Intent.getBluetoothDeviceExtra(): BluetoothDevice {
    val device = getParcelableExtra(BluetoothDevice.EXTRA_DEVICE, BluetoothDevice::class.java)
    return device!!
}
330
/**
 * Interprets the bytes of this [ByteString] as a MAC address.
 *
 * @return the upper-cased textual form of the address.
 */
fun ByteString.decodeAsMacAddressToString(): String {
    val macAddress = MacAddress.fromBytes(toByteArray())
    return macAddress.toString().uppercase()
}
333
/**
 * Resolves the remote device whose MAC address is encoded in this [ByteString].
 *
 * @param adapter Bluetooth adapter used to look up the remote device.
 * @return the remote device for the decoded address.
 */
fun ByteString.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
    val macAddress = decodeAsMacAddressToString()
    return adapter.getRemoteDevice(macAddress)
}
336
/**
 * Resolves the remote device referenced by this [Connection].
 *
 * @param adapter Bluetooth adapter used to look up the remote device.
 * @return the remote device for the address stored in the connection cookie.
 */
fun Connection.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
    val macAddress = this.address
    return adapter.getRemoteDevice(macAddress)
}
339
/** Upper-cased MAC address decoded from the [InternalConnectionRef] held in the connection cookie. */
val Connection.address: String
    get() {
        val connectionRef = InternalConnectionRef.parseFrom(this.cookie.value)
        return connectionRef.address.decodeAsMacAddressToString()
    }
342
/** Transport value read from the [InternalConnectionRef] held in the connection cookie. */
val Connection.transport: Int
    get() {
        val connectionRef = InternalConnectionRef.parseFrom(this.cookie.value)
        return connectionRef.transport
    }
345
/**
 * Encodes the MAC address of this device as raw bytes.
 *
 * @return a [ByteString] holding the bytes of the device address.
 */
fun BluetoothDevice.toByteString(): ByteString {
    val macBytes = MacAddress.fromString(this.address).toByteArray()
    return ByteString.copyFrom(macBytes)!!
}
348
/**
 * Wraps this device and a transport into a [Connection].
 *
 * The device address and the transport are stored in an [InternalConnectionRef], which is
 * serialized into the cookie of the returned connection.
 *
 * @param transport transport value to record in the connection reference.
 * @return a connection whose cookie identifies this device and transport.
 */
fun BluetoothDevice.toConnection(transport: Int): Connection {
    val addressBytes = ByteString.copyFrom(MacAddress.fromString(this.address).toByteArray())

    val connectionRef =
        InternalConnectionRef.newBuilder().setAddress(addressBytes).setTransport(transport).build()

    // The serialized reference is carried opaquely as the value of a protobuf Any.
    val cookie = Any.newBuilder().setValue(connectionRef.toByteString()).build()

    return Connection.newBuilder().setCookie(cookie).build()
}
359
/**
 * Creates an [AudioTrack] instance and returns the reference.
 *
 * The track is configured for media/music usage, 16-bit PCM stereo at 44100 Hz, in streaming
 * transfer mode.
 *
 * @return the newly built audio track.
 */
fun buildAudioTrack(): AudioTrack? {
    val attributes =
        AudioAttributes.Builder()
            .setUsage(AudioAttributes.USAGE_MEDIA)
            .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
            .build()

    val format =
        AudioFormat.Builder()
            .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
            .setSampleRate(44100)
            .setChannelMask(AudioFormat.CHANNEL_OUT_STEREO)
            .build()

    return AudioTrack.Builder()
        .setAudioAttributes(attributes)
        .setAudioFormat(format)
        .setTransferMode(AudioTrack.MODE_STREAM)
        // 44100 * 2 * 2 bytes: numerically one second of 16-bit stereo audio at 44100 Hz.
        .setBufferSizeInBytes(44100 * 2 * 2)
        .build()
}
380
/**
 * Generates a random string made of characters drawn from the file-level alphanumeric pool.
 *
 * @param length required string size; a value of zero or less yields an empty string.
 * @return a generated string
 */
fun generateAlphanumericString(length: Int): String {
    val generated = StringBuilder()
    for (position in 0 until length) {
        generated.append(alphanumeric.random())
    }
    return generated.toString()
}
390