1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.internal.infra; 18 19 import static java.util.concurrent.TimeUnit.SECONDS; 20 21 import android.os.AsyncTask; 22 import android.os.ParcelFileDescriptor; 23 24 import com.android.internal.util.FunctionalUtils.ThrowingConsumer; 25 import com.android.internal.util.FunctionalUtils.ThrowingFunction; 26 27 import libcore.io.IoUtils; 28 29 import java.io.ByteArrayOutputStream; 30 import java.io.Closeable; 31 import java.io.IOException; 32 import java.io.InputStream; 33 import java.io.OutputStream; 34 import java.util.concurrent.Executor; 35 36 /** 37 * Utility class for streaming bytes across IPC, using standard APIs such as 38 * {@link InputStream}/{@link OutputStream} or simply {@code byte[]} 39 * 40 * <p> 41 * To use this, you'll want to declare your IPC methods to accept a {@link ParcelFileDescriptor}, 42 * and call them from within lambdas passed to {@link #receiveBytes}/{@link #sendBytes}, 43 * passing on the provided {@link ParcelFileDescriptor}. 44 * 45 * <p> 46 * E.g.: 47 * {@code 48 * //IFoo.aidl 49 * oneway interface IFoo { 50 * void sendGreetings(in ParcelFileDescriptor pipe); 51 * void receiveGreetings(in ParcelFileDescriptor pipe); 52 * } 53 * 54 * //Foo.java 55 * mServiceConnector.postAsync(service -> RemoteStream.sendBytes( 56 * pipe -> service.sendGreetings(pipe, greetings)))... 57 * 58 * mServiceConnector.postAsync(service -> RemoteStream.receiveBytes( 59 * pipe -> service.receiveGreetings(pipe))) 60 * .whenComplete((greetings, err) -> ...); 61 * } 62 * 63 * <p> 64 * Each operation has a 30 second timeout by default, as it's possible for an operation to be 65 * stuck forever otherwise. 66 * You can {@link #cancelTimeout cancel} and/or {@link #orTimeout set a custom timeout}, using the 67 * {@link AndroidFuture} you get as a result. 68 * 69 * <p> 70 * You can also {@link #cancel} the operation, which will result in closing the underlying 71 * {@link ParcelFileDescriptor}. 72 * 73 * @see #sendBytes 74 * @see #receiveBytes 75 * 76 * @param <RES> the result of a successful streaming. 77 * @param <IOSTREAM> either {@link InputStream} or {@link OutputStream} depending on the direction. 78 */ 79 public abstract class RemoteStream<RES, IOSTREAM extends Closeable> 80 extends AndroidFuture<RES> 81 implements Runnable { 82 83 private final ThrowingFunction<IOSTREAM, RES> mHandleStream; 84 private volatile ParcelFileDescriptor mLocalPipe; 85 86 /** 87 * Call an IPC, and process incoming bytes as an {@link InputStream} within {@code read}. 88 * 89 * @param ipc action to perform the IPC. Called directly on the calling thread. 90 * @param read action to read from an {@link InputStream}, transforming data into {@code R}. 91 * Called asynchronously on the background thread. 92 * @param <R> type of the end result of reading the bytes (if any). 93 * @return an {@link AndroidFuture} that can be used to track operation's completion and 94 * retrieve its result (if any). 95 */ receiveBytes( ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<InputStream, R> read)96 public static <R> AndroidFuture<R> receiveBytes( 97 ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<InputStream, R> read) { 98 return new RemoteStream<R, InputStream>( 99 ipc, read, AsyncTask.THREAD_POOL_EXECUTOR, true /* read */) { 100 @Override 101 protected InputStream createStream(ParcelFileDescriptor fd) { 102 return new ParcelFileDescriptor.AutoCloseInputStream(fd); 103 } 104 }; 105 } 106 107 /** 108 * Call an IPC, and asynchronously return incoming bytes as {@code byte[]}. 109 * 110 * @param ipc action to perform the IPC. Called directly on the calling thread. 111 * @return an {@link AndroidFuture} that can be used to track operation's completion and 112 * retrieve its result. 113 */ 114 public static AndroidFuture<byte[]> receiveBytes(ThrowingConsumer<ParcelFileDescriptor> ipc) { 115 return receiveBytes(ipc, RemoteStream::readAll); 116 } 117 118 /** 119 * Convert a given {@link InputStream} into {@code byte[]}. 120 * 121 * <p> 122 * This doesn't close the given {@link InputStream} 123 */ 124 public static byte[] readAll(InputStream inputStream) throws IOException { 125 ByteArrayOutputStream combinedBuffer = new ByteArrayOutputStream(); 126 byte[] buffer = new byte[16 * 1024]; 127 while (true) { 128 int numRead = inputStream.read(buffer); 129 if (numRead == -1) { 130 break; 131 } 132 combinedBuffer.write(buffer, 0, numRead); 133 } 134 return combinedBuffer.toByteArray(); 135 } 136 137 /** 138 * Call an IPC, and perform sending bytes via an {@link OutputStream} within {@code write}. 139 * 140 * @param ipc action to perform the IPC. Called directly on the calling thread. 141 * @param write action to write to an {@link OutputStream}, optionally returning operation 142 * result as {@code R}. Called asynchronously on the background thread. 143 * @param <R> type of the end result of writing the bytes (if any). 144 * @return an {@link AndroidFuture} that can be used to track operation's completion and 145 * retrieve its result (if any). 146 */ 147 public static <R> AndroidFuture<R> sendBytes( 148 ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<OutputStream, R> write) { 149 return new RemoteStream<R, OutputStream>( 150 ipc, write, AsyncTask.THREAD_POOL_EXECUTOR, false /* read */) { 151 @Override 152 protected OutputStream createStream(ParcelFileDescriptor fd) { 153 return new ParcelFileDescriptor.AutoCloseOutputStream(fd); 154 } 155 }; 156 } 157 158 /** 159 * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but explicitly avoids 160 * returning a result. 161 */ 162 public static AndroidFuture<Void> sendBytes( 163 ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingConsumer<OutputStream> write) { 164 return sendBytes(ipc, os -> { 165 write.acceptOrThrow(os); 166 return null; 167 }); 168 } 169 170 /** 171 * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but providing the data to 172 * send eagerly as {@code byte[]}. 173 */ 174 public static AndroidFuture<Void> sendBytes( 175 ThrowingConsumer<ParcelFileDescriptor> ipc, byte[] data) { 176 return sendBytes(ipc, os -> { 177 os.write(data); 178 return null; 179 }); 180 } 181 182 private RemoteStream( 183 ThrowingConsumer<ParcelFileDescriptor> ipc, 184 ThrowingFunction<IOSTREAM, RES> handleStream, 185 Executor backgroundExecutor, 186 boolean read) { 187 mHandleStream = handleStream; 188 189 ParcelFileDescriptor[] pipe; 190 try { 191 //TODO consider using createReliablePipe 192 pipe = ParcelFileDescriptor.createPipe(); 193 try (ParcelFileDescriptor remotePipe = pipe[read ? 1 : 0]) { 194 ipc.acceptOrThrow(remotePipe); 195 // Remote pipe end is duped by binder call. Local copy is not needed anymore 196 } 197 198 mLocalPipe = pipe[read ? 0 : 1]; 199 backgroundExecutor.execute(this); 200 201 // Guard against getting stuck forever 202 orTimeout(30, SECONDS); 203 } catch (Throwable e) { 204 completeExceptionally(e); 205 // mLocalPipe closes in #onCompleted 206 } 207 } 208 209 protected abstract IOSTREAM createStream(ParcelFileDescriptor fd); 210 211 @Override 212 public void run() { 213 try (IOSTREAM stream = createStream(mLocalPipe)) { 214 complete(mHandleStream.applyOrThrow(stream)); 215 } catch (Throwable t) { 216 completeExceptionally(t); 217 } 218 } 219 220 @Override 221 protected void onCompleted(RES res, Throwable err) { 222 super.onCompleted(res, err); 223 IoUtils.closeQuietly(mLocalPipe); 224 } 225 } 226