/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.android.vdmdemo.common;

import android.os.Handler;
import android.os.HandlerThread;
import android.util.ArrayMap;
import android.util.Log;

import androidx.annotation.GuardedBy;

import com.example.android.vdmdemo.common.RemoteEventProto.RemoteEvent;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import javax.inject.Inject;
import javax.inject.Singleton;

/**
 * Simple message exchange framework between the client and the host.
 *
 * <p>Outgoing events are written on a dedicated {@link HandlerThread}. Incoming events are read
 * on a dedicated receiver thread and handed to every registered consumer, each of which runs on
 * its own single-thread executor.
 */
@Singleton
public class RemoteIo {
    public static final String TAG = "VdmRemoteIo";

    /** Callback invoked when one of the underlying streams can no longer be used. */
    @FunctionalInterface
    interface StreamClosedCallback {
        void onStreamClosed();
    }

    private final Object mLock = new Object();

    @GuardedBy("mLock")
    private OutputStream mOutputStream = null;

    @GuardedBy("mLock")
    private StreamClosedCallback mOutputStreamClosedCallback = null;

    private final Handler mSendMessageHandler;

    @GuardedBy("mMessageConsumers")
    private final Map<Object, MessageConsumer> mMessageConsumers = new ArrayMap<>();

    @Inject
    RemoteIo() {
        final HandlerThread sendMessageThread = new HandlerThread("SendMessageThread");
        sendMessageThread.start();
        mSendMessageHandler = new Handler(sendMessageThread.getLooper());
    }

    /**
     * Starts a receiver thread that reads events from the given stream.
     *
     * @param inputStream stream to read delimited events from
     * @param inputStreamClosedCallback invoked once the receiver loop ends
     */
    @SuppressWarnings("ThreadPriorityCheck")
    void initialize(InputStream inputStream, StreamClosedCallback inputStreamClosedCallback) {
        Thread t = new Thread(new ReceiverRunnable(inputStream, inputStreamClosedCallback));
        t.setPriority(Thread.MAX_PRIORITY);
        t.start();
    }

    /**
     * Sets the stream used for outgoing events.
     *
     * @param outputStream stream that subsequent events are written to
     * @param outputStreamClosedCallback invoked when writing to the stream fails
     */
    void initialize(
            OutputStream outputStream, StreamClosedCallback outputStreamClosedCallback) {
        synchronized (mLock) {
            mOutputStream = outputStream;
            mOutputStreamClosedCallback = outputStreamClosedCallback;
        }
    }

    /** Registers a consumer for processing events coming from the remote device. */
    public void addMessageConsumer(Consumer<RemoteEvent> consumer) {
        synchronized (mMessageConsumers) {
            MessageConsumer previous =
                    mMessageConsumers.put(consumer, new MessageConsumer(consumer));
            if (previous != null) {
                // The same consumer was registered twice; release the replaced executor so its
                // worker thread does not leak.
                previous.shutdown();
            }
        }
    }

    /** Unregisters a previously registered message consumer. */
    public void removeMessageConsumer(Consumer<RemoteEvent> consumer) {
        synchronized (mMessageConsumers) {
            MessageConsumer removed = mMessageConsumers.remove(consumer);
            if (removed == null) {
                Log.w(TAG, "Failed to remove message consumer.");
                return;
            }
            // Dispatch happens under the same lock, so no new event can reach the executor
            // after this point.
            removed.shutdown();
        }
    }

    /** Sends an event to the remote device. */
    public void sendMessage(RemoteEvent event) {
        synchronized (mLock) {
            if (mOutputStream == null) {
                Log.e(TAG, "Failed to send event, RemoteIO not initialized.");
                return;
            }
        }
        mSendMessageHandler.post(() -> writeEvent(event));
    }

    /** Writes a single event to the output stream; runs on the send message thread. */
    private void writeEvent(RemoteEvent event) {
        StreamClosedCallback closedCallback = null;
        synchronized (mLock) {
            if (mOutputStream == null) {
                // An earlier queued write may have failed and cleared the stream after this
                // event was posted.
                Log.w(TAG, "Dropping event, output stream is no longer available.");
                return;
            }
            try {
                event.writeDelimitedTo(mOutputStream);
                mOutputStream.flush();
            } catch (IOException e) {
                Log.e(TAG, "Failed to send event, closing output stream.", e);
                mOutputStream = null;
                closedCallback = mOutputStreamClosedCallback;
            }
        }
        // Notify outside of the lock so the callback never runs while mLock is held.
        if (closedCallback != null) {
            closedCallback.onStreamClosed();
        }
    }

    /** Reads events from the input stream and hands them to all registered consumers. */
    private class ReceiverRunnable implements Runnable {

        private final InputStream mInputStream;
        private final StreamClosedCallback mInputStreamClosedCallback;

        ReceiverRunnable(InputStream inputStream, StreamClosedCallback inputStreamClosedCallback) {
            mInputStream = inputStream;
            mInputStreamClosedCallback = inputStreamClosedCallback;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    RemoteEvent event = RemoteEvent.parseDelimitedFrom(mInputStream);
                    if (event == null) {
                        // No further event could be parsed (presumably end of stream).
                        break;
                    }
                    synchronized (mMessageConsumers) {
                        for (MessageConsumer consumer : mMessageConsumers.values()) {
                            consumer.accept(event);
                        }
                    }
                }
            } catch (IOException e) {
                Log.e(TAG, "Failed to obtain event: " + e);
            }
            mInputStreamClosedCallback.onStreamClosed();
        }
    }

    /** Pairs a consumer with the single-thread executor its events are delivered on. */
    private static class MessageConsumer {
        private final ExecutorService mExecutor;
        private final Consumer<RemoteEvent> mConsumer;

        MessageConsumer(Consumer<RemoteEvent> consumer) {
            mExecutor = Executors.newSingleThreadExecutor();
            mConsumer = consumer;
        }

        /** Schedules delivery of the event to the wrapped consumer on its executor. */
        public void accept(RemoteEvent event) {
            mExecutor.execute(() -> mConsumer.accept(event));
        }

        /** Stops accepting new events; events that were already queued are still delivered. */
        void shutdown() {
            mExecutor.shutdown();
        }
    }
}