1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.example.android.vdmdemo.common;
18 
import android.os.Handler;
import android.os.HandlerThread;
import android.util.ArrayMap;
import android.util.Log;

import androidx.annotation.GuardedBy;

import com.example.android.vdmdemo.common.RemoteEventProto.RemoteEvent;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import javax.inject.Inject;
import javax.inject.Singleton;
38 
39 /** Simple message exchange framework between the client and the host. */
40 @Singleton
41 public class RemoteIo {
42     public static final String TAG = "VdmRemoteIo";
43 
44     interface StreamClosedCallback {
onStreamClosed()45         void onStreamClosed();
46     }
47 
48     private final Object mLock = new Object();
49 
50     @GuardedBy("mLock")
51     private OutputStream mOutputStream = null;
52 
53     private StreamClosedCallback mOutputStreamClosedCallback = null;
54     private final Handler mSendMessageHandler;
55 
56     @GuardedBy("mMessageConsumers")
57     private final Map<Object, MessageConsumer> mMessageConsumers = new ArrayMap<>();
58 
59     @Inject
RemoteIo()60     RemoteIo() {
61         final HandlerThread sendMessageThread = new HandlerThread("SendMessageThread");
62         sendMessageThread.start();
63         mSendMessageHandler = new Handler(sendMessageThread.getLooper());
64     }
65 
66     @SuppressWarnings("ThreadPriorityCheck")
initialize(InputStream inputStream, StreamClosedCallback inputStreamClosedCallback)67     void initialize(InputStream inputStream, StreamClosedCallback inputStreamClosedCallback) {
68         Thread t = new Thread(new ReceiverRunnable(inputStream, inputStreamClosedCallback));
69         t.setPriority(Thread.MAX_PRIORITY);
70         t.start();
71     }
72 
initialize( OutputStream outputStream, StreamClosedCallback outputStreamClosedCallback)73     void initialize(
74             OutputStream outputStream, StreamClosedCallback outputStreamClosedCallback) {
75         synchronized (mLock) {
76             mOutputStream = outputStream;
77             mOutputStreamClosedCallback = outputStreamClosedCallback;
78         }
79     }
80 
81     /** Registers a consumer for processing events coming from the remote device. */
addMessageConsumer(Consumer<RemoteEvent> consumer)82     public void addMessageConsumer(Consumer<RemoteEvent> consumer) {
83         synchronized (mMessageConsumers) {
84             mMessageConsumers.put(consumer, new MessageConsumer(consumer));
85         }
86     }
87 
88     /** Unregisters a previously registered message consumer. */
removeMessageConsumer(Consumer<RemoteEvent> consumer)89     public void removeMessageConsumer(Consumer<RemoteEvent> consumer) {
90         synchronized (mMessageConsumers) {
91             if (mMessageConsumers.remove(consumer) == null) {
92                 Log.w(TAG, "Failed to remove message consumer.");
93             }
94         }
95     }
96 
97     /** Sends an event to the remote device. */
sendMessage(RemoteEvent event)98     public void sendMessage(RemoteEvent event) {
99         synchronized (mLock) {
100             if (mOutputStream == null) {
101                 Log.e(TAG, "Failed to send event, RemoteIO not initialized.");
102                 return;
103             }
104         }
105         mSendMessageHandler.post(() -> {
106             synchronized (mLock) {
107                 try {
108                     event.writeDelimitedTo(mOutputStream);
109                     mOutputStream.flush();
110                 } catch (IOException e) {
111                     mOutputStream = null;
112                     mOutputStreamClosedCallback.onStreamClosed();
113                 }
114             }
115         });
116     }
117 
118     private class ReceiverRunnable implements Runnable {
119 
120         private final InputStream mInputStream;
121         private final StreamClosedCallback mInputStreamClosedCallback;
122 
ReceiverRunnable(InputStream inputStream, StreamClosedCallback inputStreamClosedCallback)123         ReceiverRunnable(InputStream inputStream, StreamClosedCallback inputStreamClosedCallback) {
124             mInputStream = inputStream;
125             mInputStreamClosedCallback = inputStreamClosedCallback;
126         }
127 
128         @Override
run()129         public void run() {
130             try {
131                 while (true) {
132                     RemoteEvent event = RemoteEvent.parseDelimitedFrom(mInputStream);
133                     if (event == null) {
134                         break;
135                     }
136                     synchronized (mMessageConsumers) {
137                         mMessageConsumers.values().forEach(consumer -> consumer.accept(event));
138                     }
139                 }
140             } catch (IOException e) {
141                 Log.e(TAG, "Failed to obtain event: " + e);
142             }
143             mInputStreamClosedCallback.onStreamClosed();
144         }
145     }
146 
147     private static class MessageConsumer {
148         private final Executor mExecutor;
149         private final Consumer<RemoteEvent> mConsumer;
150 
MessageConsumer(Consumer<RemoteEvent> consumer)151         MessageConsumer(Consumer<RemoteEvent> consumer) {
152             mExecutor = Executors.newSingleThreadExecutor();
153             mConsumer = consumer;
154         }
155 
accept(RemoteEvent event)156         public void accept(RemoteEvent event) {
157             mExecutor.execute(() -> mConsumer.accept(event));
158         }
159     }
160 }
161