/*
 * Copyright (C) 2013 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.accessorydisplay.common;

import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.SparseArray;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * A simple message transport.
 * <p>
 * This object's interface is thread-safe, however incoming messages
 * are always delivered on the {@link Looper} thread on which the transport
 * was created.
 * </p><p>
 * Each message is framed as a 16-bit service id, a 16-bit message type and
 * a 32-bit content size, followed by that many bytes of content.
 * Subclasses supply the underlying byte stream through {@link #ioRead},
 * {@link #ioWrite} and {@link #ioClose}.
 * </p>
 */
public abstract class Transport {
    private static final int MAX_INPUT_BUFFERS = 8;

    private final Logger mLogger;

    // The transport thread looper and handler.
    private final TransportHandler mHandler;

    // Lock to guard all mutable state.
    private final Object mLock = new Object();

    // The output buffer.  Set to null when the transport is closed.
    private ByteBuffer mOutputBuffer;

    // The input buffer pool.  Never reassigned; it is used without holding mLock
    // by both the handler and the reader thread.
    private final BufferPool mInputBufferPool;

    // The reader thread.  Initialized when reading starts.
    private ReaderThread mThread;

    // The list of callbacks indexed by service id.
    private final SparseArray<Callback> mServices = new SparseArray<Callback>();

    /**
     * Creates a transport.
     *
     * @param logger The logger used for diagnostics.
     * @param maxPacketSize The capacity of the output buffer, which is also the
     * largest chunk handed to {@link #ioWrite} in one call. It is also passed
     * to the input buffer pool.
     */
    public Transport(Logger logger, int maxPacketSize) {
        mLogger = logger;
        mHandler = new TransportHandler();
        mOutputBuffer = ByteBuffer.allocate(maxPacketSize);
        mInputBufferPool = new BufferPool(
                maxPacketSize, Protocol.MAX_ENVELOPE_SIZE, MAX_INPUT_BUFFERS);
    }

    /**
     * Gets the logger for debugging.
     */
    public Logger getLogger() {
        return mLogger;
    }

    /**
     * Gets the handler on the transport's thread.
     */
    public Handler getHandler() {
        return mHandler;
    }

    /**
     * Closes the transport.
     * <p>
     * Closing an already closed transport has no effect.  If reading was never
     * started the stream is closed immediately; otherwise the reader thread is
     * asked to quit and closes the stream itself once its loop exits.
     * </p>
     */
    public void close() {
        synchronized (mLock) {
            if (mOutputBuffer != null) {
                if (mThread == null) {
                    ioClose();
                } else {
                    // If the thread was started then it will be responsible for
                    // closing the stream when it quits because it may currently
                    // be in the process of reading from the stream so we can't simply
                    // shut it down right now.
                    mThread.quit();
                }
                // A null output buffer is the "closed" marker checked elsewhere.
                mOutputBuffer = null;
            }
        }
    }

    /**
     * Sends a message.
     * <p>
     * Content is read from the buffer's current position up to its limit.
     * Content that does not fit in the output buffer is written in several
     * chunks, all while holding the transport lock.
     * </p>
     *
     * @param service The service to whom the message is addressed.
     * @param what The message type.
     * @param content The content, or null if there is none.
     * @return True if the message was sent successfully, false if an error occurred.
     * @throws IllegalArgumentException if the service id or message type is outside
     * the range 0..0xffff, or if the content is larger than
     * {@code Protocol.MAX_CONTENT_SIZE}.
     */
    public boolean sendMessage(int service, int what, ByteBuffer content) {
        checkServiceId(service);
        checkMessageId(what);

        try {
            synchronized (mLock) {
                if (mOutputBuffer == null) {
                    mLogger.logError("Send message failed because transport was closed.");
                    return false;
                }

                final byte[] outputArray = mOutputBuffer.array();
                final int capacity = mOutputBuffer.capacity();

                // Write the header: service id, message type, content size.
                mOutputBuffer.clear();
                mOutputBuffer.putShort((short)service);
                mOutputBuffer.putShort((short)what);
                if (content == null) {
                    mOutputBuffer.putInt(0);
                } else {
                    final int contentLimit = content.limit();
                    int contentPosition = content.position();
                    int contentRemaining = contentLimit - contentPosition;
                    if (contentRemaining > Protocol.MAX_CONTENT_SIZE) {
                        throw new IllegalArgumentException("Message content too large: "
                                + contentRemaining + " > " + Protocol.MAX_CONTENT_SIZE);
                    }
                    mOutputBuffer.putInt(contentRemaining);
                    while (contentRemaining != 0) {
                        final int outputAvailable = capacity - mOutputBuffer.position();
                        if (contentRemaining <= outputAvailable) {
                            // The rest fits; it is flushed by the final write below.
                            mOutputBuffer.put(content);
                            break;
                        }
                        // Temporarily narrow the content's limit so that put() copies
                        // exactly enough to fill the output buffer, then restore the
                        // limit before doing any I/O that might throw.
                        content.limit(contentPosition + outputAvailable);
                        mOutputBuffer.put(content);
                        content.limit(contentLimit);
                        ioWrite(outputArray, 0, capacity);
                        contentPosition += outputAvailable;
                        contentRemaining -= outputAvailable;
                        mOutputBuffer.clear();
                    }
                }
                ioWrite(outputArray, 0, mOutputBuffer.position());
                return true;
            }
        } catch (IOException ex) {
            mLogger.logError("Send message failed: " + ex);
            return false;
        }
    }

    /**
     * Starts reading messages on a separate thread.
     * <p>
     * Calling this method again after reading has already been started has no
     * effect: a second reader on the same stream would compete with the first
     * for incoming bytes, and only the most recently started thread could be
     * stopped by {@link #close}.
     * </p>
     *
     * @throws IllegalStateException if the transport has been closed.
     */
    public void startReading() {
        synchronized (mLock) {
            if (mOutputBuffer == null) {
                throw new IllegalStateException("Transport has been closed");
            }
            if (mThread != null) {
                return; // already reading
            }

            mThread = new ReaderThread();
            mThread.start();
        }
    }

    /**
     * Registers a service and provides a callback to receive messages.
     * <p>
     * Registering a service id again replaces the callback stored for it.
     * </p>
     *
     * @param service The service id.
     * @param callback The callback to use.
     * @throws IllegalArgumentException if the service id is outside the range
     * 0..0xffff or the callback is null.
     */
    public void registerService(int service, Callback callback) {
        checkServiceId(service);
        if (callback == null) {
            throw new IllegalArgumentException("callback must not be null");
        }

        synchronized (mLock) {
            mServices.put(service, callback);
        }
    }

    /**
     * Unregisters a service.
     *
     * @param service The service to unregister.
     * @throws IllegalArgumentException if the service id is outside the range
     * 0..0xffff.
     */
    public void unregisterService(int service) {
        checkServiceId(service);

        synchronized (mLock) {
            mServices.remove(service);
        }
    }

    /**
     * Delivers one received message to the callback registered for its service,
     * or logs and drops it when no callback is registered.
     */
    private void dispatchMessageReceived(int service, int what, ByteBuffer content) {
        final Callback callback;
        synchronized (mLock) {
            callback = mServices.get(service);
        }
        // Invoke the callback outside of the lock.
        if (callback != null) {
            callback.onMessageReceived(service, what, content);
        } else {
            mLogger.log("Discarding message " + what
                    + " for unregistered service " + service);
        }
    }

    // Service ids are written as an unsigned 16-bit value.
    private static void checkServiceId(int service) {
        if (service < 0 || service > 0xffff) {
            throw new IllegalArgumentException("service id out of range: " + service);
        }
    }

    // Message types are written as an unsigned 16-bit value.
    private static void checkMessageId(int what) {
        if (what < 0 || what > 0xffff) {
            throw new IllegalArgumentException("message id out of range: " + what);
        }
    }

    // The IO methods must be safe to call on any thread.
    // They may be called concurrently.
    protected abstract void ioClose();
    protected abstract int ioRead(byte[] buffer, int offset, int count)
            throws IOException;
    protected abstract void ioWrite(byte[] buffer, int offset, int count)
            throws IOException;

    /**
     * Callback for services that handle received messages.
     */
    public interface Callback {
        /**
         * Indicates that a message was received.
         *
         * @param service The service to whom the message is addressed.
         * @param what The message type.
         * @param content The content, or null if there is none.
         */
        public void onMessageReceived(int service, int what, ByteBuffer content);
    }

    /**
     * Unpacks a buffer of complete messages posted by the reader thread,
     * dispatches each message to its service callback and finally returns the
     * buffer to the input buffer pool.
     */
    final class TransportHandler extends Handler {
        @Override
        public void handleMessage(Message msg) {
            final ByteBuffer buffer = (ByteBuffer)msg.obj;
            try {
                final int limit = buffer.limit();
                while (buffer.position() < limit) {
                    final int service = buffer.getShort() & 0xffff;
                    final int what = buffer.getShort() & 0xffff;
                    final int contentSize = buffer.getInt();
                    if (contentSize == 0) {
                        dispatchMessageReceived(service, what, null);
                    } else {
                        // Expose only this message's content to the callback, then
                        // restore the limit and skip past the content regardless of
                        // how much of it the callback consumed.
                        final int end = buffer.position() + contentSize;
                        buffer.limit(end);
                        dispatchMessageReceived(service, what, buffer);
                        buffer.limit(limit);
                        buffer.position(end);
                    }
                }
            } finally {
                mInputBufferPool.release(buffer);
            }
        }
    }

    /**
     * Reads bytes from the stream, groups them into runs of complete messages
     * and posts each run to the transport handler.  Closes the stream when its
     * loop exits.
     */
    final class ReaderThread extends Thread {
        // Set to true when quitting.
        private volatile boolean mQuitting;

        public ReaderThread() {
            super("Accessory Display Transport");
        }

        @Override
        public void run() {
            loop();
            ioClose();
        }

        private void loop() {
            ByteBuffer buffer = null;
            // Number of bytes needed to complete the first message in the buffer.
            int length = Protocol.HEADER_SIZE;
            // Content size of that message, or -1 while its header is incomplete.
            int contentSize = -1;
            outer: while (!mQuitting) {
                // Get a buffer.
                if (buffer == null) {
                    buffer = mInputBufferPool.acquire(length);
                } else {
                    buffer = mInputBufferPool.grow(buffer, length);
                }

                // Read more data until needed number of bytes obtained.
                int position = buffer.position();
                int count;
                try {
                    count = ioRead(buffer.array(), position, buffer.capacity() - position);
                    if (count < 0) {
                        break; // end of stream
                    }
                } catch (IOException ex) {
                    mLogger.logError("Read failed: " + ex);
                    break; // error
                }
                position += count;
                buffer.position(position);
                if (contentSize < 0 && position >= Protocol.HEADER_SIZE) {
                    // The content size is the int stored at offset 4 of the header.
                    contentSize = buffer.getInt(4);
                    if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
                        mLogger.logError("Encountered invalid content size: " + contentSize);
                        break; // malformed stream
                    }
                    length += contentSize;
                }
                if (position < length) {
                    continue; // need more data
                }

                // There is at least one complete message in the buffer.
                // Find the end of a contiguous chunk of complete messages.
                int next = length;
                int remaining;
                for (;;) {
                    length = Protocol.HEADER_SIZE;
                    remaining = position - next;
                    if (remaining < length) {
                        contentSize = -1;
                        break; // incomplete header, need more data
                    }
                    contentSize = buffer.getInt(next + 4);
                    if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
                        mLogger.logError("Encountered invalid content size: " + contentSize);
                        break outer; // malformed stream
                    }
                    length += contentSize;
                    if (remaining < length) {
                        break; // incomplete content, need more data
                    }
                    next += length;
                }

                // Post the buffer then don't modify it anymore.
                // Now this is kind of sneaky.  We know that no other threads will
                // be acquiring buffers from the buffer pool so we can keep on
                // referring to this buffer as long as we don't modify its contents.
                // This allows us to operate in a single-buffered mode if desired.
                buffer.limit(next);
                buffer.rewind();
                mHandler.obtainMessage(0, buffer).sendToTarget();

                // If there is an incomplete message at the end, then we will need
                // to copy it to a fresh buffer before continuing.  In the single-buffered
                // case, we may acquire the same buffer as before which is fine.
                if (remaining == 0) {
                    buffer = null;
                } else {
                    final ByteBuffer oldBuffer = buffer;
                    buffer = mInputBufferPool.acquire(length);
                    System.arraycopy(oldBuffer.array(), next, buffer.array(), 0, remaining);
                    buffer.position(remaining);
                }
            }

            // Return a buffer that was never posted to the handler.
            if (buffer != null) {
                mInputBufferPool.release(buffer);
            }
        }

        /**
         * Asks the reader loop to stop.  Only sets a flag that is checked at
         * the top of each loop iteration.
         */
        public void quit() {
            mQuitting = true;
        }
    }
}