1 /*
2  * Copyright (C) 2013 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.accessorydisplay.common;
18 
19 import android.os.Handler;
20 import android.os.Looper;
21 import android.os.Message;
22 import android.util.SparseArray;
23 
24 import java.io.IOException;
25 import java.nio.ByteBuffer;
26 
27 /**
28  * A simple message transport.
29  * <p>
30  * This object's interface is thread-safe, however incoming messages
31  * are always delivered on the {@link Looper} thread on which the transport
32  * was created.
33  * </p>
34  */
35 public abstract class Transport {
36     private static final int MAX_INPUT_BUFFERS = 8;
37 
38     private final Logger mLogger;
39 
40     // The transport thread looper and handler.
41     private final TransportHandler mHandler;
42 
43     // Lock to guard all mutable state.
44     private final Object mLock = new Object();
45 
46     // The output buffer.  Set to null when the transport is closed.
47     private ByteBuffer mOutputBuffer;
48 
49     // The input buffer pool.
50     private BufferPool mInputBufferPool;
51 
52     // The reader thread.  Initialized when reading starts.
53     private ReaderThread mThread;
54 
55     // The list of callbacks indexed by service id.
56     private final SparseArray<Callback> mServices = new SparseArray<Callback>();
57 
Transport(Logger logger, int maxPacketSize)58     public Transport(Logger logger, int maxPacketSize) {
59         mLogger = logger;
60         mHandler = new TransportHandler();
61         mOutputBuffer = ByteBuffer.allocate(maxPacketSize);
62         mInputBufferPool = new BufferPool(
63                 maxPacketSize, Protocol.MAX_ENVELOPE_SIZE, MAX_INPUT_BUFFERS);
64     }
65 
66     /**
67      * Gets the logger for debugging.
68      */
getLogger()69     public Logger getLogger() {
70         return mLogger;
71     }
72 
73     /**
74      * Gets the handler on the transport's thread.
75      */
getHandler()76     public Handler getHandler() {
77         return mHandler;
78     }
79 
80     /**
81      * Closes the transport.
82      */
close()83     public void close() {
84         synchronized (mLock) {
85             if (mOutputBuffer != null) {
86                 if (mThread == null) {
87                     ioClose();
88                 } else {
89                     // If the thread was started then it will be responsible for
90                     // closing the stream when it quits because it may currently
91                     // be in the process of reading from the stream so we can't simply
92                     // shut it down right now.
93                     mThread.quit();
94                 }
95                 mOutputBuffer = null;
96             }
97         }
98     }
99 
100     /**
101      * Sends a message.
102      *
103      * @param service The service to whom the message is addressed.
104      * @param what The message type.
105      * @param content The content, or null if there is none.
106      * @return True if the message was sent successfully, false if an error occurred.
107      */
sendMessage(int service, int what, ByteBuffer content)108     public boolean sendMessage(int service, int what, ByteBuffer content) {
109         checkServiceId(service);
110         checkMessageId(what);
111 
112         try {
113             synchronized (mLock) {
114                 if (mOutputBuffer == null) {
115                     mLogger.logError("Send message failed because transport was closed.");
116                     return false;
117                 }
118 
119                 final byte[] outputArray = mOutputBuffer.array();
120                 final int capacity = mOutputBuffer.capacity();
121                 mOutputBuffer.clear();
122                 mOutputBuffer.putShort((short)service);
123                 mOutputBuffer.putShort((short)what);
124                 if (content == null) {
125                     mOutputBuffer.putInt(0);
126                 } else {
127                     final int contentLimit = content.limit();
128                     int contentPosition = content.position();
129                     int contentRemaining = contentLimit - contentPosition;
130                     if (contentRemaining > Protocol.MAX_CONTENT_SIZE) {
131                         throw new IllegalArgumentException("Message content too large: "
132                                 + contentRemaining + " > " + Protocol.MAX_CONTENT_SIZE);
133                     }
134                     mOutputBuffer.putInt(contentRemaining);
135                     while (contentRemaining != 0) {
136                         final int outputAvailable = capacity - mOutputBuffer.position();
137                         if (contentRemaining <= outputAvailable) {
138                             mOutputBuffer.put(content);
139                             break;
140                         }
141                         content.limit(contentPosition + outputAvailable);
142                         mOutputBuffer.put(content);
143                         content.limit(contentLimit);
144                         ioWrite(outputArray, 0, capacity);
145                         contentPosition += outputAvailable;
146                         contentRemaining -= outputAvailable;
147                         mOutputBuffer.clear();
148                     }
149                 }
150                 ioWrite(outputArray, 0, mOutputBuffer.position());
151                 return true;
152             }
153         } catch (IOException ex) {
154             mLogger.logError("Send message failed: " + ex);
155             return false;
156         }
157     }
158 
159     /**
160      * Starts reading messages on a separate thread.
161      */
startReading()162     public void startReading() {
163         synchronized (mLock) {
164             if (mOutputBuffer == null) {
165                 throw new IllegalStateException("Transport has been closed");
166             }
167 
168             mThread = new ReaderThread();
169             mThread.start();
170         }
171     }
172 
173     /**
174      * Registers a service and provides a callback to receive messages.
175      *
176      * @param service The service id.
177      * @param callback The callback to use.
178      */
registerService(int service, Callback callback)179     public void registerService(int service, Callback callback) {
180         checkServiceId(service);
181         if (callback == null) {
182             throw new IllegalArgumentException("callback must not be null");
183         }
184 
185         synchronized (mLock) {
186             mServices.put(service, callback);
187         }
188     }
189 
190     /**
191      * Unregisters a service.
192      *
193      * @param service The service to unregister.
194      */
unregisterService(int service)195     public void unregisterService(int service) {
196         checkServiceId(service);
197 
198         synchronized (mLock) {
199             mServices.remove(service);
200         }
201     }
202 
dispatchMessageReceived(int service, int what, ByteBuffer content)203     private void dispatchMessageReceived(int service, int what, ByteBuffer content) {
204         final Callback callback;
205         synchronized (mLock) {
206             callback = mServices.get(service);
207         }
208         if (callback != null) {
209             callback.onMessageReceived(service, what, content);
210         } else {
211             mLogger.log("Discarding message " + what
212                     + " for unregistered service " + service);
213         }
214     }
215 
checkServiceId(int service)216     private static void checkServiceId(int service) {
217         if (service < 0 || service > 0xffff) {
218             throw new IllegalArgumentException("service id out of range: " + service);
219         }
220     }
221 
checkMessageId(int what)222     private static void checkMessageId(int what) {
223         if (what < 0 || what > 0xffff) {
224             throw new IllegalArgumentException("message id out of range: " + what);
225         }
226     }
227 
228     // The IO methods must be safe to call on any thread.
229     // They may be called concurrently.
ioClose()230     protected abstract void ioClose();
ioRead(byte[] buffer, int offset, int count)231     protected abstract int ioRead(byte[] buffer, int offset, int count)
232             throws IOException;
ioWrite(byte[] buffer, int offset, int count)233     protected abstract void ioWrite(byte[] buffer, int offset, int count)
234             throws IOException;
235 
236     /**
237      * Callback for services that handle received messages.
238      */
239     public interface Callback {
240         /**
241          * Indicates that a message was received.
242          *
243          * @param service The service to whom the message is addressed.
244          * @param what The message type.
245          * @param content The content, or null if there is none.
246          */
onMessageReceived(int service, int what, ByteBuffer content)247         public void onMessageReceived(int service, int what, ByteBuffer content);
248     }
249 
250     final class TransportHandler extends Handler {
251         @Override
handleMessage(Message msg)252         public void handleMessage(Message msg) {
253             final ByteBuffer buffer = (ByteBuffer)msg.obj;
254             try {
255                 final int limit = buffer.limit();
256                 while (buffer.position() < limit) {
257                     final int service = buffer.getShort() & 0xffff;
258                     final int what = buffer.getShort() & 0xffff;
259                     final int contentSize = buffer.getInt();
260                     if (contentSize == 0) {
261                         dispatchMessageReceived(service, what, null);
262                     } else {
263                         final int end = buffer.position() + contentSize;
264                         buffer.limit(end);
265                         dispatchMessageReceived(service, what, buffer);
266                         buffer.limit(limit);
267                         buffer.position(end);
268                     }
269                 }
270             } finally {
271                 mInputBufferPool.release(buffer);
272             }
273         }
274     }
275 
276     final class ReaderThread extends Thread {
277         // Set to true when quitting.
278         private volatile boolean mQuitting;
279 
ReaderThread()280         public ReaderThread() {
281             super("Accessory Display Transport");
282         }
283 
284         @Override
run()285         public void run() {
286             loop();
287             ioClose();
288         }
289 
loop()290         private void loop() {
291             ByteBuffer buffer = null;
292             int length = Protocol.HEADER_SIZE;
293             int contentSize = -1;
294             outer: while (!mQuitting) {
295                 // Get a buffer.
296                 if (buffer == null) {
297                     buffer = mInputBufferPool.acquire(length);
298                 } else {
299                     buffer = mInputBufferPool.grow(buffer, length);
300                 }
301 
302                 // Read more data until needed number of bytes obtained.
303                 int position = buffer.position();
304                 int count;
305                 try {
306                     count = ioRead(buffer.array(), position, buffer.capacity() - position);
307                     if (count < 0) {
308                         break; // end of stream
309                     }
310                 } catch (IOException ex) {
311                     mLogger.logError("Read failed: " + ex);
312                     break; // error
313                 }
314                 position += count;
315                 buffer.position(position);
316                 if (contentSize < 0 && position >= Protocol.HEADER_SIZE) {
317                     contentSize = buffer.getInt(4);
318                     if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
319                         mLogger.logError("Encountered invalid content size: " + contentSize);
320                         break; // malformed stream
321                     }
322                     length += contentSize;
323                 }
324                 if (position < length) {
325                     continue; // need more data
326                 }
327 
328                 // There is at least one complete message in the buffer.
329                 // Find the end of a contiguous chunk of complete messages.
330                 int next = length;
331                 int remaining;
332                 for (;;) {
333                     length = Protocol.HEADER_SIZE;
334                     remaining = position - next;
335                     if (remaining < length) {
336                         contentSize = -1;
337                         break; // incomplete header, need more data
338                     }
339                     contentSize = buffer.getInt(next + 4);
340                     if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
341                         mLogger.logError("Encountered invalid content size: " + contentSize);
342                         break outer; // malformed stream
343                     }
344                     length += contentSize;
345                     if (remaining < length) {
346                         break; // incomplete content, need more data
347                     }
348                     next += length;
349                 }
350 
351                 // Post the buffer then don't modify it anymore.
352                 // Now this is kind of sneaky.  We know that no other threads will
353                 // be acquiring buffers from the buffer pool so we can keep on
354                 // referring to this buffer as long as we don't modify its contents.
355                 // This allows us to operate in a single-buffered mode if desired.
356                 buffer.limit(next);
357                 buffer.rewind();
358                 mHandler.obtainMessage(0, buffer).sendToTarget();
359 
360                 // If there is an incomplete message at the end, then we will need
361                 // to copy it to a fresh buffer before continuing.  In the single-buffered
362                 // case, we may acquire the same buffer as before which is fine.
363                 if (remaining == 0) {
364                     buffer = null;
365                 } else {
366                     final ByteBuffer oldBuffer = buffer;
367                     buffer = mInputBufferPool.acquire(length);
368                     System.arraycopy(oldBuffer.array(), next, buffer.array(), 0, remaining);
369                     buffer.position(remaining);
370                 }
371             }
372 
373             if (buffer != null) {
374                 mInputBufferPool.release(buffer);
375             }
376         }
377 
quit()378         public void quit() {
379             mQuitting = true;
380         }
381     }
382 }
383