1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.camera.processing.imagebackend;
18 
19 import android.os.Process;
20 
21 import com.android.camera.async.AndroidPriorityThread;
22 import com.android.camera.debug.Log;
23 import com.android.camera.processing.ProcessingTaskConsumer;
24 import com.android.camera.processing.memory.ByteBufferDirectPool;
25 import com.android.camera.processing.memory.LruResourcePool;
26 import com.android.camera.session.CaptureSession;
27 import com.android.camera.util.Size;
28 import com.google.common.base.Optional;
29 
30 import java.nio.ByteBuffer;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.Executor;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.locks.ReentrantLock;
41 
42 /**
43  * This ImageBackend is created for the purpose of creating a task-running
44  * infrastructure that has two-level of priority and doing the book-keeping to
45  * keep track of tasks that use Android Images. Android.media.images are
46  * critical system resources that MUST be properly managed in order to maintain
47  * camera application performance. Android.media.images are merely Java handles
48  * to regions of physically contiguous memory used by the camera hardware as a
49  * destination for imaging data. In general, this physically contiguous memory
50  * is not counted as an application resource, but as a system resources held by
51  * the application and does NOT count against the limits of application memory.
52  * The performance pressures of both computing and memory resources must often
53  * be prioritized in releasing Android.media.images in a timely manner. In order
54  * to properly balance these concerns, most image processing requested should be
55  * routed through this object. This object is also responsible for releasing
56  * Android.media image as soon as possible, so as not to stall the camera
57  * hardware subsystem. Image that reserve these images are a subclass of the
58  * basic Java Runnable with a few conditions placed upon their run()
59  * implementation:
60  * <ol>
61  * <li>The task will try to release the image as early as possible by calling
62  * the releaseSemaphoreReference as soon as a reference to the original image is
63  * no longer required.</li>
64  * <li>A set of tasks that require ImageData must only happen on the first
65  * receiveImage call. receiveImage must only be called once per image.</li>
66  * <li>However, the submitted tasks may spawn new tasks via the appendTask with
67  * any image that have had a task submitted, but NOT released via
68  * releaseSemaphoreReference.</li>
69  * <li>Computation that is dependent on multiple images should be written into
70  * this task framework in a distributed manner where image task can be computed
71  * independently and join their results to a common shared object.This style of
72  * implementation allows for the earliest release of Android Images while
73  * honoring the resources priorities set by this class. See the Lucky shot
74  * implementation for a concrete example for this shared object and its
75  * respective task {@link TaskLuckyShotSession} {@link LuckyShotSession}</li>
76  * </ol>
77  * To integrate with the ProcessingServiceManager, ImageBackend also signals to
78  * the ProcessingServiceManager its processing state by enqueuing
79  * ImageShadowTasks on each ImageBackend::receiveImage call. These ImageShadow
80  * tasks have no implementation, but emulate the processing delay by blocking
81  * until all tasks submitted and spawned by a particular receiveImage call have
82  * completed their processing. This emulated functionality ensures that other
83  * ProcessingTasks associated with Lens Blur and Panorama are not processing
84  * while the ImageBackend is running. Unfairly, the ImageBackend proceeds with
85  * its own processing regardless of the state of ImageShadowTask.
86  * ImageShadowTasks that are associated with ImageBackend tasks that have
87  * already been completed should return immediately on its process call.
88  */
89 public class ImageBackend implements ImageConsumer, ImageTaskManager {
90     private static final Log.Tag TAG = new Log.Tag("ImageBackend");
91 
92     protected static final int NUM_THREADS_FAST = 2;
93     protected static final int NUM_THREADS_AVERAGE = 2;
94     protected static final int NUM_THREADS_SLOW = 2;
95 
96     private static final int FAST_THREAD_PRIORITY = Process.THREAD_PRIORITY_DISPLAY;
97     private static final int AVERAGE_THREAD_PRIORITY = Process.THREAD_PRIORITY_DEFAULT
98             + Process.THREAD_PRIORITY_LESS_FAVORABLE;
99     private static final int SLOW_THREAD_PRIORITY = Process.THREAD_PRIORITY_BACKGROUND
100             + Process.THREAD_PRIORITY_MORE_FAVORABLE;
101 
102     private static final int IMAGE_BACKEND_HARD_REF_POOL_SIZE = 2;
103 
104     protected final ProcessingTaskConsumer mProcessingTaskConsumer;
105 
106     /**
107      * Map for TaskImageContainer and the release of ImageProxy Book-keeping
108      */
109     protected final Map<ImageToProcess, ImageReleaseProtocol> mImageSemaphoreMap;
110     /**
111      * Map for ImageShadowTask and release of blocking on
112      * ImageShadowTask::process
113      */
114     protected final Map<CaptureSession, ImageShadowTask> mShadowTaskMap;
115 
116     // The available threadpools for scheduling
117     protected final ExecutorService mThreadPoolFast;
118     protected final ExecutorService mThreadPoolAverage;
119     protected final ExecutorService mThreadPoolSlow;
120 
121     private final LruResourcePool<Integer, ByteBuffer> mByteBufferDirectPool;
122 
123     /**
124      * Approximate viewable size (in pixels) for the fast thumbnail in the
125      * current UX definition of the product. Note that these values will be the
126      * minimum size of FAST_THUMBNAIL target for the CONVERT_TO_RGB_PREVIEW
127      * task.
128      */
129     private final Size mTinyThumbnailTargetSize;
130 
131     /**
132      * A standard viewable size (in pixels) for the filmstrip thumbnail in the
133      * current UX definition of the product. Note that this size is the minimum
134      * size for the Preview on the filmstrip associated with
135      * COMPRESS_TO_JPEG_AND_WRITE_TO_DISK task.
136      */
137     private final static Size FILMSTRIP_THUMBNAIL_TARGET_SIZE = new Size(512, 384);
138 
139     // Some invariants to know that we're keeping track of everything
140     // that reflect the state of mImageSemaphoreMap
141     private int mOutstandingImageRefs = 0;
142 
143     private int mOutstandingImageOpened = 0;
144 
145     private int mOutstandingImageClosed = 0;
146 
147     // Objects that may be registered to this objects events.
148     private ImageProcessorProxyListener mProxyListener = null;
149 
150     // Default constructor, values are conservatively targeted to the Nexus 6
ImageBackend(ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize)151     public ImageBackend(ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize) {
152         mThreadPoolFast = Executors.newFixedThreadPool(NUM_THREADS_FAST, new FastThreadFactory());
153         mThreadPoolAverage = Executors.newFixedThreadPool(NUM_THREADS_AVERAGE,
154                 new AverageThreadFactory());
155         mThreadPoolSlow = Executors.newFixedThreadPool(NUM_THREADS_SLOW, new SlowThreadFactory());
156         mByteBufferDirectPool = new ByteBufferDirectPool(IMAGE_BACKEND_HARD_REF_POOL_SIZE);
157         mProxyListener = new ImageProcessorProxyListener();
158         mImageSemaphoreMap = new HashMap<>();
159         mShadowTaskMap = new HashMap<>();
160         mProcessingTaskConsumer = processingTaskConsumer;
161         mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize);
162     }
163 
164     /**
165      * Direct Injection Constructor for Testing purposes.
166      *
167      * @param fastService Service where Tasks of FAST Priority are placed.
168      * @param averageService Service where Tasks of AVERAGE Priority are placed.
169      * @param slowService Service where Tasks of SLOW Priority are placed.
170      * @param imageProcessorProxyListener iamge proxy listener to be used
171      */
ImageBackend(ExecutorService fastService, ExecutorService averageService, ExecutorService slowService, LruResourcePool<Integer, ByteBuffer> byteBufferDirectPool, ImageProcessorProxyListener imageProcessorProxyListener, ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize)172     public ImageBackend(ExecutorService fastService,
173             ExecutorService averageService,
174             ExecutorService slowService,
175             LruResourcePool<Integer, ByteBuffer> byteBufferDirectPool,
176             ImageProcessorProxyListener imageProcessorProxyListener,
177             ProcessingTaskConsumer processingTaskConsumer,
178             int tinyThumbnailSize) {
179         mThreadPoolFast = fastService;
180         mThreadPoolAverage = averageService;
181         mThreadPoolSlow = slowService;
182         mByteBufferDirectPool = byteBufferDirectPool;
183         mProxyListener = imageProcessorProxyListener;
184         mImageSemaphoreMap = new HashMap<>();
185         mShadowTaskMap = new HashMap<>();
186         mProcessingTaskConsumer = processingTaskConsumer;
187         mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize);
188     }
189 
190     /**
191      * Simple getter for the associated listener object associated with this
192      * instantiation that handles registration of events listeners.
193      *
194      * @return listener proxy that handles events messaging for this object.
195      */
getProxyListener()196     public ImageProcessorProxyListener getProxyListener() {
197         return mProxyListener;
198     }
199 
200     /**
201      * Wrapper function for all log messages created by this object. Default
202      * implementation is to send messages to the Android logger. For test
203      * purposes, this method can be overridden to avoid "Stub!" Runtime
204      * exceptions in Unit Tests.
205      */
logWrapper(String message)206     public void logWrapper(String message) {
207         Log.v(TAG, message);
208     }
209 
210     /**
211      * @return Number of Image references currently held by this instance
212      */
213     @Override
getNumberOfReservedOpenImages()214     public int getNumberOfReservedOpenImages() {
215         synchronized (mImageSemaphoreMap) {
216             // since mOutstandingImageOpened, mOutstandingImageClosed reflect
217             // the historical state of mImageSemaphoreMap, we need to lock on
218             // before we return a value.
219             return mOutstandingImageOpened - mOutstandingImageClosed;
220         }
221     }
222 
223     /**
224      * Returns of the number of receiveImage calls that are currently enqueued
225      * and/or being processed.
226      *
227      * @return The number of receiveImage calls that are currently enqueued
228      *         and/or being processed
229      */
230     @Override
getNumberOfOutstandingCalls()231     public int getNumberOfOutstandingCalls() {
232         synchronized (mShadowTaskMap) {
233             return mShadowTaskMap.size();
234         }
235     }
236 
237     /**
238      * Signals the ImageBackend that a tasks has released a reference to the
239      * image. Imagebackend determines whether all references have been released
240      * and applies its specified release protocol of closing image and/or
241      * unblocking the caller. Should ONLY be called by the tasks running on this
242      * class.
243      *
244      * @param img the image to be released by the task.
245      * @param executor the executor on which the image close is run. if null,
246      *            image close is run by the calling thread (usually the main
247      *            task thread).
248      */
249     @Override
releaseSemaphoreReference(final ImageToProcess img, Executor executor)250     public void releaseSemaphoreReference(final ImageToProcess img, Executor executor) {
251         synchronized (mImageSemaphoreMap) {
252             ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img);
253             if (protocol == null || protocol.getCount() <= 0) {
254                 // That means task implementation has allowed an unbalanced
255                 // semaphore release.
256                 throw new RuntimeException(
257                         "ERROR: Task implementation did NOT balance its release.");
258             }
259 
260             // Normal operation from here.
261             protocol.addCount(-1);
262             mOutstandingImageRefs--;
263             logWrapper("Ref release.  Total refs = " + mOutstandingImageRefs);
264             if (protocol.getCount() == 0) {
265                 // Image is ready to be released
266                 // Remove the image from the map so that it may be submitted
267                 // again.
268                 mImageSemaphoreMap.remove(img);
269 
270                 // Conditionally close the image, specified by initial
271                 // receiveImage call
272                 if (protocol.closeOnRelease) {
273                     closeImageExecutorSafe(img, executor);
274                     logWrapper("Ref release close.");
275                 }
276 
277                 // Conditionally signal the blocking thread to go.
278                 if (protocol.blockUntilRelease) {
279                     protocol.signal();
280                 }
281             } else {
282                 // Image is still being held by other tasks.
283                 // Otherwise, update the semaphore
284                 mImageSemaphoreMap.put(img, protocol);
285             }
286         }
287     }
288 
289     /**
290      * Spawns dependent tasks from internal implementation of a set of tasks. If
291      * a dependent task does NOT require the image reference, it should be
292      * passed a null pointer as an image reference. In general, this method
293      * should be called after the task has completed its own computations, but
294      * before it has released its own image reference (via the
295      * releaseSemaphoreReference call).
296      *
297      * @param tasks The set of tasks to be run
298      * @return whether tasks are successfully submitted.
299      */
300     @Override
appendTasks(ImageToProcess img, Set<TaskImageContainer> tasks)301     public boolean appendTasks(ImageToProcess img, Set<TaskImageContainer> tasks) {
302         // Make sure that referred images are all the same, if it exists.
303         // And count how image references need to be kept track of.
304         int countImageRefs = numPropagatedImageReferences(img, tasks);
305 
306         if (img != null) {
307             // If you're still holding onto the reference, make sure you keep
308             // count
309             incrementSemaphoreReferenceCount(img, countImageRefs);
310         }
311 
312         // Update the done count on the new tasks.
313         incrementTaskDone(tasks);
314 
315         scheduleTasks(tasks);
316         return true;
317     }
318 
319     /**
320      * Spawns a single dependent task from internal implementation of a task.
321      *
322      * @param task The task to be run
323      * @return whether tasks are successfully submitted.
324      */
325     @Override
appendTasks(ImageToProcess img, TaskImageContainer task)326     public boolean appendTasks(ImageToProcess img, TaskImageContainer task) {
327         Set<TaskImageContainer> tasks = new HashSet<TaskImageContainer>(1);
328         tasks.add(task);
329         return appendTasks(img, tasks);
330     }
331 
332     /**
333      * Implements that top-level image single task submission that is defined by
334      * the ImageConsumer interface w/o Runnable to executed.
335      *
336      * @param img Image required by the task
337      * @param task Task to be run
338      * @param blockUntilImageRelease If true, call blocks until the object img
339      *            is no longer referred by any task. If false, call is
340      *            non-blocking
341      * @param closeOnImageRelease If true, images is closed when the object img
342      *            is is no longer referred by any task. If false, After an image
343      *            is submitted, it should never be submitted again to the
344      *            interface until all tasks and their spawned tasks are
345      *            finished.
346      * @return whether jobs were enqueued to the ImageBackend.
347      */
348     @Override
receiveImage(ImageToProcess img, TaskImageContainer task, boolean blockUntilImageRelease, boolean closeOnImageRelease)349     public boolean receiveImage(ImageToProcess img, TaskImageContainer task,
350             boolean blockUntilImageRelease, boolean closeOnImageRelease)
351             throws InterruptedException {
352         return receiveImage(img, task, blockUntilImageRelease, closeOnImageRelease,
353                 Optional.<Runnable> absent());
354     }
355 
356     /**
357      * Implements that top-level image single task submission that is defined by
358      * the ImageConsumer interface.
359      *
360      * @param img Image required by the task
361      * @param task Task to be run
362      * @param blockUntilImageRelease If true, call blocks until the object img
363      *            is no longer referred by any task. If false, call is
364      *            non-blocking
365      * @param closeOnImageRelease If true, images is closed when the object img
366      *            is is no longer referred by any task. If false, After an image
367      *            is submitted, it should never be submitted again to the
368      *            interface until all tasks and their spawned tasks are
369      *            finished.
370      * @param runnableWhenDone Optional runnable to be executed when the set of
371      *            tasks are done.
372      * @return whether jobs were enqueued to the ImageBackend.
373      */
374     @Override
receiveImage(ImageToProcess img, TaskImageContainer task, boolean blockUntilImageRelease, boolean closeOnImageRelease, Optional<Runnable> runnableWhenDone)375     public boolean receiveImage(ImageToProcess img, TaskImageContainer task,
376             boolean blockUntilImageRelease, boolean closeOnImageRelease,
377             Optional<Runnable> runnableWhenDone)
378             throws InterruptedException {
379         Set<TaskImageContainer> passTasks = new HashSet<TaskImageContainer>(1);
380         passTasks.add(task);
381         return receiveImage(img, passTasks, blockUntilImageRelease, closeOnImageRelease,
382                 runnableWhenDone);
383     }
384 
385     /**
386      * Returns an informational string about the current status of ImageBackend,
387      * along with an approximate number of references being held.
388      *
389      * @return an informational string suitable to be dumped into logcat
390      */
391     @Override
toString()392     public String toString() {
393         return "ImageBackend Status BEGIN:\n" +
394                 "Shadow Image Map Size = " + mShadowTaskMap.size() + "\n" +
395                 "Image Semaphore Map Size = " + mImageSemaphoreMap.size() + "\n" +
396                 "OutstandingImageRefs = " + mOutstandingImageRefs + "\n" +
397                 "Proxy Listener Map Size = " + mProxyListener.getMapSize() + "\n" +
398                 "Proxy Listener = " + mProxyListener.getNumRegisteredListeners() + "\n" +
399                 "ImageBackend Status END:\n";
400     }
401 
402     /**
403      * Implements that top-level image single task submission that is defined by
404      * the ImageConsumer interface.
405      *
406      * @param img Image required by the task
407      * @param tasks A set of Tasks to be run
408      * @param blockUntilImageRelease If true, call blocks until the object img
409      *            is no longer referred by any task. If false, call is
410      *            non-blocking
411      * @param closeOnImageRelease If true, images is closed when the object img
412      *            is is no longer referred by any task. If false, After an image
413      *            is submitted, it should never be submitted again to the
414      *            interface until all tasks and their spawned tasks are
415      *            finished.
416      * @param runnableWhenDone Optional runnable to be executed when the set of
417      *            tasks are done.
418      * @return whether receiveImage succeeded. Generally, only happens when the
419      *         image reference is null or the task set is empty.
420      * @throws InterruptedException occurs when call is set to be blocking and
421      *             is interrupted.
422      */
423     @Override
receiveImage(ImageToProcess img, Set<TaskImageContainer> tasks, boolean blockUntilImageRelease, boolean closeOnImageRelease, Optional<Runnable> runnableWhenDone)424     public boolean receiveImage(ImageToProcess img, Set<TaskImageContainer> tasks,
425             boolean blockUntilImageRelease, boolean closeOnImageRelease,
426             Optional<Runnable> runnableWhenDone)
427             throws InterruptedException {
428 
429         // Short circuit if no tasks submitted.
430         if (tasks == null || tasks.size() <= 0) {
431             return false;
432         }
433 
434         if (img == null) {
435             // TODO: Determine whether you need to be so strict at the top level
436             throw new RuntimeException("ERROR: Initial call must reference valid Image!");
437         }
438 
439         // Make sure that referred images are all the same, if it exists.
440         // And count how image references need to be kept track of.
441         int countImageRefs = numPropagatedImageReferences(img, tasks);
442 
443         // Initialize the counters for process-level tasks
444         initializeTaskDone(tasks, runnableWhenDone);
445 
446         // Set the semaphore, given that the number of tasks that need to be
447         // scheduled
448         // and the boolean flags for imaging closing and thread blocking
449         ImageReleaseProtocol protocol = setSemaphoreReferenceCount(img, countImageRefs,
450                 blockUntilImageRelease, closeOnImageRelease);
451 
452         // Put the tasks on their respective queues.
453         scheduleTasks(tasks);
454 
455         // Implement blocking if required
456         if (protocol.blockUntilRelease) {
457             protocol.block();
458         }
459 
460         return true;
461     }
462 
463     /**
464      * Implements that top-level image task submission short-cut that is defined
465      * by the ImageConsumer interface.
466      *
467      * @param img Image required by the task
468      * @param executor Executor to run events and image closes, in case of
469      *            control leakage
470      * @param processingFlags Magical bit vector that specifies jobs to be run
471      *            After an image is submitted, it should never be submitted
472      *            again to the interface until all tasks and their spawned tasks
473      *            are finished.
474      * @param imageProcessorListener Optional listener to automatically register
475      *            at the job task and unregister after all tasks are done
476      * @return whether receiveImage succeeded. Generally, only happens when the
477      *         image reference is null or the task set is empty.
478      * @throws InterruptedException occurs when call is set to be blocking and
479      *             is interrupted.
480      */
481     @Override
receiveImage(ImageToProcess img, Executor executor, Set<ImageTaskFlags> processingFlags, CaptureSession session, Optional<ImageProcessorListener> imageProcessorListener)482     public boolean receiveImage(ImageToProcess img, Executor executor,
483             Set<ImageTaskFlags> processingFlags, CaptureSession session,
484             Optional<ImageProcessorListener> imageProcessorListener)
485             throws InterruptedException {
486 
487         // Uncomment for occasional debugging
488         // Log.v(TAG, toString());
489 
490         Set<TaskImageContainer> tasksToExecute = new HashSet<TaskImageContainer>();
491 
492         if (img == null) {
493             // No data to process, just pure message.
494             return true;
495         }
496 
497         // Now add the pre-mixed versions of the tasks.
498 
499         if (processingFlags.contains(ImageTaskFlags.COMPRESS_TO_JPEG_AND_WRITE_TO_DISK)) {
500             if (processingFlags.contains(ImageTaskFlags.CREATE_EARLY_FILMSTRIP_PREVIEW)) {
501                 // Request job that creates both filmstrip thumbnail from YUV,
502                 // JPEG compression of the YUV Image, and writes the result to
503                 // disk
504                 tasksToExecute.add(new TaskPreviewChainedJpeg(img, executor, this, session,
505                         FILMSTRIP_THUMBNAIL_TARGET_SIZE, mByteBufferDirectPool));
506             } else {
507                 // Request job that only does JPEG compression and writes the
508                 // result to disk
509                 tasksToExecute.add(new TaskCompressImageToJpeg(img, executor, this, session,
510                       mByteBufferDirectPool));
511             }
512         }
513 
514         if (processingFlags.contains(ImageTaskFlags.CONVERT_TO_RGB_PREVIEW)) {
515             // Add an additional type of task to the appropriate queue.
516             tasksToExecute.add(new TaskConvertImageToRGBPreview(img, executor,
517                     this, TaskImageContainer.ProcessingPriority.FAST, session,
518                     mTinyThumbnailTargetSize,
519                     TaskConvertImageToRGBPreview.ThumbnailShape.SQUARE_ASPECT_CIRCULAR_INSET));
520         }
521 
522         // Wrap the listener in a runnable that will be fired when all tasks are
523         // complete.
524         final Optional<Runnable> runnableOptional;
525         if (imageProcessorListener.isPresent()) {
526             final ImageProcessorListener finalImageProcessorListener = imageProcessorListener.get();
527             Runnable unregisterRunnable = new Runnable() {
528                 @Override
529                 public void run() {
530                     getProxyListener().unregisterListener(finalImageProcessorListener);
531                 }
532             };
533             runnableOptional = Optional.of(unregisterRunnable);
534         } else {
535             runnableOptional = Optional.<Runnable> absent();
536         }
537 
538         if (receiveImage(img, tasksToExecute,
539                 processingFlags.contains(ImageTaskFlags.BLOCK_UNTIL_ALL_TASKS_RELEASE),
540                 processingFlags.contains(ImageTaskFlags.CLOSE_ON_ALL_TASKS_RELEASE),
541                 runnableOptional)) {
542             if (imageProcessorListener.isPresent()) {
543                 getProxyListener().registerListener(imageProcessorListener.get(), img.proxy);
544             }
545             return true;
546         } else {
547             return false;
548         }
549     }
550 
551     /**
552      * Factory functions, in case, you want some shake and bake functionality.
553      */
createTaskConvertImageToRGBPreview( ImageToProcess image, Executor executor, ImageBackend imageBackend, CaptureSession session, Size targetSize, TaskConvertImageToRGBPreview.ThumbnailShape thumbnailShape)554     public TaskConvertImageToRGBPreview createTaskConvertImageToRGBPreview(
555             ImageToProcess image, Executor executor, ImageBackend imageBackend,
556             CaptureSession session, Size targetSize,
557             TaskConvertImageToRGBPreview.ThumbnailShape thumbnailShape) {
558         return new TaskConvertImageToRGBPreview(image, executor, imageBackend,
559                 TaskImageContainer.ProcessingPriority.FAST, session,
560                 mTinyThumbnailTargetSize, thumbnailShape);
561     }
562 
createTaskCompressImageToJpeg(ImageToProcess image, Executor executor, ImageBackend imageBackend, CaptureSession session)563     public TaskCompressImageToJpeg createTaskCompressImageToJpeg(ImageToProcess image,
564             Executor executor, ImageBackend imageBackend, CaptureSession session) {
565         return new TaskCompressImageToJpeg(image, executor, imageBackend, session,
566               mByteBufferDirectPool);
567     }
568 
569     /**
570      * Blocks and waits for all tasks to complete.
571      */
572     @Override
shutdown()573     public void shutdown() {
574         mThreadPoolSlow.shutdown();
575         mThreadPoolFast.shutdown();
576     }
577 
578     /**
579      * For a given set of starting tasks, initialize the associated sessions
580      * with a proper blocking semaphore and value of number of tasks to be run.
581      * For each semaphore, a ImageShadowTask will be instantiated and enqueued
582      * onto the selected ProcessingSerivceManager.
583      *
584      * @param tasks The set of ImageContainer tasks to be run on ImageBackend
585      */
initializeTaskDone(Set<TaskImageContainer> tasks, Optional<Runnable> runnableWhenDone)586     protected void initializeTaskDone(Set<TaskImageContainer> tasks,
587             Optional<Runnable> runnableWhenDone) {
588         Set<CaptureSession> sessionSet = new HashSet<>();
589         Map<CaptureSession, Integer> sessionTaskCount = new HashMap<>();
590 
591         // Create a set w/ no session duplicates and count them
592         for (TaskImageContainer task : tasks) {
593             sessionSet.add(task.mSession);
594             Integer currentCount = sessionTaskCount.get(task.mSession);
595             if (currentCount == null) {
596                 sessionTaskCount.put(task.mSession, 1);
597             } else {
598                 sessionTaskCount.put(task.mSession, currentCount + 1);
599             }
600         }
601 
602         // Create a new blocking semaphore for each set of tasks on a given
603         // session.
604         synchronized (mShadowTaskMap) {
605             for (CaptureSession captureSession : sessionSet) {
606                 BlockSignalProtocol protocol = new BlockSignalProtocol();
607                 protocol.setCount(sessionTaskCount.get(captureSession));
608                 final ImageShadowTask shadowTask;
609                 shadowTask = new ImageShadowTask(protocol, captureSession,
610                             runnableWhenDone);
611                 mShadowTaskMap.put(captureSession, shadowTask);
612                 mProcessingTaskConsumer.enqueueTask(shadowTask);
613             }
614         }
615     }
616 
617     /**
618      * For ImageBackend tasks that spawn their own tasks, increase the semaphore
619      * count to take into account the new tasks being spawned.
620      *
621      * @param tasks The set of tasks to be spawned.
622      */
incrementTaskDone(Set<TaskImageContainer> tasks)623     protected void incrementTaskDone(Set<TaskImageContainer> tasks) throws RuntimeException {
624         // TODO: Add invariant test so that all sessions are the same.
625         synchronized (mShadowTaskMap) {
626             for (TaskImageContainer task : tasks) {
627                 ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession);
628                 if (shadowTask == null) {
629                     throw new RuntimeException(
630                             "Session NOT previously registered."
631                                     + " ImageShadowTask booking-keeping is incorrect.");
632                 }
633                 shadowTask.getProtocol().addCount(1);
634             }
635         }
636     }
637 
638     /**
639      * Decrement the semaphore count of the ImageShadowTask. Should be called
640      * when a task completes its processing in ImageBackend.
641      *
642      * @param imageShadowTask The ImageShadow task that contains the blocking
643      *            semaphore.
644      * @return whether all the tasks associated with an ImageShadowTask are done
645      */
decrementTaskDone(ImageShadowTask imageShadowTask)646     protected boolean decrementTaskDone(ImageShadowTask imageShadowTask) {
647         synchronized (mShadowTaskMap) {
648             int remainingTasks = imageShadowTask.getProtocol().addCount(-1);
649             if (remainingTasks == 0) {
650                 mShadowTaskMap.remove(imageShadowTask.getSession());
651                 imageShadowTask.getProtocol().signal();
652                 return true;
653             } else {
654                 return false;
655             }
656         }
657 
658     }
659 
660     /**
661      * Puts the tasks on the specified queue. May be more complicated in the
662      * future.
663      *
664      * @param tasks The set of tasks to be run
665      */
scheduleTasks(Set<TaskImageContainer> tasks)666     protected void scheduleTasks(Set<TaskImageContainer> tasks) {
667         synchronized (mShadowTaskMap) {
668             for (TaskImageContainer task : tasks) {
669                 ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession);
670                 if (shadowTask == null) {
671                     throw new IllegalStateException("Scheduling a task with a unknown session.");
672                 }
673                 // Before scheduling, wrap TaskImageContainer inside of the
674                 // TaskDoneWrapper to add
675                 // instrumentation for managing ImageShadowTasks
676                 switch (task.getProcessingPriority()) {
677                     case FAST:
678                         mThreadPoolFast.execute(new TaskDoneWrapper(this, shadowTask, task));
679                         break;
680                     case AVERAGE:
681                         mThreadPoolAverage.execute(new TaskDoneWrapper(this, shadowTask, task));
682                         break;
683                     case SLOW:
684                         mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task));
685                         break;
686                     default:
687                         mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task));
688                         break;
689                 }
690             }
691         }
692     }
693 
694     /**
695      * Initializes the semaphore count for the image
696      *
697      * @return The protocol object that keeps tracks of the image reference
698      *         count and actions to be taken on release.
699      */
setSemaphoreReferenceCount(ImageToProcess img, int count, boolean blockUntilRelease, boolean closeOnRelease)700     protected ImageReleaseProtocol setSemaphoreReferenceCount(ImageToProcess img, int count,
701             boolean blockUntilRelease, boolean closeOnRelease) throws RuntimeException {
702         synchronized (mImageSemaphoreMap) {
703             if (mImageSemaphoreMap.get(img) != null) {
704                 throw new RuntimeException(
705                         "ERROR: Rewriting of Semaphore Lock."
706                                 + "  Image references may not freed properly");
707             }
708 
709             // Create the new booking-keeping object.
710             ImageReleaseProtocol protocol = new ImageReleaseProtocol(blockUntilRelease,
711                     closeOnRelease);
712             protocol.setCount(count);
713 
714             mImageSemaphoreMap.put(img, protocol);
715             mOutstandingImageRefs += count;
716             mOutstandingImageOpened++;
717             logWrapper("Received an opened image: " + mOutstandingImageOpened + "/"
718                     + mOutstandingImageClosed);
719             logWrapper("Setting an image reference count of " + count + "   Total refs = "
720                     + mOutstandingImageRefs);
721             return protocol;
722         }
723     }
724 
725     /**
726      * Increments the semaphore count for the image. Should ONLY be internally
727      * via appendTasks by internal tasks. Otherwise, image references could get
728      * out of whack.
729      *
730      * @param img The Image associated with the set of tasks running on it.
731      * @param count The number of tasks to be added
732      * @throws RuntimeException Indicates image Closing Bookkeeping is screwed
733      *             up.
734      */
incrementSemaphoreReferenceCount(ImageToProcess img, int count)735     protected void incrementSemaphoreReferenceCount(ImageToProcess img, int count)
736             throws RuntimeException {
737         synchronized (mImageSemaphoreMap) {
738             ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img);
739             if (mImageSemaphoreMap.get(img) == null) {
740                 throw new RuntimeException(
741                         "Image Reference has already been released or has never been held.");
742             }
743 
744             protocol.addCount(count);
745             mImageSemaphoreMap.put(img, protocol);
746 
747             mOutstandingImageRefs += count;
748         }
749     }
750 
751     /**
752      * Close an Image with a executor if it's available and does the proper
753      * booking keeping on the object.
754      *
755      * @param img Image to be closed
756      * @param executor Executor to be used, if executor is null, the close is
757      *            run on the task thread
758      */
closeImageExecutorSafe(final ImageToProcess img, Executor executor)759     private void closeImageExecutorSafe(final ImageToProcess img, Executor executor) {
760         Runnable closeTask = new Runnable() {
761             @Override
762             public void run() {
763                 img.proxy.close();
764                 mOutstandingImageClosed++;
765                 logWrapper("Release of image occurred.  Good fun. " + "Total Images Open/Closed = "
766                         + mOutstandingImageOpened + "/" + mOutstandingImageClosed);
767             }
768         };
769         if (executor == null) {
770             // Just run it on the main thread.
771             closeTask.run();
772         } else {
773             executor.execute(closeTask);
774         }
775     }
776 
777     /**
778      * Calculates the number of new Image references in a set of dependent
779      * tasks. Checks to make sure no new image references are being introduced.
780      *
781      * @param tasks The set of dependent tasks to be run
782      */
numPropagatedImageReferences(ImageToProcess img, Set<TaskImageContainer> tasks)783     private int numPropagatedImageReferences(ImageToProcess img, Set<TaskImageContainer> tasks)
784             throws RuntimeException {
785         int countImageRefs = 0;
786         for (TaskImageContainer task : tasks) {
787             if (task.mImage != null && task.mImage != img) {
788                 throw new RuntimeException("ERROR:  Spawned tasks cannot reference new images!");
789             }
790 
791             if (task.mImage != null) {
792                 countImageRefs++;
793             }
794         }
795 
796         return countImageRefs;
797     }
798 
799     /**
800      * Simple wrapper task to instrument when tasks ends so that ImageBackend
801      * can fire events when set of tasks created by a ReceiveImage call have all
802      * completed.
803      */
804     private class TaskDoneWrapper implements Runnable {
805         private final ImageBackend mImageBackend;
806         private final ImageShadowTask mImageShadowTask;
807         private final TaskImageContainer mWrappedTask;
808 
809         /**
810          * Constructor
811          *
812          * @param imageBackend ImageBackend that the task is running on
813          * @param imageShadowTask ImageShadowTask that is blocking on the
814          *            completion of the task
815          * @param wrappedTask The task to be run w/o instrumentation
816          */
TaskDoneWrapper(ImageBackend imageBackend, ImageShadowTask imageShadowTask, TaskImageContainer wrappedTask)817         public TaskDoneWrapper(ImageBackend imageBackend, ImageShadowTask imageShadowTask,
818                 TaskImageContainer wrappedTask) {
819             mImageBackend = imageBackend;
820             mImageShadowTask = imageShadowTask;
821             mWrappedTask = wrappedTask;
822         }
823 
824         /**
825          * Adds instrumentation that runs when a TaskImageContainer completes.
826          */
827         @Override
run()828         public void run() {
829             mWrappedTask.run();
830             // Decrement count
831             if (mImageBackend.decrementTaskDone(mImageShadowTask)) {
832                 // If you're the last one...
833                 Runnable doneRunnable = mImageShadowTask.getRunnableWhenDone();
834                 if (doneRunnable != null) {
835                     if (mWrappedTask.mExecutor == null) {
836                         doneRunnable.run();
837                     } else {
838                         mWrappedTask.mExecutor.execute(doneRunnable);
839                     }
840                 }
841             }
842         }
843     }
844 
845     /**
846      * Encapsulates all synchronization for semaphore signaling and blocking.
847      */
848     static public class BlockSignalProtocol {
849         private int count;
850 
851         private final ReentrantLock mLock = new ReentrantLock();
852 
853         private Condition mSignal;
854 
setCount(int value)855         public void setCount(int value) {
856             mLock.lock();
857             count = value;
858             mLock.unlock();
859         }
860 
getCount()861         public int getCount() {
862             int value;
863             mLock.lock();
864             value = count;
865             mLock.unlock();
866             return value;
867         }
868 
addCount(int value)869         public int addCount(int value) {
870             mLock.lock();
871             try {
872                 count += value;
873                 return count;
874             } finally {
875                 mLock.unlock();
876             }
877         }
878 
BlockSignalProtocol()879         BlockSignalProtocol() {
880             count = 0;
881             mSignal = mLock.newCondition();
882         }
883 
block()884         public void block() throws InterruptedException {
885             mLock.lock();
886             try {
887                 while (count != 0) {
888                     // Spin to deal with spurious signals.
889                     mSignal.await();
890                 }
891             } catch (InterruptedException e) {
892                 // TODO: on interruption, figure out what to do.
893                 throw (e);
894             } finally {
895                 mLock.unlock();
896             }
897         }
898 
signal()899         public void signal() {
900             mLock.lock();
901             mSignal.signal();
902             mLock.unlock();
903         }
904 
905     }
906 
907     /**
908      * A simple tuple class to keep track of image reference, and whether to
909      * block and/or close on final image release. Instantiated on every task
910      * submission call.
911      */
912     static public class ImageReleaseProtocol extends BlockSignalProtocol {
913 
914         public final boolean blockUntilRelease;
915 
916         public final boolean closeOnRelease;
917 
ImageReleaseProtocol(boolean block, boolean close)918         ImageReleaseProtocol(boolean block, boolean close) {
919             super();
920             blockUntilRelease = block;
921             closeOnRelease = close;
922         }
923 
924     }
925 
926     // Thread factories for a default constructor
927     private class FastThreadFactory implements ThreadFactory {
928         @Override
newThread(Runnable r)929         public Thread newThread(Runnable r) {
930             Thread t = new AndroidPriorityThread(FAST_THREAD_PRIORITY, r);
931             return t;
932         }
933     }
934 
935     private class AverageThreadFactory implements ThreadFactory {
936         @Override
newThread(Runnable r)937         public Thread newThread(Runnable r) {
938             Thread t = new AndroidPriorityThread(AVERAGE_THREAD_PRIORITY, r);
939             return t;
940         }
941     }
942 
943     private class SlowThreadFactory implements ThreadFactory {
944         @Override
newThread(Runnable r)945         public Thread newThread(Runnable r) {
946             Thread t = new AndroidPriorityThread(SLOW_THREAD_PRIORITY, r);
947             return t;
948         }
949     }
950 
951 }
952