1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.camera.processing.imagebackend; 18 19 import android.os.Process; 20 21 import com.android.camera.async.AndroidPriorityThread; 22 import com.android.camera.debug.Log; 23 import com.android.camera.processing.ProcessingTaskConsumer; 24 import com.android.camera.processing.memory.ByteBufferDirectPool; 25 import com.android.camera.processing.memory.LruResourcePool; 26 import com.android.camera.session.CaptureSession; 27 import com.android.camera.util.Size; 28 import com.google.common.base.Optional; 29 30 import java.nio.ByteBuffer; 31 import java.util.HashMap; 32 import java.util.HashSet; 33 import java.util.Map; 34 import java.util.Set; 35 import java.util.concurrent.Executor; 36 import java.util.concurrent.ExecutorService; 37 import java.util.concurrent.Executors; 38 import java.util.concurrent.ThreadFactory; 39 import java.util.concurrent.locks.Condition; 40 import java.util.concurrent.locks.ReentrantLock; 41 42 /** 43 * This ImageBackend is created for the purpose of creating a task-running 44 * infrastructure that has two-level of priority and doing the book-keeping to 45 * keep track of tasks that use Android Images. Android.media.images are 46 * critical system resources that MUST be properly managed in order to maintain 47 * camera application performance. Android.media.images are merely Java handles 48 * to regions of physically contiguous memory used by the camera hardware as a 49 * destination for imaging data. In general, this physically contiguous memory 50 * is not counted as an application resource, but as a system resources held by 51 * the application and does NOT count against the limits of application memory. 52 * The performance pressures of both computing and memory resources must often 53 * be prioritized in releasing Android.media.images in a timely manner. In order 54 * to properly balance these concerns, most image processing requested should be 55 * routed through this object. This object is also responsible for releasing 56 * Android.media image as soon as possible, so as not to stall the camera 57 * hardware subsystem. Image that reserve these images are a subclass of the 58 * basic Java Runnable with a few conditions placed upon their run() 59 * implementation: 60 * <ol> 61 * <li>The task will try to release the image as early as possible by calling 62 * the releaseSemaphoreReference as soon as a reference to the original image is 63 * no longer required.</li> 64 * <li>A set of tasks that require ImageData must only happen on the first 65 * receiveImage call. receiveImage must only be called once per image.</li> 66 * <li>However, the submitted tasks may spawn new tasks via the appendTask with 67 * any image that have had a task submitted, but NOT released via 68 * releaseSemaphoreReference.</li> 69 * <li>Computation that is dependent on multiple images should be written into 70 * this task framework in a distributed manner where image task can be computed 71 * independently and join their results to a common shared object.This style of 72 * implementation allows for the earliest release of Android Images while 73 * honoring the resources priorities set by this class. See the Lucky shot 74 * implementation for a concrete example for this shared object and its 75 * respective task {@link TaskLuckyShotSession} {@link LuckyShotSession}</li> 76 * </ol> 77 * To integrate with the ProcessingServiceManager, ImageBackend also signals to 78 * the ProcessingServiceManager its processing state by enqueuing 79 * ImageShadowTasks on each ImageBackend::receiveImage call. These ImageShadow 80 * tasks have no implementation, but emulate the processing delay by blocking 81 * until all tasks submitted and spawned by a particular receiveImage call have 82 * completed their processing. This emulated functionality ensures that other 83 * ProcessingTasks associated with Lens Blur and Panorama are not processing 84 * while the ImageBackend is running. Unfairly, the ImageBackend proceeds with 85 * its own processing regardless of the state of ImageShadowTask. 86 * ImageShadowTasks that are associated with ImageBackend tasks that have 87 * already been completed should return immediately on its process call. 88 */ 89 public class ImageBackend implements ImageConsumer, ImageTaskManager { 90 private static final Log.Tag TAG = new Log.Tag("ImageBackend"); 91 92 protected static final int NUM_THREADS_FAST = 2; 93 protected static final int NUM_THREADS_AVERAGE = 2; 94 protected static final int NUM_THREADS_SLOW = 2; 95 96 private static final int FAST_THREAD_PRIORITY = Process.THREAD_PRIORITY_DISPLAY; 97 private static final int AVERAGE_THREAD_PRIORITY = Process.THREAD_PRIORITY_DEFAULT 98 + Process.THREAD_PRIORITY_LESS_FAVORABLE; 99 private static final int SLOW_THREAD_PRIORITY = Process.THREAD_PRIORITY_BACKGROUND 100 + Process.THREAD_PRIORITY_MORE_FAVORABLE; 101 102 private static final int IMAGE_BACKEND_HARD_REF_POOL_SIZE = 2; 103 104 protected final ProcessingTaskConsumer mProcessingTaskConsumer; 105 106 /** 107 * Map for TaskImageContainer and the release of ImageProxy Book-keeping 108 */ 109 protected final Map<ImageToProcess, ImageReleaseProtocol> mImageSemaphoreMap; 110 /** 111 * Map for ImageShadowTask and release of blocking on 112 * ImageShadowTask::process 113 */ 114 protected final Map<CaptureSession, ImageShadowTask> mShadowTaskMap; 115 116 // The available threadpools for scheduling 117 protected final ExecutorService mThreadPoolFast; 118 protected final ExecutorService mThreadPoolAverage; 119 protected final ExecutorService mThreadPoolSlow; 120 121 private final LruResourcePool<Integer, ByteBuffer> mByteBufferDirectPool; 122 123 /** 124 * Approximate viewable size (in pixels) for the fast thumbnail in the 125 * current UX definition of the product. Note that these values will be the 126 * minimum size of FAST_THUMBNAIL target for the CONVERT_TO_RGB_PREVIEW 127 * task. 128 */ 129 private final Size mTinyThumbnailTargetSize; 130 131 /** 132 * A standard viewable size (in pixels) for the filmstrip thumbnail in the 133 * current UX definition of the product. Note that this size is the minimum 134 * size for the Preview on the filmstrip associated with 135 * COMPRESS_TO_JPEG_AND_WRITE_TO_DISK task. 136 */ 137 private final static Size FILMSTRIP_THUMBNAIL_TARGET_SIZE = new Size(512, 384); 138 139 // Some invariants to know that we're keeping track of everything 140 // that reflect the state of mImageSemaphoreMap 141 private int mOutstandingImageRefs = 0; 142 143 private int mOutstandingImageOpened = 0; 144 145 private int mOutstandingImageClosed = 0; 146 147 // Objects that may be registered to this objects events. 148 private ImageProcessorProxyListener mProxyListener = null; 149 150 // Default constructor, values are conservatively targeted to the Nexus 6 ImageBackend(ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize)151 public ImageBackend(ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize) { 152 mThreadPoolFast = Executors.newFixedThreadPool(NUM_THREADS_FAST, new FastThreadFactory()); 153 mThreadPoolAverage = Executors.newFixedThreadPool(NUM_THREADS_AVERAGE, 154 new AverageThreadFactory()); 155 mThreadPoolSlow = Executors.newFixedThreadPool(NUM_THREADS_SLOW, new SlowThreadFactory()); 156 mByteBufferDirectPool = new ByteBufferDirectPool(IMAGE_BACKEND_HARD_REF_POOL_SIZE); 157 mProxyListener = new ImageProcessorProxyListener(); 158 mImageSemaphoreMap = new HashMap<>(); 159 mShadowTaskMap = new HashMap<>(); 160 mProcessingTaskConsumer = processingTaskConsumer; 161 mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize); 162 } 163 164 /** 165 * Direct Injection Constructor for Testing purposes. 166 * 167 * @param fastService Service where Tasks of FAST Priority are placed. 168 * @param averageService Service where Tasks of AVERAGE Priority are placed. 169 * @param slowService Service where Tasks of SLOW Priority are placed. 170 * @param imageProcessorProxyListener iamge proxy listener to be used 171 */ ImageBackend(ExecutorService fastService, ExecutorService averageService, ExecutorService slowService, LruResourcePool<Integer, ByteBuffer> byteBufferDirectPool, ImageProcessorProxyListener imageProcessorProxyListener, ProcessingTaskConsumer processingTaskConsumer, int tinyThumbnailSize)172 public ImageBackend(ExecutorService fastService, 173 ExecutorService averageService, 174 ExecutorService slowService, 175 LruResourcePool<Integer, ByteBuffer> byteBufferDirectPool, 176 ImageProcessorProxyListener imageProcessorProxyListener, 177 ProcessingTaskConsumer processingTaskConsumer, 178 int tinyThumbnailSize) { 179 mThreadPoolFast = fastService; 180 mThreadPoolAverage = averageService; 181 mThreadPoolSlow = slowService; 182 mByteBufferDirectPool = byteBufferDirectPool; 183 mProxyListener = imageProcessorProxyListener; 184 mImageSemaphoreMap = new HashMap<>(); 185 mShadowTaskMap = new HashMap<>(); 186 mProcessingTaskConsumer = processingTaskConsumer; 187 mTinyThumbnailTargetSize = new Size(tinyThumbnailSize, tinyThumbnailSize); 188 } 189 190 /** 191 * Simple getter for the associated listener object associated with this 192 * instantiation that handles registration of events listeners. 193 * 194 * @return listener proxy that handles events messaging for this object. 195 */ getProxyListener()196 public ImageProcessorProxyListener getProxyListener() { 197 return mProxyListener; 198 } 199 200 /** 201 * Wrapper function for all log messages created by this object. Default 202 * implementation is to send messages to the Android logger. For test 203 * purposes, this method can be overridden to avoid "Stub!" Runtime 204 * exceptions in Unit Tests. 205 */ logWrapper(String message)206 public void logWrapper(String message) { 207 Log.v(TAG, message); 208 } 209 210 /** 211 * @return Number of Image references currently held by this instance 212 */ 213 @Override getNumberOfReservedOpenImages()214 public int getNumberOfReservedOpenImages() { 215 synchronized (mImageSemaphoreMap) { 216 // since mOutstandingImageOpened, mOutstandingImageClosed reflect 217 // the historical state of mImageSemaphoreMap, we need to lock on 218 // before we return a value. 219 return mOutstandingImageOpened - mOutstandingImageClosed; 220 } 221 } 222 223 /** 224 * Returns of the number of receiveImage calls that are currently enqueued 225 * and/or being processed. 226 * 227 * @return The number of receiveImage calls that are currently enqueued 228 * and/or being processed 229 */ 230 @Override getNumberOfOutstandingCalls()231 public int getNumberOfOutstandingCalls() { 232 synchronized (mShadowTaskMap) { 233 return mShadowTaskMap.size(); 234 } 235 } 236 237 /** 238 * Signals the ImageBackend that a tasks has released a reference to the 239 * image. Imagebackend determines whether all references have been released 240 * and applies its specified release protocol of closing image and/or 241 * unblocking the caller. Should ONLY be called by the tasks running on this 242 * class. 243 * 244 * @param img the image to be released by the task. 245 * @param executor the executor on which the image close is run. if null, 246 * image close is run by the calling thread (usually the main 247 * task thread). 248 */ 249 @Override releaseSemaphoreReference(final ImageToProcess img, Executor executor)250 public void releaseSemaphoreReference(final ImageToProcess img, Executor executor) { 251 synchronized (mImageSemaphoreMap) { 252 ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img); 253 if (protocol == null || protocol.getCount() <= 0) { 254 // That means task implementation has allowed an unbalanced 255 // semaphore release. 256 throw new RuntimeException( 257 "ERROR: Task implementation did NOT balance its release."); 258 } 259 260 // Normal operation from here. 261 protocol.addCount(-1); 262 mOutstandingImageRefs--; 263 logWrapper("Ref release. Total refs = " + mOutstandingImageRefs); 264 if (protocol.getCount() == 0) { 265 // Image is ready to be released 266 // Remove the image from the map so that it may be submitted 267 // again. 268 mImageSemaphoreMap.remove(img); 269 270 // Conditionally close the image, specified by initial 271 // receiveImage call 272 if (protocol.closeOnRelease) { 273 closeImageExecutorSafe(img, executor); 274 logWrapper("Ref release close."); 275 } 276 277 // Conditionally signal the blocking thread to go. 278 if (protocol.blockUntilRelease) { 279 protocol.signal(); 280 } 281 } else { 282 // Image is still being held by other tasks. 283 // Otherwise, update the semaphore 284 mImageSemaphoreMap.put(img, protocol); 285 } 286 } 287 } 288 289 /** 290 * Spawns dependent tasks from internal implementation of a set of tasks. If 291 * a dependent task does NOT require the image reference, it should be 292 * passed a null pointer as an image reference. In general, this method 293 * should be called after the task has completed its own computations, but 294 * before it has released its own image reference (via the 295 * releaseSemaphoreReference call). 296 * 297 * @param tasks The set of tasks to be run 298 * @return whether tasks are successfully submitted. 299 */ 300 @Override appendTasks(ImageToProcess img, Set<TaskImageContainer> tasks)301 public boolean appendTasks(ImageToProcess img, Set<TaskImageContainer> tasks) { 302 // Make sure that referred images are all the same, if it exists. 303 // And count how image references need to be kept track of. 304 int countImageRefs = numPropagatedImageReferences(img, tasks); 305 306 if (img != null) { 307 // If you're still holding onto the reference, make sure you keep 308 // count 309 incrementSemaphoreReferenceCount(img, countImageRefs); 310 } 311 312 // Update the done count on the new tasks. 313 incrementTaskDone(tasks); 314 315 scheduleTasks(tasks); 316 return true; 317 } 318 319 /** 320 * Spawns a single dependent task from internal implementation of a task. 321 * 322 * @param task The task to be run 323 * @return whether tasks are successfully submitted. 324 */ 325 @Override appendTasks(ImageToProcess img, TaskImageContainer task)326 public boolean appendTasks(ImageToProcess img, TaskImageContainer task) { 327 Set<TaskImageContainer> tasks = new HashSet<TaskImageContainer>(1); 328 tasks.add(task); 329 return appendTasks(img, tasks); 330 } 331 332 /** 333 * Implements that top-level image single task submission that is defined by 334 * the ImageConsumer interface w/o Runnable to executed. 335 * 336 * @param img Image required by the task 337 * @param task Task to be run 338 * @param blockUntilImageRelease If true, call blocks until the object img 339 * is no longer referred by any task. If false, call is 340 * non-blocking 341 * @param closeOnImageRelease If true, images is closed when the object img 342 * is is no longer referred by any task. If false, After an image 343 * is submitted, it should never be submitted again to the 344 * interface until all tasks and their spawned tasks are 345 * finished. 346 * @return whether jobs were enqueued to the ImageBackend. 347 */ 348 @Override receiveImage(ImageToProcess img, TaskImageContainer task, boolean blockUntilImageRelease, boolean closeOnImageRelease)349 public boolean receiveImage(ImageToProcess img, TaskImageContainer task, 350 boolean blockUntilImageRelease, boolean closeOnImageRelease) 351 throws InterruptedException { 352 return receiveImage(img, task, blockUntilImageRelease, closeOnImageRelease, 353 Optional.<Runnable> absent()); 354 } 355 356 /** 357 * Implements that top-level image single task submission that is defined by 358 * the ImageConsumer interface. 359 * 360 * @param img Image required by the task 361 * @param task Task to be run 362 * @param blockUntilImageRelease If true, call blocks until the object img 363 * is no longer referred by any task. If false, call is 364 * non-blocking 365 * @param closeOnImageRelease If true, images is closed when the object img 366 * is is no longer referred by any task. If false, After an image 367 * is submitted, it should never be submitted again to the 368 * interface until all tasks and their spawned tasks are 369 * finished. 370 * @param runnableWhenDone Optional runnable to be executed when the set of 371 * tasks are done. 372 * @return whether jobs were enqueued to the ImageBackend. 373 */ 374 @Override receiveImage(ImageToProcess img, TaskImageContainer task, boolean blockUntilImageRelease, boolean closeOnImageRelease, Optional<Runnable> runnableWhenDone)375 public boolean receiveImage(ImageToProcess img, TaskImageContainer task, 376 boolean blockUntilImageRelease, boolean closeOnImageRelease, 377 Optional<Runnable> runnableWhenDone) 378 throws InterruptedException { 379 Set<TaskImageContainer> passTasks = new HashSet<TaskImageContainer>(1); 380 passTasks.add(task); 381 return receiveImage(img, passTasks, blockUntilImageRelease, closeOnImageRelease, 382 runnableWhenDone); 383 } 384 385 /** 386 * Returns an informational string about the current status of ImageBackend, 387 * along with an approximate number of references being held. 388 * 389 * @return an informational string suitable to be dumped into logcat 390 */ 391 @Override toString()392 public String toString() { 393 return "ImageBackend Status BEGIN:\n" + 394 "Shadow Image Map Size = " + mShadowTaskMap.size() + "\n" + 395 "Image Semaphore Map Size = " + mImageSemaphoreMap.size() + "\n" + 396 "OutstandingImageRefs = " + mOutstandingImageRefs + "\n" + 397 "Proxy Listener Map Size = " + mProxyListener.getMapSize() + "\n" + 398 "Proxy Listener = " + mProxyListener.getNumRegisteredListeners() + "\n" + 399 "ImageBackend Status END:\n"; 400 } 401 402 /** 403 * Implements that top-level image single task submission that is defined by 404 * the ImageConsumer interface. 405 * 406 * @param img Image required by the task 407 * @param tasks A set of Tasks to be run 408 * @param blockUntilImageRelease If true, call blocks until the object img 409 * is no longer referred by any task. If false, call is 410 * non-blocking 411 * @param closeOnImageRelease If true, images is closed when the object img 412 * is is no longer referred by any task. If false, After an image 413 * is submitted, it should never be submitted again to the 414 * interface until all tasks and their spawned tasks are 415 * finished. 416 * @param runnableWhenDone Optional runnable to be executed when the set of 417 * tasks are done. 418 * @return whether receiveImage succeeded. Generally, only happens when the 419 * image reference is null or the task set is empty. 420 * @throws InterruptedException occurs when call is set to be blocking and 421 * is interrupted. 422 */ 423 @Override receiveImage(ImageToProcess img, Set<TaskImageContainer> tasks, boolean blockUntilImageRelease, boolean closeOnImageRelease, Optional<Runnable> runnableWhenDone)424 public boolean receiveImage(ImageToProcess img, Set<TaskImageContainer> tasks, 425 boolean blockUntilImageRelease, boolean closeOnImageRelease, 426 Optional<Runnable> runnableWhenDone) 427 throws InterruptedException { 428 429 // Short circuit if no tasks submitted. 430 if (tasks == null || tasks.size() <= 0) { 431 return false; 432 } 433 434 if (img == null) { 435 // TODO: Determine whether you need to be so strict at the top level 436 throw new RuntimeException("ERROR: Initial call must reference valid Image!"); 437 } 438 439 // Make sure that referred images are all the same, if it exists. 440 // And count how image references need to be kept track of. 441 int countImageRefs = numPropagatedImageReferences(img, tasks); 442 443 // Initialize the counters for process-level tasks 444 initializeTaskDone(tasks, runnableWhenDone); 445 446 // Set the semaphore, given that the number of tasks that need to be 447 // scheduled 448 // and the boolean flags for imaging closing and thread blocking 449 ImageReleaseProtocol protocol = setSemaphoreReferenceCount(img, countImageRefs, 450 blockUntilImageRelease, closeOnImageRelease); 451 452 // Put the tasks on their respective queues. 453 scheduleTasks(tasks); 454 455 // Implement blocking if required 456 if (protocol.blockUntilRelease) { 457 protocol.block(); 458 } 459 460 return true; 461 } 462 463 /** 464 * Implements that top-level image task submission short-cut that is defined 465 * by the ImageConsumer interface. 466 * 467 * @param img Image required by the task 468 * @param executor Executor to run events and image closes, in case of 469 * control leakage 470 * @param processingFlags Magical bit vector that specifies jobs to be run 471 * After an image is submitted, it should never be submitted 472 * again to the interface until all tasks and their spawned tasks 473 * are finished. 474 * @param imageProcessorListener Optional listener to automatically register 475 * at the job task and unregister after all tasks are done 476 * @return whether receiveImage succeeded. Generally, only happens when the 477 * image reference is null or the task set is empty. 478 * @throws InterruptedException occurs when call is set to be blocking and 479 * is interrupted. 480 */ 481 @Override receiveImage(ImageToProcess img, Executor executor, Set<ImageTaskFlags> processingFlags, CaptureSession session, Optional<ImageProcessorListener> imageProcessorListener)482 public boolean receiveImage(ImageToProcess img, Executor executor, 483 Set<ImageTaskFlags> processingFlags, CaptureSession session, 484 Optional<ImageProcessorListener> imageProcessorListener) 485 throws InterruptedException { 486 487 // Uncomment for occasional debugging 488 // Log.v(TAG, toString()); 489 490 Set<TaskImageContainer> tasksToExecute = new HashSet<TaskImageContainer>(); 491 492 if (img == null) { 493 // No data to process, just pure message. 494 return true; 495 } 496 497 // Now add the pre-mixed versions of the tasks. 498 499 if (processingFlags.contains(ImageTaskFlags.COMPRESS_TO_JPEG_AND_WRITE_TO_DISK)) { 500 if (processingFlags.contains(ImageTaskFlags.CREATE_EARLY_FILMSTRIP_PREVIEW)) { 501 // Request job that creates both filmstrip thumbnail from YUV, 502 // JPEG compression of the YUV Image, and writes the result to 503 // disk 504 tasksToExecute.add(new TaskPreviewChainedJpeg(img, executor, this, session, 505 FILMSTRIP_THUMBNAIL_TARGET_SIZE, mByteBufferDirectPool)); 506 } else { 507 // Request job that only does JPEG compression and writes the 508 // result to disk 509 tasksToExecute.add(new TaskCompressImageToJpeg(img, executor, this, session, 510 mByteBufferDirectPool)); 511 } 512 } 513 514 if (processingFlags.contains(ImageTaskFlags.CONVERT_TO_RGB_PREVIEW)) { 515 // Add an additional type of task to the appropriate queue. 516 tasksToExecute.add(new TaskConvertImageToRGBPreview(img, executor, 517 this, TaskImageContainer.ProcessingPriority.FAST, session, 518 mTinyThumbnailTargetSize, 519 TaskConvertImageToRGBPreview.ThumbnailShape.SQUARE_ASPECT_CIRCULAR_INSET)); 520 } 521 522 // Wrap the listener in a runnable that will be fired when all tasks are 523 // complete. 524 final Optional<Runnable> runnableOptional; 525 if (imageProcessorListener.isPresent()) { 526 final ImageProcessorListener finalImageProcessorListener = imageProcessorListener.get(); 527 Runnable unregisterRunnable = new Runnable() { 528 @Override 529 public void run() { 530 getProxyListener().unregisterListener(finalImageProcessorListener); 531 } 532 }; 533 runnableOptional = Optional.of(unregisterRunnable); 534 } else { 535 runnableOptional = Optional.<Runnable> absent(); 536 } 537 538 if (receiveImage(img, tasksToExecute, 539 processingFlags.contains(ImageTaskFlags.BLOCK_UNTIL_ALL_TASKS_RELEASE), 540 processingFlags.contains(ImageTaskFlags.CLOSE_ON_ALL_TASKS_RELEASE), 541 runnableOptional)) { 542 if (imageProcessorListener.isPresent()) { 543 getProxyListener().registerListener(imageProcessorListener.get(), img.proxy); 544 } 545 return true; 546 } else { 547 return false; 548 } 549 } 550 551 /** 552 * Factory functions, in case, you want some shake and bake functionality. 553 */ createTaskConvertImageToRGBPreview( ImageToProcess image, Executor executor, ImageBackend imageBackend, CaptureSession session, Size targetSize, TaskConvertImageToRGBPreview.ThumbnailShape thumbnailShape)554 public TaskConvertImageToRGBPreview createTaskConvertImageToRGBPreview( 555 ImageToProcess image, Executor executor, ImageBackend imageBackend, 556 CaptureSession session, Size targetSize, 557 TaskConvertImageToRGBPreview.ThumbnailShape thumbnailShape) { 558 return new TaskConvertImageToRGBPreview(image, executor, imageBackend, 559 TaskImageContainer.ProcessingPriority.FAST, session, 560 mTinyThumbnailTargetSize, thumbnailShape); 561 } 562 createTaskCompressImageToJpeg(ImageToProcess image, Executor executor, ImageBackend imageBackend, CaptureSession session)563 public TaskCompressImageToJpeg createTaskCompressImageToJpeg(ImageToProcess image, 564 Executor executor, ImageBackend imageBackend, CaptureSession session) { 565 return new TaskCompressImageToJpeg(image, executor, imageBackend, session, 566 mByteBufferDirectPool); 567 } 568 569 /** 570 * Blocks and waits for all tasks to complete. 571 */ 572 @Override shutdown()573 public void shutdown() { 574 mThreadPoolSlow.shutdown(); 575 mThreadPoolFast.shutdown(); 576 } 577 578 /** 579 * For a given set of starting tasks, initialize the associated sessions 580 * with a proper blocking semaphore and value of number of tasks to be run. 581 * For each semaphore, a ImageShadowTask will be instantiated and enqueued 582 * onto the selected ProcessingSerivceManager. 583 * 584 * @param tasks The set of ImageContainer tasks to be run on ImageBackend 585 */ initializeTaskDone(Set<TaskImageContainer> tasks, Optional<Runnable> runnableWhenDone)586 protected void initializeTaskDone(Set<TaskImageContainer> tasks, 587 Optional<Runnable> runnableWhenDone) { 588 Set<CaptureSession> sessionSet = new HashSet<>(); 589 Map<CaptureSession, Integer> sessionTaskCount = new HashMap<>(); 590 591 // Create a set w/ no session duplicates and count them 592 for (TaskImageContainer task : tasks) { 593 sessionSet.add(task.mSession); 594 Integer currentCount = sessionTaskCount.get(task.mSession); 595 if (currentCount == null) { 596 sessionTaskCount.put(task.mSession, 1); 597 } else { 598 sessionTaskCount.put(task.mSession, currentCount + 1); 599 } 600 } 601 602 // Create a new blocking semaphore for each set of tasks on a given 603 // session. 604 synchronized (mShadowTaskMap) { 605 for (CaptureSession captureSession : sessionSet) { 606 BlockSignalProtocol protocol = new BlockSignalProtocol(); 607 protocol.setCount(sessionTaskCount.get(captureSession)); 608 final ImageShadowTask shadowTask; 609 shadowTask = new ImageShadowTask(protocol, captureSession, 610 runnableWhenDone); 611 mShadowTaskMap.put(captureSession, shadowTask); 612 mProcessingTaskConsumer.enqueueTask(shadowTask); 613 } 614 } 615 } 616 617 /** 618 * For ImageBackend tasks that spawn their own tasks, increase the semaphore 619 * count to take into account the new tasks being spawned. 620 * 621 * @param tasks The set of tasks to be spawned. 622 */ incrementTaskDone(Set<TaskImageContainer> tasks)623 protected void incrementTaskDone(Set<TaskImageContainer> tasks) throws RuntimeException { 624 // TODO: Add invariant test so that all sessions are the same. 625 synchronized (mShadowTaskMap) { 626 for (TaskImageContainer task : tasks) { 627 ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession); 628 if (shadowTask == null) { 629 throw new RuntimeException( 630 "Session NOT previously registered." 631 + " ImageShadowTask booking-keeping is incorrect."); 632 } 633 shadowTask.getProtocol().addCount(1); 634 } 635 } 636 } 637 638 /** 639 * Decrement the semaphore count of the ImageShadowTask. Should be called 640 * when a task completes its processing in ImageBackend. 641 * 642 * @param imageShadowTask The ImageShadow task that contains the blocking 643 * semaphore. 644 * @return whether all the tasks associated with an ImageShadowTask are done 645 */ decrementTaskDone(ImageShadowTask imageShadowTask)646 protected boolean decrementTaskDone(ImageShadowTask imageShadowTask) { 647 synchronized (mShadowTaskMap) { 648 int remainingTasks = imageShadowTask.getProtocol().addCount(-1); 649 if (remainingTasks == 0) { 650 mShadowTaskMap.remove(imageShadowTask.getSession()); 651 imageShadowTask.getProtocol().signal(); 652 return true; 653 } else { 654 return false; 655 } 656 } 657 658 } 659 660 /** 661 * Puts the tasks on the specified queue. May be more complicated in the 662 * future. 663 * 664 * @param tasks The set of tasks to be run 665 */ scheduleTasks(Set<TaskImageContainer> tasks)666 protected void scheduleTasks(Set<TaskImageContainer> tasks) { 667 synchronized (mShadowTaskMap) { 668 for (TaskImageContainer task : tasks) { 669 ImageShadowTask shadowTask = mShadowTaskMap.get(task.mSession); 670 if (shadowTask == null) { 671 throw new IllegalStateException("Scheduling a task with a unknown session."); 672 } 673 // Before scheduling, wrap TaskImageContainer inside of the 674 // TaskDoneWrapper to add 675 // instrumentation for managing ImageShadowTasks 676 switch (task.getProcessingPriority()) { 677 case FAST: 678 mThreadPoolFast.execute(new TaskDoneWrapper(this, shadowTask, task)); 679 break; 680 case AVERAGE: 681 mThreadPoolAverage.execute(new TaskDoneWrapper(this, shadowTask, task)); 682 break; 683 case SLOW: 684 mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task)); 685 break; 686 default: 687 mThreadPoolSlow.execute(new TaskDoneWrapper(this, shadowTask, task)); 688 break; 689 } 690 } 691 } 692 } 693 694 /** 695 * Initializes the semaphore count for the image 696 * 697 * @return The protocol object that keeps tracks of the image reference 698 * count and actions to be taken on release. 699 */ setSemaphoreReferenceCount(ImageToProcess img, int count, boolean blockUntilRelease, boolean closeOnRelease)700 protected ImageReleaseProtocol setSemaphoreReferenceCount(ImageToProcess img, int count, 701 boolean blockUntilRelease, boolean closeOnRelease) throws RuntimeException { 702 synchronized (mImageSemaphoreMap) { 703 if (mImageSemaphoreMap.get(img) != null) { 704 throw new RuntimeException( 705 "ERROR: Rewriting of Semaphore Lock." 706 + " Image references may not freed properly"); 707 } 708 709 // Create the new booking-keeping object. 710 ImageReleaseProtocol protocol = new ImageReleaseProtocol(blockUntilRelease, 711 closeOnRelease); 712 protocol.setCount(count); 713 714 mImageSemaphoreMap.put(img, protocol); 715 mOutstandingImageRefs += count; 716 mOutstandingImageOpened++; 717 logWrapper("Received an opened image: " + mOutstandingImageOpened + "/" 718 + mOutstandingImageClosed); 719 logWrapper("Setting an image reference count of " + count + " Total refs = " 720 + mOutstandingImageRefs); 721 return protocol; 722 } 723 } 724 725 /** 726 * Increments the semaphore count for the image. Should ONLY be internally 727 * via appendTasks by internal tasks. Otherwise, image references could get 728 * out of whack. 729 * 730 * @param img The Image associated with the set of tasks running on it. 731 * @param count The number of tasks to be added 732 * @throws RuntimeException Indicates image Closing Bookkeeping is screwed 733 * up. 734 */ incrementSemaphoreReferenceCount(ImageToProcess img, int count)735 protected void incrementSemaphoreReferenceCount(ImageToProcess img, int count) 736 throws RuntimeException { 737 synchronized (mImageSemaphoreMap) { 738 ImageReleaseProtocol protocol = mImageSemaphoreMap.get(img); 739 if (mImageSemaphoreMap.get(img) == null) { 740 throw new RuntimeException( 741 "Image Reference has already been released or has never been held."); 742 } 743 744 protocol.addCount(count); 745 mImageSemaphoreMap.put(img, protocol); 746 747 mOutstandingImageRefs += count; 748 } 749 } 750 751 /** 752 * Close an Image with a executor if it's available and does the proper 753 * booking keeping on the object. 754 * 755 * @param img Image to be closed 756 * @param executor Executor to be used, if executor is null, the close is 757 * run on the task thread 758 */ closeImageExecutorSafe(final ImageToProcess img, Executor executor)759 private void closeImageExecutorSafe(final ImageToProcess img, Executor executor) { 760 Runnable closeTask = new Runnable() { 761 @Override 762 public void run() { 763 img.proxy.close(); 764 mOutstandingImageClosed++; 765 logWrapper("Release of image occurred. Good fun. " + "Total Images Open/Closed = " 766 + mOutstandingImageOpened + "/" + mOutstandingImageClosed); 767 } 768 }; 769 if (executor == null) { 770 // Just run it on the main thread. 771 closeTask.run(); 772 } else { 773 executor.execute(closeTask); 774 } 775 } 776 777 /** 778 * Calculates the number of new Image references in a set of dependent 779 * tasks. Checks to make sure no new image references are being introduced. 780 * 781 * @param tasks The set of dependent tasks to be run 782 */ numPropagatedImageReferences(ImageToProcess img, Set<TaskImageContainer> tasks)783 private int numPropagatedImageReferences(ImageToProcess img, Set<TaskImageContainer> tasks) 784 throws RuntimeException { 785 int countImageRefs = 0; 786 for (TaskImageContainer task : tasks) { 787 if (task.mImage != null && task.mImage != img) { 788 throw new RuntimeException("ERROR: Spawned tasks cannot reference new images!"); 789 } 790 791 if (task.mImage != null) { 792 countImageRefs++; 793 } 794 } 795 796 return countImageRefs; 797 } 798 799 /** 800 * Simple wrapper task to instrument when tasks ends so that ImageBackend 801 * can fire events when set of tasks created by a ReceiveImage call have all 802 * completed. 803 */ 804 private class TaskDoneWrapper implements Runnable { 805 private final ImageBackend mImageBackend; 806 private final ImageShadowTask mImageShadowTask; 807 private final TaskImageContainer mWrappedTask; 808 809 /** 810 * Constructor 811 * 812 * @param imageBackend ImageBackend that the task is running on 813 * @param imageShadowTask ImageShadowTask that is blocking on the 814 * completion of the task 815 * @param wrappedTask The task to be run w/o instrumentation 816 */ TaskDoneWrapper(ImageBackend imageBackend, ImageShadowTask imageShadowTask, TaskImageContainer wrappedTask)817 public TaskDoneWrapper(ImageBackend imageBackend, ImageShadowTask imageShadowTask, 818 TaskImageContainer wrappedTask) { 819 mImageBackend = imageBackend; 820 mImageShadowTask = imageShadowTask; 821 mWrappedTask = wrappedTask; 822 } 823 824 /** 825 * Adds instrumentation that runs when a TaskImageContainer completes. 826 */ 827 @Override run()828 public void run() { 829 mWrappedTask.run(); 830 // Decrement count 831 if (mImageBackend.decrementTaskDone(mImageShadowTask)) { 832 // If you're the last one... 833 Runnable doneRunnable = mImageShadowTask.getRunnableWhenDone(); 834 if (doneRunnable != null) { 835 if (mWrappedTask.mExecutor == null) { 836 doneRunnable.run(); 837 } else { 838 mWrappedTask.mExecutor.execute(doneRunnable); 839 } 840 } 841 } 842 } 843 } 844 845 /** 846 * Encapsulates all synchronization for semaphore signaling and blocking. 847 */ 848 static public class BlockSignalProtocol { 849 private int count; 850 851 private final ReentrantLock mLock = new ReentrantLock(); 852 853 private Condition mSignal; 854 setCount(int value)855 public void setCount(int value) { 856 mLock.lock(); 857 count = value; 858 mLock.unlock(); 859 } 860 getCount()861 public int getCount() { 862 int value; 863 mLock.lock(); 864 value = count; 865 mLock.unlock(); 866 return value; 867 } 868 addCount(int value)869 public int addCount(int value) { 870 mLock.lock(); 871 try { 872 count += value; 873 return count; 874 } finally { 875 mLock.unlock(); 876 } 877 } 878 BlockSignalProtocol()879 BlockSignalProtocol() { 880 count = 0; 881 mSignal = mLock.newCondition(); 882 } 883 block()884 public void block() throws InterruptedException { 885 mLock.lock(); 886 try { 887 while (count != 0) { 888 // Spin to deal with spurious signals. 889 mSignal.await(); 890 } 891 } catch (InterruptedException e) { 892 // TODO: on interruption, figure out what to do. 893 throw (e); 894 } finally { 895 mLock.unlock(); 896 } 897 } 898 signal()899 public void signal() { 900 mLock.lock(); 901 mSignal.signal(); 902 mLock.unlock(); 903 } 904 905 } 906 907 /** 908 * A simple tuple class to keep track of image reference, and whether to 909 * block and/or close on final image release. Instantiated on every task 910 * submission call. 911 */ 912 static public class ImageReleaseProtocol extends BlockSignalProtocol { 913 914 public final boolean blockUntilRelease; 915 916 public final boolean closeOnRelease; 917 ImageReleaseProtocol(boolean block, boolean close)918 ImageReleaseProtocol(boolean block, boolean close) { 919 super(); 920 blockUntilRelease = block; 921 closeOnRelease = close; 922 } 923 924 } 925 926 // Thread factories for a default constructor 927 private class FastThreadFactory implements ThreadFactory { 928 @Override newThread(Runnable r)929 public Thread newThread(Runnable r) { 930 Thread t = new AndroidPriorityThread(FAST_THREAD_PRIORITY, r); 931 return t; 932 } 933 } 934 935 private class AverageThreadFactory implements ThreadFactory { 936 @Override newThread(Runnable r)937 public Thread newThread(Runnable r) { 938 Thread t = new AndroidPriorityThread(AVERAGE_THREAD_PRIORITY, r); 939 return t; 940 } 941 } 942 943 private class SlowThreadFactory implements ThreadFactory { 944 @Override newThread(Runnable r)945 public Thread newThread(Runnable r) { 946 Thread t = new AndroidPriorityThread(SLOW_THREAD_PRIORITY, r); 947 return t; 948 } 949 } 950 951 } 952