1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.internal.infra;
18 
19 import static java.util.concurrent.TimeUnit.SECONDS;
20 
21 import android.os.AsyncTask;
22 import android.os.ParcelFileDescriptor;
23 
24 import com.android.internal.util.FunctionalUtils.ThrowingConsumer;
25 import com.android.internal.util.FunctionalUtils.ThrowingFunction;
26 
27 import libcore.io.IoUtils;
28 
29 import java.io.ByteArrayOutputStream;
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.io.InputStream;
33 import java.io.OutputStream;
34 import java.util.concurrent.Executor;
35 
36 /**
37  * Utility class for streaming bytes across IPC, using standard APIs such as
38  * {@link InputStream}/{@link OutputStream} or simply {@code byte[]}
39  *
40  * <p>
41  * To use this, you'll want to declare your IPC methods to accept a {@link ParcelFileDescriptor},
42  * and call them from within lambdas passed to {@link #receiveBytes}/{@link #sendBytes},
43  * passing on the provided {@link ParcelFileDescriptor}.
44  *
45  * <p>
46  * E.g.:
47  * {@code
48  *     //IFoo.aidl
49  *     oneway interface IFoo {
50  *         void sendGreetings(in ParcelFileDescriptor pipe);
51  *         void receiveGreetings(in ParcelFileDescriptor pipe);
52  *     }
53  *
54  *     //Foo.java
 *     mServiceConnector.postAsync(service -> RemoteStream.sendBytes(
 *             pipe -> service.sendGreetings(pipe), greetings))...
57  *
58  *     mServiceConnector.postAsync(service -> RemoteStream.receiveBytes(
59  *                    pipe -> service.receiveGreetings(pipe)))
60  *                .whenComplete((greetings, err) -> ...);
61  * }
62  *
63  * <p>
64  * Each operation has a 30 second timeout by default, as it's possible for an operation to be
65  * stuck forever otherwise.
66  * You can {@link #cancelTimeout cancel} and/or {@link #orTimeout set a custom timeout}, using the
67  * {@link AndroidFuture} you get as a result.
68  *
69  * <p>
70  * You can also {@link #cancel} the operation, which will result in closing the underlying
71  * {@link ParcelFileDescriptor}.
72  *
73  * @see #sendBytes
74  * @see #receiveBytes
75  *
76  * @param <RES> the result of a successful streaming.
77  * @param <IOSTREAM> either {@link InputStream} or {@link OutputStream} depending on the direction.
78  */
79 public abstract class RemoteStream<RES, IOSTREAM extends Closeable>
80         extends AndroidFuture<RES>
81         implements Runnable {
82 
83     private final ThrowingFunction<IOSTREAM, RES> mHandleStream;
84     private volatile ParcelFileDescriptor mLocalPipe;
85 
86     /**
87      * Call an IPC, and process incoming bytes as an {@link InputStream} within {@code read}.
88      *
89      * @param ipc action to perform the IPC. Called directly on the calling thread.
90      * @param read action to read from an {@link InputStream}, transforming data into {@code R}.
91      *             Called asynchronously on the background thread.
92      * @param <R> type of the end result of reading the bytes (if any).
93      * @return an {@link AndroidFuture} that can be used to track operation's completion and
94      *         retrieve its result (if any).
95      */
receiveBytes( ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<InputStream, R> read)96     public static <R> AndroidFuture<R> receiveBytes(
97             ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<InputStream, R> read) {
98         return new RemoteStream<R, InputStream>(
99                 ipc, read, AsyncTask.THREAD_POOL_EXECUTOR, true /* read */) {
100             @Override
101             protected InputStream createStream(ParcelFileDescriptor fd) {
102                 return new ParcelFileDescriptor.AutoCloseInputStream(fd);
103             }
104         };
105     }
106 
107     /**
108      * Call an IPC, and asynchronously return incoming bytes as {@code byte[]}.
109      *
110      * @param ipc action to perform the IPC. Called directly on the calling thread.
111      * @return an {@link AndroidFuture} that can be used to track operation's completion and
112      *         retrieve its result.
113      */
114     public static AndroidFuture<byte[]> receiveBytes(ThrowingConsumer<ParcelFileDescriptor> ipc) {
115         return receiveBytes(ipc, RemoteStream::readAll);
116     }
117 
118     /**
119      * Convert a given {@link InputStream} into {@code byte[]}.
120      *
121      * <p>
122      * This doesn't close the given {@link InputStream}
123      */
124     public static byte[] readAll(InputStream inputStream) throws IOException {
125         ByteArrayOutputStream combinedBuffer = new ByteArrayOutputStream();
126         byte[] buffer = new byte[16 * 1024];
127         while (true) {
128             int numRead = inputStream.read(buffer);
129             if (numRead == -1) {
130                 break;
131             }
132             combinedBuffer.write(buffer, 0, numRead);
133         }
134         return combinedBuffer.toByteArray();
135     }
136 
137     /**
138      * Call an IPC, and perform sending bytes via an {@link OutputStream} within {@code write}.
139      *
140      * @param ipc action to perform the IPC. Called directly on the calling thread.
141      * @param write action to write to an {@link OutputStream}, optionally returning operation
142      *              result as {@code R}. Called asynchronously on the background thread.
143      * @param <R> type of the end result of writing the bytes (if any).
144      * @return an {@link AndroidFuture} that can be used to track operation's completion and
145      *         retrieve its result (if any).
146      */
147     public static <R> AndroidFuture<R> sendBytes(
148             ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<OutputStream, R> write) {
149         return new RemoteStream<R, OutputStream>(
150                 ipc, write, AsyncTask.THREAD_POOL_EXECUTOR, false /* read */) {
151             @Override
152             protected OutputStream createStream(ParcelFileDescriptor fd) {
153                 return new ParcelFileDescriptor.AutoCloseOutputStream(fd);
154             }
155         };
156     }
157 
158     /**
159      * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but explicitly avoids
160      * returning a result.
161      */
162     public static AndroidFuture<Void> sendBytes(
163             ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingConsumer<OutputStream> write) {
164         return sendBytes(ipc, os -> {
165             write.acceptOrThrow(os);
166             return null;
167         });
168     }
169 
170     /**
171      * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but providing the data to
172      * send eagerly as {@code byte[]}.
173      */
174     public static AndroidFuture<Void> sendBytes(
175             ThrowingConsumer<ParcelFileDescriptor> ipc, byte[] data) {
176         return sendBytes(ipc, os -> {
177             os.write(data);
178             return null;
179         });
180     }
181 
182     private RemoteStream(
183             ThrowingConsumer<ParcelFileDescriptor> ipc,
184             ThrowingFunction<IOSTREAM, RES> handleStream,
185             Executor backgroundExecutor,
186             boolean read) {
187         mHandleStream = handleStream;
188 
189         ParcelFileDescriptor[] pipe;
190         try {
191             //TODO consider using createReliablePipe
192             pipe = ParcelFileDescriptor.createPipe();
193             try (ParcelFileDescriptor remotePipe = pipe[read ? 1 : 0]) {
194                 ipc.acceptOrThrow(remotePipe);
195                 // Remote pipe end is duped by binder call. Local copy is not needed anymore
196             }
197 
198             mLocalPipe = pipe[read ? 0 : 1];
199             backgroundExecutor.execute(this);
200 
201             // Guard against getting stuck forever
202             orTimeout(30, SECONDS);
203         } catch (Throwable e) {
204             completeExceptionally(e);
205             // mLocalPipe closes in #onCompleted
206         }
207     }
208 
209     protected abstract IOSTREAM createStream(ParcelFileDescriptor fd);
210 
211     @Override
212     public void run() {
213         try (IOSTREAM stream = createStream(mLocalPipe)) {
214             complete(mHandleStream.applyOrThrow(stream));
215         } catch (Throwable t) {
216             completeExceptionally(t);
217         }
218     }
219 
220     @Override
221     protected void onCompleted(RES res, Throwable err) {
222         super.onCompleted(res, err);
223         IoUtils.closeQuietly(mLocalPipe);
224     }
225 }
226