/* * Copyright (C) 2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.camera.processing.imagebackend; import android.os.Process; import com.android.camera.async.AndroidPriorityThread; import com.android.camera.debug.Log; import com.android.camera.processing.ProcessingTaskConsumer; import com.android.camera.processing.memory.ByteBufferDirectPool; import com.android.camera.processing.memory.LruResourcePool; import com.android.camera.session.CaptureSession; import com.android.camera.util.Size; import com.google.common.base.Optional; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * This ImageBackend is created for the purpose of creating a task-running * infrastructure that has two-level of priority and doing the book-keeping to * keep track of tasks that use Android Images. Android.media.images are * critical system resources that MUST be properly managed in order to maintain * camera application performance. Android.media.images are merely Java handles * to regions of physically contiguous memory used by the camera hardware as a * destination for imaging data. In general, this physically contiguous memory * is not counted as an application resource, but as a system resources held by * the application and does NOT count against the limits of application memory. * The performance pressures of both computing and memory resources must often * be prioritized in releasing Android.media.images in a timely manner. In order * to properly balance these concerns, most image processing requested should be * routed through this object. This object is also responsible for releasing * Android.media image as soon as possible, so as not to stall the camera * hardware subsystem. Image that reserve these images are a subclass of the * basic Java Runnable with a few conditions placed upon their run() * implementation: *
    *
  1. The task will try to release the image as early as possible by calling * the releaseSemaphoreReference as soon as a reference to the original image is * no longer required.
  2. *
  3. A set of tasks that require ImageData must only happen on the first * receiveImage call. receiveImage must only be called once per image.
  4. *
  5. However, the submitted tasks may spawn new tasks via the appendTask with * any image that have had a task submitted, but NOT released via * releaseSemaphoreReference.
  6. *
  7. Computation that is dependent on multiple images should be written into * this task framework in a distributed manner where image task can be computed * independently and join their results to a common shared object.This style of * implementation allows for the earliest release of Android Images while * honoring the resources priorities set by this class. See the Lucky shot * implementation for a concrete example for this shared object and its * respective task {@link TaskLuckyShotSession} {@link LuckyShotSession}
  8. *
* To integrate with the ProcessingServiceManager, ImageBackend also signals to * the ProcessingServiceManager its processing state by enqueuing * ImageShadowTasks on each ImageBackend::receiveImage call. These ImageShadow * tasks have no implementation, but emulate the processing delay by blocking * until all tasks submitted and spawned by a particular receiveImage call have * completed their processing. This emulated functionality ensures that other * ProcessingTasks associated with Lens Blur and Panorama are not processing * while the ImageBackend is running. Unfairly, the ImageBackend proceeds with * its own processing regardless of the state of ImageShadowTask. * ImageShadowTasks that are associated with ImageBackend tasks that have * already been completed should return immediately on its process call. */ public class ImageBackend implements ImageConsumer, ImageTaskManager { private static final Log.Tag TAG = new Log.Tag("ImageBackend"); protected static final int NUM_THREADS_FAST = 2; protected static final int NUM_THREADS_AVERAGE = 2; protected static final int NUM_THREADS_SLOW = 2; private static final int FAST_THREAD_PRIORITY = Process.THREAD_PRIORITY_DISPLAY; private static final int AVERAGE_THREAD_PRIORITY = Process.THREAD_PRIORITY_DEFAULT + Process.THREAD_PRIORITY_LESS_FAVORABLE; private static final int SLOW_THREAD_PRIORITY = Process.THREAD_PRIORITY_BACKGROUND + Process.THREAD_PRIORITY_MORE_FAVORABLE; private static final int IMAGE_BACKEND_HARD_REF_POOL_SIZE = 2; protected final ProcessingTaskConsumer mProcessingTaskConsumer; /** * Map for TaskImageContainer and the release of ImageProxy Book-keeping */ protected final Map mImageSemaphoreMap; /** * Map for ImageShadowTask and release of blocking on * ImageShadowTask::process */ protected final Map mShadowTaskMap; // The available threadpools for scheduling protected final ExecutorService mThreadPoolFast; protected final ExecutorService mThreadPoolAverage; protected final ExecutorService mThreadPoolSlow; private final LruResourcePool mByteBufferDirectPool; /** * Approximate viewable size (in pixels) for the fast thumbnail in the * current UX definition of the product. Note that these values will be the * minimum size of FAST_THUMBNAIL target for the CONVERT_TO_RGB_PREVIEW * task. */ private final Size mTinyThumbnailTargetSize; /** * A standard viewable size (in pixels) for the filmstrip thumbnail in the * current UX definition of the product. Note that this size is the minimum * size for the Preview on the filmstrip associated with * COMPRESS_TO_JPEG_AND_WRITE_TO_DISK task. */ private final static Size FILMSTRIP_THUMBNAIL_TARGET_SIZE = new Size(512, 384); // Some invariants to know that we're keeping track of everything // that reflect the state of mImageSemaphoreMap private int mOutstandingImageRefs = 0; private int mOutstandingImageOpened = 0; private int mOutstandingImageClosed = 0; // Objects that may be registered to this objects events. private ImageProcessorProxyListener mProxyListener = null; // Default constructor, values are conservatively targeted to the Nexus 6 public ImageBackend(ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize) { mThreadPoolFast = Executors.newFixedThreadPool(NUM_THREADS_FAST, new FastThreadFactory()); mThreadPoolAverage = Executors.newFixedThreadPool(NUM_THREADS_AVERAGE, new AverageThreadFactory()); mThreadPoolSlow = Executors.newFixedThreadPool(NUM_THREADS_SLOW, new SlowThreadFactory()); mByteBufferDirectPool = new ByteBufferDirectPool(IMAGE_BACKEND_HARD_REF_POOL_SIZE); mProxyListener = new ImageProcessorProxyListener(); mImageSemaphoreMap = new HashMap<>(); mShadowTaskMap = new HashMap<>(); mProcessingTaskConsumer = processingTaskConsumer; mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize); } /** * Direct Injection Constructor for Testing purposes. * * @param fastService Service where Tasks of FAST Priority are placed. * @param averageService Service where Tasks of AVERAGE Priority are placed. * @param slowService Service where Tasks of SLOW Priority are placed. * @param imageProcessorProxyListener iamge proxy listener to be used */ public ImageBackend(ExecutorService fastService, ExecutorService averageService, ExecutorService slowService, LruResourcePool byteBufferDirectPool, ImageProcessorProxyListener imageProcessorProxyListener, ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize) { mThreadPoolFast = fastService; mThreadPoolAverage = averageService; mThreadPoolSlow = slowService; mByteBufferDirectPool = byteBufferDirectPool; mProxyListener = imageProcessorProxyListener; mImageSemaphoreMap = new HashMap<>(); mShadowTaskMap = new HashMap<>(); mProcessingTaskConsumer = processingTaskConsumer; mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize); } /** * Simple getter for the associated listener object associated with this * instantiation that handles registration of events listeners. * * @return listener proxy that handles events messaging for this object. */ public ImageProcessorProxyListener getProxyListener() { return mProxyListener; } /** * Wrapper function for all log messages created by this object. Default * implementation is to send messages to the Android logger. For test * purposes, this method can be overridden to avoid "Stub!" Runtime * exceptions in Unit Tests. */ public void logWrapper(String message) { Log.v(TAG, message); } /** * @return Number of Image references currently held by this instance */ @Override public int getNumberOfReservedOpenImages() { synchronized (mImageSemaphoreMap) { // since mOutstandingImageOpened, mOutstandingImageClosed reflect // the historical state of mImageSemaphoreMap, we need to lock on // before we return a value. return mOutstandingImageOpened - mOutstandingImageClosed; } } /** * Returns of the number of receiveImage calls that are currently enqueued * and/or being processed. * * @return The number of receiveImage calls that are currently enqueued * and/or being processed */ @Override public int getNumberOfOutstandingCalls() { synchronized (mShadowTaskMap) { return mShadowTaskMap.size(); } } /** * Signals the ImageBackend that a tasks has released a reference to the * image. Imagebackend determines whether all references have been released * and applies its specified release protocol of closing image and/or * unblocking the caller. Should ONLY be called by the tasks running on this * class. * * @param img the image to be released by the task. * @param executor the executor on which the image close is run. if null, * image close is run by the calling thread (usually the main * task thread). */ @Override public void releaseSemaphoreReference(final ImageToProcess img, Executor executor) { synchronized (mImageSemaphoreMap) { ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img); if (protocol == null || protocol.getCount() <= 0) { // That means task implementation has allowed an unbalanced // semaphore release. throw new RuntimeException( "ERROR: Task implementation did NOT balance its release."); } // Normal operation from here. protocol.addCount(-1); mOutstandingImageRefs--; logWrapper("Ref release. Total refs = " + mOutstandingImageRefs); if (protocol.getCount() == 0) { // Image is ready to be released // Remove the image from the map so that it may be submitted // again. mImageSemaphoreMap.remove(img); // Conditionally close the image, specified by initial // receiveImage call if (protocol.closeOnRelease) { closeImageExecutorSafe(img, executor); logWrapper("Ref release close."); } // Conditionally signal the blocking thread to go. if (protocol.blockUntilRelease) { protocol.signal(); } } else { // Image is still being held by other tasks. // Otherwise, update the semaphore mImageSemaphoreMap.put(img, protocol); } } } /** * Spawns dependent tasks from internal implementation of a set of tasks. If * a dependent task does NOT require the image reference, it should be * passed a null pointer as an image reference. In general, this method * should be called after the task has completed its own computations, but * before it has released its own image reference (via the * releaseSemaphoreReference call). * * @param tasks The set of tasks to be run * @return whether tasks are successfully submitted. */ @Override public boolean appendTasks(ImageToProcess img, Set tasks) { // Make sure that referred images are all the same, if it exists. // And count how image references need to be kept track of. int countImageRefs = numPropagatedImageReferences(img, tasks); if (img != null) { // If you're still holding onto the reference, make sure you keep // count incrementSemaphoreReferenceCount(img, countImageRefs); } // Update the done count on the new tasks. incrementTaskDone(tasks); scheduleTasks(tasks); return true; } /** * Spawns a single dependent task from internal implementation of a task. * * @param task The task to be run * @return whether tasks are successfully submitted. */ @Override public boolean appendTasks(ImageToProcess img, TaskImageContainer task) { Set tasks = new HashSet(1); tasks.add(task); return appendTasks(img, tasks); } /** * Implements that top-level image single task submission that is defined by * the ImageConsumer interface w/o Runnable to executed. * * @param img Image required by the task * @param task Task to be run * @param blockUntilImageRelease If true, call blocks until the object img * is no longer referred by any task. If false, call is * non-blocking * @param closeOnImageRelease If true, images is closed when the object img * is is no longer referred by any task. If false, After an image * is submitted, it should never be submitted again to the * interface until all tasks and their spawned tasks are * finished. * @return whether jobs were enqueued to the ImageBackend. */ @Override public boolean receiveImage(ImageToProcess img, TaskImageContainer task, boolean blockUntilImageRelease, boolean closeOnImageRelease) throws InterruptedException { return receiveImage(img, task, blockUntilImageRelease, closeOnImageRelease, Optional. absent()); } /** * Implements that top-level image single task submission that is defined by * the ImageConsumer interface. * * @param img Image required by the task * @param task Task to be run * @param blockUntilImageRelease If true, call blocks until the object img * is no longer referred by any task. If false, call is * non-blocking * @param closeOnImageRelease If true, images is closed when the object img * is is no longer referred by any task. If false, After an image * is submitted, it should never be submitted again to the * interface until all tasks and their spawned tasks are * finished. * @param runnableWhenDone Optional runnable to be executed when the set of * tasks are done. * @return whether jobs were enqueued to the ImageBackend. */ @Override public boolean receiveImage(ImageToProcess img, TaskImageContainer task, boolean blockUntilImageRelease, boolean closeOnImageRelease, Optional runnableWhenDone) throws InterruptedException { Set passTasks = new HashSet(1); passTasks.add(task); return receiveImage(img, passTasks, blockUntilImageRelease, closeOnImageRelease, runnableWhenDone); } /** * Returns an informational string about the current status of ImageBackend, * along with an approximate number of references being held. * * @return an informational string suitable to be dumped into logcat */ @Override public String toString() { return "ImageBackend Status BEGIN:\n" + "Shadow Image Map Size = " + mShadowTaskMap.size() + "\n" + "Image Semaphore Map Size = " + mImageSemaphoreMap.size() + "\n" + "OutstandingImageRefs = " + mOutstandingImageRefs + "\n" + "Proxy Listener Map Size = " + mProxyListener.getMapSize() + "\n" + "Proxy Listener = " + mProxyListener.getNumRegisteredListeners() + "\n" + "ImageBackend Status END:\n"; } /** * Implements that top-level image single task submission that is defined by * the ImageConsumer interface. * * @param img Image required by the task * @param tasks A set of Tasks to be run * @param blockUntilImageRelease If true, call blocks until the object img * is no longer referred by any task. If false, call is * non-blocking * @param closeOnImageRelease If true, images is closed when the object img * is is no longer referred by any task. If false, After an image * is submitted, it should never be submitted again to the * interface until all tasks and their spawned tasks are * finished. * @param runnableWhenDone Optional runnable to be executed when the set of * tasks are done. * @return whether receiveImage succeeded. Generally, only happens when the * image reference is null or the task set is empty. * @throws InterruptedException occurs when call is set to be blocking and * is interrupted. */ @Override public boolean receiveImage(ImageToProcess img, Set tasks, boolean blockUntilImageRelease, boolean closeOnImageRelease, Optional runnableWhenDone) throws InterruptedException { // Short circuit if no tasks submitted. if (tasks == null || tasks.size() <= 0) { return false; } if (img == null) { // TODO: Determine whether you need to be so strict at the top level throw new RuntimeException("ERROR: Initial call must reference valid Image!"); } // Make sure that referred images are all the same, if it exists. // And count how image references need to be kept track of. int countImageRefs = numPropagatedImageReferences(img, tasks); // Initialize the counters for process-level tasks initializeTaskDone(tasks, runnableWhenDone); // Set the semaphore, given that the number of tasks that need to be // scheduled // and the boolean flags for imaging closing and thread blocking ImageReleaseProtocol protocol = setSemaphoreReferenceCount(img, countImageRefs, blockUntilImageRelease, closeOnImageRelease); // Put the tasks on their respective queues. scheduleTasks(tasks); // Implement blocking if required if (protocol.blockUntilRelease) { protocol.block(); } return true; } /** * Implements that top-level image task submission short-cut that is defined * by the ImageConsumer interface. * * @param img Image required by the task * @param executor Executor to run events and image closes, in case of * control leakage * @param processingFlags Magical bit vector that specifies jobs to be run * After an image is submitted, it should never be submitted * again to the interface until all tasks and their spawned tasks * are finished. * @param imageProcessorListener Optional listener to automatically register * at the job task and unregister after all tasks are done * @return whether receiveImage succeeded. Generally, only happens when the * image reference is null or the task set is empty. * @throws InterruptedException occurs when call is set to be blocking and * is interrupted. */ @Override public boolean receiveImage(ImageToProcess img, Executor executor, Set processingFlags, CaptureSession session, Optional imageProcessorListener) throws InterruptedException { // Uncomment for occasional debugging // Log.v(TAG, toString()); Set tasksToExecute = new HashSet(); if (img == null) { // No data to process, just pure message. return true; } // Now add the pre-mixed versions of the tasks. if (processingFlags.contains(ImageTaskFlags.COMPRESS_TO_JPEG_AND_WRITE_TO_DISK)) { if (processingFlags.contains(ImageTaskFlags.CREATE_EARLY_FILMSTRIP_PREVIEW)) { // Request job that creates both filmstrip thumbnail from YUV, // JPEG compression of the YUV Image, and writes the result to // disk tasksToExecute.add(new TaskPreviewChainedJpeg(img, executor, this, session, FILMSTRIP_THUMBNAIL_TARGET_SIZE, mByteBufferDirectPool)); } else { // Request job that only does JPEG compression and writes the // result to disk tasksToExecute.add(new TaskCompressImageToJpeg(img, executor, this, session, mByteBufferDirectPool)); } } if (processingFlags.contains(ImageTaskFlags.CONVERT_TO_RGB_PREVIEW)) { // Add an additional type of task to the appropriate queue. tasksToExecute.add(new TaskConvertImageToRGBPreview(img, executor, this, TaskImageContainer.ProcessingPriority.FAST, session, mTinyThumbnailTargetSize, TaskConvertImageToRGBPreview.ThumbnailShape.SQUARE_ASPECT_CIRCULAR_INSET)); } // Wrap the listener in a runnable that will be fired when all tasks are // complete. final Optional runnableOptional; if (imageProcessorListener.isPresent()) { final ImageProcessorListener finalImageProcessorListener = imageProcessorListener.get(); Runnable unregisterRunnable = new Runnable() { @Override public void run() { getProxyListener().unregisterListener(finalImageProcessorListener); } }; runnableOptional = Optional.of(unregisterRunnable); } else { runnableOptional = Optional. absent(); } if (receiveImage(img, tasksToExecute, processingFlags.contains(ImageTaskFlags.BLOCK_UNTIL_ALL_TASKS_RELEASE), processingFlags.contains(ImageTaskFlags.CLOSE_ON_ALL_TASKS_RELEASE), runnableOptional)) { if (imageProcessorListener.isPresent()) { getProxyListener().registerListener(imageProcessorListener.get(), img.proxy); } return true; } else { return false; } } /** * Factory functions, in case, you want some shake and bake functionality. */ public TaskConvertImageToRGBPreview createTaskConvertImageToRGBPreview( ImageToProcess image, Executor executor, ImageBackend imageBackend, CaptureSession session, Size targetSize, TaskConvertImageToRGBPreview.ThumbnailShape thumbnailShape) { return new TaskConvertImageToRGBPreview(image, executor, imageBackend, TaskImageContainer.ProcessingPriority.FAST, session, mTinyThumbnailTargetSize, thumbnailShape); } public TaskCompressImageToJpeg createTaskCompressImageToJpeg(ImageToProcess image, Executor executor, ImageBackend imageBackend, CaptureSession session) { return new TaskCompressImageToJpeg(image, executor, imageBackend, session, mByteBufferDirectPool); } /** * Blocks and waits for all tasks to complete. */ @Override public void shutdown() { mThreadPoolSlow.shutdown(); mThreadPoolFast.shutdown(); } /** * For a given set of starting tasks, initialize the associated sessions * with a proper blocking semaphore and value of number of tasks to be run. * For each semaphore, a ImageShadowTask will be instantiated and enqueued * onto the selected ProcessingSerivceManager. * * @param tasks The set of ImageContainer tasks to be run on ImageBackend */ protected void initializeTaskDone(Set tasks, Optional runnableWhenDone) { Set sessionSet = new HashSet<>(); Map sessionTaskCount = new HashMap<>(); // Create a set w/ no session duplicates and count them for (TaskImageContainer task : tasks) { sessionSet.add(task.mSession); Integer currentCount = sessionTaskCount.get(task.mSession); if (currentCount == null) { sessionTaskCount.put(task.mSession, 1); } else { sessionTaskCount.put(task.mSession, currentCount + 1); } } // Create a new blocking semaphore for each set of tasks on a given // session. synchronized (mShadowTaskMap) { for (CaptureSession captureSession : sessionSet) { BlockSignalProtocol protocol = new BlockSignalProtocol(); protocol.setCount(sessionTaskCount.get(captureSession)); final ImageShadowTask shadowTask; shadowTask = new ImageShadowTask(protocol, captureSession, runnableWhenDone); mShadowTaskMap.put(captureSession, shadowTask); mProcessingTaskConsumer.enqueueTask(shadowTask); } } } /** * For ImageBackend tasks that spawn their own tasks, increase the semaphore * count to take into account the new tasks being spawned. * * @param tasks The set of tasks to be spawned. */ protected void incrementTaskDone(Set tasks) throws RuntimeException { // TODO: Add invariant test so that all sessions are the same. synchronized (mShadowTaskMap) { for (TaskImageContainer task : tasks) { ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession); if (shadowTask == null) { throw new RuntimeException( "Session NOT previously registered." + " ImageShadowTask booking-keeping is incorrect."); } shadowTask.getProtocol().addCount(1); } } } /** * Decrement the semaphore count of the ImageShadowTask. Should be called * when a task completes its processing in ImageBackend. * * @param imageShadowTask The ImageShadow task that contains the blocking * semaphore. * @return whether all the tasks associated with an ImageShadowTask are done */ protected boolean decrementTaskDone(ImageShadowTask imageShadowTask) { synchronized (mShadowTaskMap) { int remainingTasks = imageShadowTask.getProtocol().addCount(-1); if (remainingTasks == 0) { mShadowTaskMap.remove(imageShadowTask.getSession()); imageShadowTask.getProtocol().signal(); return true; } else { return false; } } } /** * Puts the tasks on the specified queue. May be more complicated in the * future. * * @param tasks The set of tasks to be run */ protected void scheduleTasks(Set tasks) { synchronized (mShadowTaskMap) { for (TaskImageContainer task : tasks) { ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession); if (shadowTask == null) { throw new IllegalStateException("Scheduling a task with a unknown session."); } // Before scheduling, wrap TaskImageContainer inside of the // TaskDoneWrapper to add // instrumentation for managing ImageShadowTasks switch (task.getProcessingPriority()) { case FAST: mThreadPoolFast.execute(new TaskDoneWrapper(this, shadowTask, task)); break; case AVERAGE: mThreadPoolAverage.execute(new TaskDoneWrapper(this, shadowTask, task)); break; case SLOW: mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task)); break; default: mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task)); break; } } } } /** * Initializes the semaphore count for the image * * @return The protocol object that keeps tracks of the image reference * count and actions to be taken on release. */ protected ImageReleaseProtocol setSemaphoreReferenceCount(ImageToProcess img, int count, boolean blockUntilRelease, boolean closeOnRelease) throws RuntimeException { synchronized (mImageSemaphoreMap) { if (mImageSemaphoreMap.get(img) != null) { throw new RuntimeException( "ERROR: Rewriting of Semaphore Lock." + " Image references may not freed properly"); } // Create the new booking-keeping object. ImageReleaseProtocol protocol = new ImageReleaseProtocol(blockUntilRelease, closeOnRelease); protocol.setCount(count); mImageSemaphoreMap.put(img, protocol); mOutstandingImageRefs += count; mOutstandingImageOpened++; logWrapper("Received an opened image: " + mOutstandingImageOpened + "/" + mOutstandingImageClosed); logWrapper("Setting an image reference count of " + count + " Total refs = " + mOutstandingImageRefs); return protocol; } } /** * Increments the semaphore count for the image. Should ONLY be internally * via appendTasks by internal tasks. Otherwise, image references could get * out of whack. * * @param img The Image associated with the set of tasks running on it. * @param count The number of tasks to be added * @throws RuntimeException Indicates image Closing Bookkeeping is screwed * up. */ protected void incrementSemaphoreReferenceCount(ImageToProcess img, int count) throws RuntimeException { synchronized (mImageSemaphoreMap) { ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img); if (mImageSemaphoreMap.get(img) == null) { throw new RuntimeException( "Image Reference has already been released or has never been held."); } protocol.addCount(count); mImageSemaphoreMap.put(img, protocol); mOutstandingImageRefs += count; } } /** * Close an Image with a executor if it's available and does the proper * booking keeping on the object. * * @param img Image to be closed * @param executor Executor to be used, if executor is null, the close is * run on the task thread */ private void closeImageExecutorSafe(final ImageToProcess img, Executor executor) { Runnable closeTask = new Runnable() { @Override public void run() { img.proxy.close(); mOutstandingImageClosed++; logWrapper("Release of image occurred. Good fun. " + "Total Images Open/Closed = " + mOutstandingImageOpened + "/" + mOutstandingImageClosed); } }; if (executor == null) { // Just run it on the main thread. closeTask.run(); } else { executor.execute(closeTask); } } /** * Calculates the number of new Image references in a set of dependent * tasks. Checks to make sure no new image references are being introduced. * * @param tasks The set of dependent tasks to be run */ private int numPropagatedImageReferences(ImageToProcess img, Set tasks) throws RuntimeException { int countImageRefs = 0; for (TaskImageContainer task : tasks) { if (task.mImage != null && task.mImage != img) { throw new RuntimeException("ERROR: Spawned tasks cannot reference new images!"); } if (task.mImage != null) { countImageRefs++; } } return countImageRefs; } /** * Simple wrapper task to instrument when tasks ends so that ImageBackend * can fire events when set of tasks created by a ReceiveImage call have all * completed. */ private class TaskDoneWrapper implements Runnable { private final ImageBackend mImageBackend; private final ImageShadowTask mImageShadowTask; private final TaskImageContainer mWrappedTask; /** * Constructor * * @param imageBackend ImageBackend that the task is running on * @param imageShadowTask ImageShadowTask that is blocking on the * completion of the task * @param wrappedTask The task to be run w/o instrumentation */ public TaskDoneWrapper(ImageBackend imageBackend, ImageShadowTask imageShadowTask, TaskImageContainer wrappedTask) { mImageBackend = imageBackend; mImageShadowTask = imageShadowTask; mWrappedTask = wrappedTask; } /** * Adds instrumentation that runs when a TaskImageContainer completes. */ @Override public void run() { mWrappedTask.run(); // Decrement count if (mImageBackend.decrementTaskDone(mImageShadowTask)) { // If you're the last one... Runnable doneRunnable = mImageShadowTask.getRunnableWhenDone(); if (doneRunnable != null) { if (mWrappedTask.mExecutor == null) { doneRunnable.run(); } else { mWrappedTask.mExecutor.execute(doneRunnable); } } } } } /** * Encapsulates all synchronization for semaphore signaling and blocking. */ static public class BlockSignalProtocol { private int count; private final ReentrantLock mLock = new ReentrantLock(); private Condition mSignal; public void setCount(int value) { mLock.lock(); count = value; mLock.unlock(); } public int getCount() { int value; mLock.lock(); value = count; mLock.unlock(); return value; } public int addCount(int value) { mLock.lock(); try { count += value; return count; } finally { mLock.unlock(); } } BlockSignalProtocol() { count = 0; mSignal = mLock.newCondition(); } public void block() throws InterruptedException { mLock.lock(); try { while (count != 0) { // Spin to deal with spurious signals. mSignal.await(); } } catch (InterruptedException e) { // TODO: on interruption, figure out what to do. throw (e); } finally { mLock.unlock(); } } public void signal() { mLock.lock(); mSignal.signal(); mLock.unlock(); } } /** * A simple tuple class to keep track of image reference, and whether to * block and/or close on final image release. Instantiated on every task * submission call. */ static public class ImageReleaseProtocol extends BlockSignalProtocol { public final boolean blockUntilRelease; public final boolean closeOnRelease; ImageReleaseProtocol(boolean block, boolean close) { super(); blockUntilRelease = block; closeOnRelease = close; } } // Thread factories for a default constructor private class FastThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new AndroidPriorityThread(FAST_THREAD_PRIORITY, r); return t; } } private class AverageThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new AndroidPriorityThread(AVERAGE_THREAD_PRIORITY, r); return t; } } private class SlowThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new AndroidPriorityThread(SLOW_THREAD_PRIORITY, r); return t; } } }