1 /*
<lambda>null2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.pandora
18 
19 import android.bluetooth.BluetoothAdapter
20 import android.bluetooth.BluetoothDevice
21 import android.bluetooth.BluetoothManager
22 import android.bluetooth.BluetoothProfile
23 import android.content.BroadcastReceiver
24 import android.content.Context
25 import android.content.Intent
26 import android.content.IntentFilter
27 import android.media.*
28 import android.net.MacAddress
29 import android.os.ParcelFileDescriptor
30 import android.util.Log
31 import androidx.test.platform.app.InstrumentationRegistry
32 import com.google.protobuf.Any
33 import com.google.protobuf.ByteString
34 import io.grpc.Status
35 import io.grpc.stub.ServerCallStreamObserver
36 import io.grpc.stub.StreamObserver
37 import java.io.BufferedReader
38 import java.io.InputStreamReader
39 import java.io.PrintWriter
40 import java.io.StringWriter
41 import java.util.concurrent.CancellationException
42 import java.util.stream.Collectors
43 import kotlinx.coroutines.CoroutineScope
44 import kotlinx.coroutines.Job
45 import kotlinx.coroutines.channels.Channel
46 import kotlinx.coroutines.channels.awaitClose
47 import kotlinx.coroutines.channels.trySendBlocking
48 import kotlinx.coroutines.flow.Flow
49 import kotlinx.coroutines.flow.callbackFlow
50 import kotlinx.coroutines.flow.catch
51 import kotlinx.coroutines.flow.consumeAsFlow
52 import kotlinx.coroutines.flow.first
53 import kotlinx.coroutines.flow.launchIn
54 import kotlinx.coroutines.flow.onCompletion
55 import kotlinx.coroutines.flow.onEach
56 import kotlinx.coroutines.launch
57 import kotlinx.coroutines.runBlocking
58 import kotlinx.coroutines.withTimeout
59 import kotlinx.coroutines.withTimeoutOrNull
60 import pandora.HostProto.Connection
61 import pandora.OsProto.InternalConnectionRef
62 
/** Log tag used by the helpers in this file. */
private const val TAG = "PandoraUtils"

/** Character pool for [generateAlphanumericString]: `A`-`Z`, `a`-`z` and `0`-`9`. */
private val alphanumeric: List<Char> = ('A'..'Z') + ('a'..'z') + ('0'..'9')

/** Devices removed from this set by [intentFlow]'s receiver when an ACL disconnection is seen. */
val initiatedConnection: HashSet<BluetoothDevice> = hashSetOf()

/** Devices removed from this set by [intentFlow]'s receiver when an ACL disconnection is seen. */
val waitedAclConnection: HashSet<BluetoothDevice> = hashSetOf()

/**
 * Devices added to this set by [intentFlow]'s receiver on ACL disconnection and removed from it
 * again on ACL connection.
 */
val waitedAclDisconnection: HashSet<BluetoothDevice> = hashSetOf()
69 
/**
 * Executes a shell command through the instrumentation's UiAutomation and returns what it wrote
 * to the descriptor handed back by `executeShellCommand`.
 *
 * @param cmd the command line to execute.
 * @return the text that was read, with lines joined by `\n` (no trailing newline).
 */
fun shell(cmd: String): String {
    val fd = InstrumentationRegistry.getInstrumentation().getUiAutomation().executeShellCommand(cmd)
    val reader = BufferedReader(InputStreamReader(ParcelFileDescriptor.AutoCloseInputStream(fd)))
    // `use` closes the reader (and, through it, the auto-close stream wrapping the descriptor)
    // even when reading throws; previously nothing was ever closed.
    return reader.use { it.lines().collect(Collectors.joining("\n")) }
}
75 
/**
 * Builds a cold flow emitting the intents delivered to a broadcast receiver registered with the
 * given filter. If used multiple times in a same class, this flow should be transformed into a
 * shared flow.
 *
 * As a side effect, the receiver keeps the file-level tracking sets up to date: an ACL connection
 * removes the device from [waitedAclDisconnection]; an ACL disconnection removes it from
 * [initiatedConnection] and [waitedAclConnection] and adds it to [waitedAclDisconnection].
 *
 * The receiver is unregistered when the flow is closed.
 *
 * @param context context on which to register the broadcast receiver.
 * @param intentFilter intent filter.
 * @param scope coroutine scope in which each received intent is pushed into the flow.
 * @return cold flow.
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun intentFlow(context: Context, intentFilter: IntentFilter, scope: CoroutineScope) = callbackFlow {
    val receiver =
        object : BroadcastReceiver() {
            override fun onReceive(context: Context, intent: Intent) {
                when (intent.action) {
                    BluetoothDevice.ACTION_ACL_CONNECTED ->
                        waitedAclDisconnection.remove(intent.getBluetoothDeviceExtra())
                    BluetoothDevice.ACTION_ACL_DISCONNECTED -> {
                        val device = intent.getBluetoothDeviceExtra()
                        initiatedConnection.remove(device)
                        waitedAclConnection.remove(device)
                        waitedAclDisconnection.add(device)
                    }
                }
                // The blocking send runs in a coroutine of `scope` rather than directly in the
                // receiver callback.
                scope.launch { trySendBlocking(intent) }
            }
        }

    context.registerReceiver(receiver, intentFilter)
    awaitClose { context.unregisterReceiver(receiver) }
}
104 
/**
 * Launches a coroutine in the given scope that evaluates [block] and delivers its result as the
 * single response of a unary gRPC call.
 *
 * On success the response is sent with `onNext` followed by `onCompleted`. Any throwable raised
 * along the way (including the timeout) is printed and reported through `onError` as
 * `Status.UNKNOWN`, with the stack trace as description.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the response.
 * @param timeout the duration in seconds after which the coroutine is automatically cancelled and
 *   returns a timeout error. Default: 60s.
 * @param block the suspended function to execute to get the response.
 * @return reference to the coroutine as a Job.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *   grpcUnary(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcUnary(
    scope: CoroutineScope,
    responseObserver: StreamObserver<T>,
    timeout: Long = 60,
    block: suspend () -> T
): Job =
    scope.launch {
        try {
            // `timeout` is in seconds; withTimeout expects milliseconds.
            val timeoutMillis = timeout * 1000
            val result = withTimeout(timeoutMillis) { block() }
            responseObserver.onNext(result)
            responseObserver.onCompleted()
        } catch (failure: Throwable) {
            failure.printStackTrace()
            val status =
                Status.UNKNOWN.withCause(failure).withDescription(failure.stackTraceToString())
            responseObserver.onError(status.asException())
        }
    }
151 
/**
 * Creates a gRPC coroutine in a given coroutine scope which executes a given function taking in a
 * Flow of gRPC requests and returning a Flow of gRPC responses, and sends every response on a
 * given gRPC stream observer.
 *
 * The response stream is completed with `onCompleted` when the response Flow finishes without
 * error. A failure in the response Flow is printed and reported through `onError` as
 * `Status.UNKNOWN`, with the stack trace as description. The coroutine is cancelled when the
 * call's cancel handler fires or when the request stream reports an error.
 *
 * @param T the type of gRPC request.
 * @param U the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the responses. It is cast to
 *   [ServerCallStreamObserver] in order to install the cancel handler.
 * @param block the function transforming the request Flow to the response Flow.
 * @return a StreamObserver for the incoming requests.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ): StreamObserver<TypeOfRequest> =
 *   grpcBidirectionalStream(scope, responseObserver) { requests ->
 *     block
 *   }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T, U> grpcBidirectionalStream(
    scope: CoroutineScope,
    responseObserver: StreamObserver<U>,
    block: CoroutineScope.(Flow<T>) -> Flow<U>
): StreamObserver<T> {

    // Channel() without arguments is a rendezvous channel: it holds no buffered elements.
    val inputChannel = Channel<T>()
    // The observer carries responses, so its element type is U (it was previously cast with T).
    val serverCallStreamObserver = responseObserver as ServerCallStreamObserver<U>

    val job =
        scope.launch {
            block(inputChannel.consumeAsFlow())
                .onEach { responseObserver.onNext(it) }
                .onCompletion { error ->
                    // Only a normal completion closes the stream here; failures are reported
                    // by the `catch` operator below.
                    if (error == null) {
                        responseObserver.onCompleted()
                    }
                }
                .catch {
                    it.printStackTrace()
                    responseObserver.onError(
                        Status.UNKNOWN.withCause(it)
                            .withDescription(it.stackTraceToString())
                            .asException()
                    )
                }
                .launchIn(this)
        }

    serverCallStreamObserver.setOnCancelHandler { job.cancel() }

    return object : StreamObserver<T> {
        override fun onNext(req: T) {
            // Note: this should be made a blocking call, and the handler should run in a separate
            // thread
            // so we get flow control - but for now we can live with this
            if (inputChannel.trySend(req).isFailure) {
                val overflow = CancellationException("too many incoming requests, buffer exceeded")
                job.cancel(overflow)
                responseObserver.onError(overflow)
            }
        }

        override fun onCompleted() {
            // stop the input flow, but keep the job running
            inputChannel.close()
        }

        override fun onError(e: Throwable) {
            // The request stream failed: stop the handler and report the failure back.
            job.cancel()
            e.printStackTrace()
            responseObserver.onError(
                Status.UNKNOWN.withCause(e).withDescription(e.stackTraceToString()).asException()
            )
        }
    }
}
235 
/**
 * Creates a gRPC coroutine in a given coroutine scope which executes a given function producing a
 * Flow of gRPC responses, and sends every response on a given gRPC stream observer.
 *
 * The response stream is completed with `onCompleted` when the Flow finishes without error. A
 * failure in the Flow is printed and reported through `onError` as `Status.UNKNOWN`, with the
 * stack trace as description. The coroutine is cancelled when the call's cancel handler fires.
 *
 * @param T the type of gRPC response.
 * @param scope coroutine scope used to run the coroutine.
 * @param responseObserver the gRPC stream observer on which to send the responses. It is cast to
 *   [ServerCallStreamObserver] in order to install the cancel handler.
 * @param block the function producing the response Flow.
 *
 * Example usage:
 * ```
 * override fun grpcMethod(
 *   request: TypeOfRequest,
 *   responseObserver: StreamObserver<TypeOfResponse>
 * ) {
 *   grpcServerStream(scope, responseObserver) {
 *     block
 *   }
 * }
 * ```
 */
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> grpcServerStream(
    scope: CoroutineScope,
    responseObserver: StreamObserver<T>,
    block: CoroutineScope.() -> Flow<T>
) {
    // Cast up front so that a non-server observer fails before any coroutine is started.
    val callObserver = responseObserver as ServerCallStreamObserver<T>

    val streamingJob =
        scope.launch {
            block()
                .onEach { response -> responseObserver.onNext(response) }
                .onCompletion { cause ->
                    // Failures are left to the `catch` operator below.
                    if (cause == null) {
                        responseObserver.onCompleted()
                    }
                }
                .catch { failure ->
                    failure.printStackTrace()
                    val status =
                        Status.UNKNOWN.withCause(failure)
                            .withDescription(failure.stackTraceToString())
                    responseObserver.onError(status.asException())
                }
                .launchIn(this)
        }

    callObserver.setOnCancelHandler { streamingJob.cancel() }
}
289 
/**
 * Synchronous method to get a Bluetooth profile proxy.
 *
 * Blocks the calling thread until the service listener reports a connected proxy, waiting at most
 * 5 seconds. If no proxy arrives in time a warning is logged and the non-null assertion on the
 * result throws.
 *
 * @param T the type of profile proxy (e.g. BluetoothA2dp)
 * @param context context used to look up the [BluetoothManager] and to request the proxy
 * @param profile identifier of the Bluetooth profile (e.g. BluetoothProfile#A2DP)
 * @return T the desired profile proxy
 */
@Suppress("UNCHECKED_CAST")
@kotlinx.coroutines.ExperimentalCoroutinesApi
fun <T> getProfileProxy(context: Context, profile: Int): T {
    val connectedProxy: BluetoothProfile? = runBlocking {
        val adapter = context.getSystemService(BluetoothManager::class.java)!!.adapter

        val proxies = callbackFlow<BluetoothProfile> {
            val listener =
                object : BluetoothProfile.ServiceListener {
                    override fun onServiceConnected(profile: Int, proxy: BluetoothProfile) {
                        trySendBlocking(proxy)
                    }
                    override fun onServiceDisconnected(profile: Int) {}
                }

            adapter.getProfileProxy(context, listener, profile)

            // Nothing to release on close.
            awaitClose {}
        }

        // Yields null when no proxy is delivered within 5 seconds.
        withTimeoutOrNull(5_000) { proxies.first() }
    }
    if (connectedProxy == null) {
        Log.w(TAG, "profile proxy $profile is null")
    }
    return connectedProxy!! as T
}
327 
/**
 * Reads the [BluetoothDevice] stored in this intent under [BluetoothDevice.EXTRA_DEVICE].
 *
 * @return the device; the non-null assertion throws if the lookup yields null.
 */
fun Intent.getBluetoothDeviceExtra(): BluetoothDevice {
    val device = getParcelableExtra(BluetoothDevice.EXTRA_DEVICE, BluetoothDevice::class.java)
    return device!!
}
330 
/** Interprets these bytes as a MAC address and returns its upper-cased string form. */
fun ByteString.decodeAsMacAddressToString(): String {
    val mac = MacAddress.fromBytes(toByteArray())
    return mac.toString().uppercase()
}
333 
/**
 * Decodes these bytes as a MAC address and looks up the matching remote device.
 *
 * @param adapter adapter used to obtain the remote device.
 */
fun ByteString.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
    val macString = decodeAsMacAddressToString()
    return adapter.getRemoteDevice(macString)
}
336 
/**
 * Looks up the remote device whose address is encoded in this connection's cookie.
 *
 * @param adapter adapter used to obtain the remote device.
 */
fun Connection.toBluetoothDevice(adapter: BluetoothAdapter): BluetoothDevice {
    val remoteAddress = this.address
    return adapter.getRemoteDevice(remoteAddress)
}
339 
/** MAC address string decoded from the [InternalConnectionRef] stored in this connection's cookie. */
val Connection.address: String
    get() {
        val ref = InternalConnectionRef.parseFrom(this.cookie.value)
        return ref.address.decodeAsMacAddressToString()
    }
342 
/** Transport value read from the [InternalConnectionRef] stored in this connection's cookie. */
val Connection.transport: Int
    get() {
        val ref = InternalConnectionRef.parseFrom(this.cookie.value)
        return ref.transport
    }
345 
/** Encodes this device's MAC address as a [ByteString] holding the raw address bytes. */
fun BluetoothDevice.toByteString(): ByteString {
    val macBytes = MacAddress.fromString(this.address).toByteArray()
    return ByteString.copyFrom(macBytes)!!
}
348 
/**
 * Wraps this device into a [Connection] whose cookie carries a serialized
 * [InternalConnectionRef] with the device's address bytes and the given transport.
 *
 * @param transport transport value stored in the connection reference.
 */
fun BluetoothDevice.toConnection(transport: Int): Connection {
    // Resolve the address bytes first, while `this` is still unambiguously the device.
    val addressBytes = ByteString.copyFrom(MacAddress.fromString(this.address).toByteArray())

    val refBuilder = InternalConnectionRef.newBuilder()
    refBuilder.setAddress(addressBytes)
    refBuilder.setTransport(transport)
    val serializedRef = refBuilder.build().toByteString()

    val cookie = Any.newBuilder().setValue(serializedRef).build()
    return Connection.newBuilder().setCookie(cookie).build()
}
359 
/**
 * Creates an [AudioTrack] in streaming mode for media/music playback, configured for 16-bit PCM
 * stereo at 44100 Hz, and returns the reference.
 */
fun buildAudioTrack(): AudioTrack? {
    val sampleRateHz = 44100

    val attributes =
        AudioAttributes.Builder()
            .setUsage(AudioAttributes.USAGE_MEDIA)
            .setContentType(AudioAttributes.CONTENT_TYPE_MUSIC)
            .build()

    val format =
        AudioFormat.Builder()
            .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
            .setSampleRate(sampleRateHz)
            .setChannelMask(AudioFormat.CHANNEL_OUT_STEREO)
            .build()

    // sample rate * 2 * 2 bytes; with 2 channels of 16-bit samples that is one second of audio.
    val bufferSizeBytes = sampleRateHz * 2 * 2

    return AudioTrack.Builder()
        .setAudioAttributes(attributes)
        .setAudioFormat(format)
        .setTransferMode(AudioTrack.MODE_STREAM)
        .setBufferSizeInBytes(bufferSizeBytes)
        .build()
}
380 
/**
 * Generates a random string made only of characters from the file-level `alphanumeric` pool.
 *
 * @param length required string size; zero or a negative value yields an empty string.
 * @return a generated string
 */
fun generateAlphanumericString(length: Int): String =
    (0 until length).joinToString(separator = "") { alphanumeric.random().toString() }
390