1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import static java.util.concurrent.TimeUnit.MILLISECONDS; 10 import static java.util.concurrent.TimeUnit.NANOSECONDS; 11 12 import java.util.AbstractQueue; 13 import java.util.Arrays; 14 import java.util.Collection; 15 import java.util.Iterator; 16 import java.util.List; 17 import java.util.NoSuchElementException; 18 import java.util.concurrent.atomic.AtomicLong; 19 import java.util.concurrent.locks.Condition; 20 import java.util.concurrent.locks.ReentrantLock; 21 22 // BEGIN android-note 23 // omit class-level docs on setRemoveOnCancelPolicy() 24 // END android-note 25 26 /** 27 * A {@link ThreadPoolExecutor} that can additionally schedule 28 * commands to run after a given delay, or to execute periodically. 29 * This class is preferable to {@link java.util.Timer} when multiple 30 * worker threads are needed, or when the additional flexibility or 31 * capabilities of {@link ThreadPoolExecutor} (which this class 32 * extends) are required. 33 * 34 * <p>Delayed tasks execute no sooner than they are enabled, but 35 * without any real-time guarantees about when, after they are 36 * enabled, they will commence. Tasks scheduled for exactly the same 37 * execution time are enabled in first-in-first-out (FIFO) order of 38 * submission. 39 * 40 * <p>When a submitted task is cancelled before it is run, execution 41 * is suppressed. By default, such a cancelled task is not 42 * automatically removed from the work queue until its delay elapses. 43 * While this enables further inspection and monitoring, it may also 44 * cause unbounded retention of cancelled tasks. 45 * 46 * <p>Successive executions of a periodic task scheduled via 47 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or 48 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay} 49 * do not overlap. While different executions may be performed by 50 * different threads, the effects of prior executions 51 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 52 * those of subsequent ones. 53 * 54 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 55 * of the inherited tuning methods are not useful for it. In 56 * particular, because it acts as a fixed-sized pool using 57 * {@code corePoolSize} threads and an unbounded queue, adjustments 58 * to {@code maximumPoolSize} have no useful effect. Additionally, it 59 * is almost never a good idea to set {@code corePoolSize} to zero or 60 * use {@code allowCoreThreadTimeOut} because this may leave the pool 61 * without threads to handle tasks once they become eligible to run. 62 * 63 * <p><b>Extension notes:</b> This class overrides the 64 * {@link ThreadPoolExecutor#execute(Runnable) execute} and 65 * {@link AbstractExecutorService#submit(Runnable) submit} 66 * methods to generate internal {@link ScheduledFuture} objects to 67 * control per-task delays and scheduling. To preserve 68 * functionality, any further overrides of these methods in 69 * subclasses must invoke superclass versions, which effectively 70 * disables additional task customization. However, this class 71 * provides alternative protected extension method 72 * {@code decorateTask} (one version each for {@code Runnable} and 73 * {@code Callable}) that can be used to customize the concrete task 74 * types used to execute commands entered via {@code execute}, 75 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 76 * and {@code scheduleWithFixedDelay}. By default, a 77 * {@code ScheduledThreadPoolExecutor} uses a task type extending 78 * {@link FutureTask}. However, this may be modified or replaced using 79 * subclasses of the form: 80 * 81 * <pre> {@code 82 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 83 * 84 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 85 * 86 * protected <V> RunnableScheduledFuture<V> decorateTask( 87 * Runnable r, RunnableScheduledFuture<V> task) { 88 * return new CustomTask<V>(r, task); 89 * } 90 * 91 * protected <V> RunnableScheduledFuture<V> decorateTask( 92 * Callable<V> c, RunnableScheduledFuture<V> task) { 93 * return new CustomTask<V>(c, task); 94 * } 95 * // ... add constructors, etc. 96 * }}</pre> 97 * 98 * @since 1.5 99 * @author Doug Lea 100 */ 101 public class ScheduledThreadPoolExecutor 102 extends ThreadPoolExecutor 103 implements ScheduledExecutorService { 104 105 /* 106 * This class specializes ThreadPoolExecutor implementation by 107 * 108 * 1. Using a custom task type ScheduledFutureTask, even for tasks 109 * that don't require scheduling because they are submitted 110 * using ExecutorService rather than ScheduledExecutorService 111 * methods, which are treated as tasks with a delay of zero. 112 * 113 * 2. Using a custom queue (DelayedWorkQueue), a variant of 114 * unbounded DelayQueue. The lack of capacity constraint and 115 * the fact that corePoolSize and maximumPoolSize are 116 * effectively identical simplifies some execution mechanics 117 * (see delayedExecute) compared to ThreadPoolExecutor. 118 * 119 * 3. Supporting optional run-after-shutdown parameters, which 120 * leads to overrides of shutdown methods to remove and cancel 121 * tasks that should NOT be run after shutdown, as well as 122 * different recheck logic when task (re)submission overlaps 123 * with a shutdown. 124 * 125 * 4. Task decoration methods to allow interception and 126 * instrumentation, which are needed because subclasses cannot 127 * otherwise override submit methods to get this effect. These 128 * don't have any impact on pool control logic though. 129 */ 130 131 /** 132 * False if should cancel/suppress periodic tasks on shutdown. 133 */ 134 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 135 136 /** 137 * False if should cancel non-periodic tasks on shutdown. 138 */ 139 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 140 141 /** 142 * True if ScheduledFutureTask.cancel should remove from queue. 143 */ 144 volatile boolean removeOnCancel; 145 146 /** 147 * Sequence number to break scheduling ties, and in turn to 148 * guarantee FIFO order among tied entries. 149 */ 150 private static final AtomicLong sequencer = new AtomicLong(); 151 152 private class ScheduledFutureTask<V> 153 extends FutureTask<V> implements RunnableScheduledFuture<V> { 154 155 /** Sequence number to break ties FIFO */ 156 private final long sequenceNumber; 157 158 /** The nanoTime-based time when the task is enabled to execute. */ 159 private volatile long time; 160 161 /** 162 * Period for repeating tasks, in nanoseconds. 163 * A positive value indicates fixed-rate execution. 164 * A negative value indicates fixed-delay execution. 165 * A value of 0 indicates a non-repeating (one-shot) task. 166 */ 167 private final long period; 168 169 /** The actual task to be re-enqueued by reExecutePeriodic */ 170 RunnableScheduledFuture<V> outerTask = this; 171 172 /** 173 * Index into delay queue, to support faster cancellation. 174 */ 175 int heapIndex; 176 177 /** 178 * Creates a one-shot action with given nanoTime-based trigger time. 179 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)180 ScheduledFutureTask(Runnable r, V result, long triggerTime, 181 long sequenceNumber) { 182 super(r, result); 183 this.time = triggerTime; 184 this.period = 0; 185 this.sequenceNumber = sequenceNumber; 186 } 187 188 /** 189 * Creates a periodic action with given nanoTime-based initial 190 * trigger time and period. 191 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)192 ScheduledFutureTask(Runnable r, V result, long triggerTime, 193 long period, long sequenceNumber) { 194 super(r, result); 195 this.time = triggerTime; 196 this.period = period; 197 this.sequenceNumber = sequenceNumber; 198 } 199 200 /** 201 * Creates a one-shot action with given nanoTime-based trigger time. 202 */ ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)203 ScheduledFutureTask(Callable<V> callable, long triggerTime, 204 long sequenceNumber) { 205 super(callable); 206 this.time = triggerTime; 207 this.period = 0; 208 this.sequenceNumber = sequenceNumber; 209 } 210 getDelay(TimeUnit unit)211 public long getDelay(TimeUnit unit) { 212 return unit.convert(time - System.nanoTime(), NANOSECONDS); 213 } 214 compareTo(Delayed other)215 public int compareTo(Delayed other) { 216 if (other == this) // compare zero if same object 217 return 0; 218 if (other instanceof ScheduledFutureTask) { 219 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 220 long diff = time - x.time; 221 if (diff < 0) 222 return -1; 223 else if (diff > 0) 224 return 1; 225 else if (sequenceNumber < x.sequenceNumber) 226 return -1; 227 else 228 return 1; 229 } 230 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 231 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 232 } 233 234 /** 235 * Returns {@code true} if this is a periodic (not a one-shot) action. 236 * 237 * @return {@code true} if periodic 238 */ isPeriodic()239 public boolean isPeriodic() { 240 return period != 0; 241 } 242 243 /** 244 * Sets the next time to run for a periodic task. 245 */ setNextRunTime()246 private void setNextRunTime() { 247 long p = period; 248 if (p > 0) 249 time += p; 250 else 251 time = triggerTime(-p); 252 } 253 cancel(boolean mayInterruptIfRunning)254 public boolean cancel(boolean mayInterruptIfRunning) { 255 // The racy read of heapIndex below is benign: 256 // if heapIndex < 0, then OOTA guarantees that we have surely 257 // been removed; else we recheck under lock in remove() 258 boolean cancelled = super.cancel(mayInterruptIfRunning); 259 if (cancelled && removeOnCancel && heapIndex >= 0) 260 remove(this); 261 return cancelled; 262 } 263 264 /** 265 * Overrides FutureTask version so as to reset/requeue if periodic. 266 */ run()267 public void run() { 268 boolean periodic = isPeriodic(); 269 if (!canRunInCurrentRunState(periodic)) 270 cancel(false); 271 else if (!periodic) 272 super.run(); 273 else if (super.runAndReset()) { 274 setNextRunTime(); 275 reExecutePeriodic(outerTask); 276 } 277 } 278 } 279 280 /** 281 * Returns true if can run a task given current run state 282 * and run-after-shutdown parameters. 283 * 284 * @param periodic true if this task periodic, false if delayed 285 */ canRunInCurrentRunState(boolean periodic)286 boolean canRunInCurrentRunState(boolean periodic) { 287 return isRunningOrShutdown(periodic ? 288 continueExistingPeriodicTasksAfterShutdown : 289 executeExistingDelayedTasksAfterShutdown); 290 } 291 292 /** 293 * Main execution method for delayed or periodic tasks. If pool 294 * is shut down, rejects the task. Otherwise adds task to queue 295 * and starts a thread, if necessary, to run it. (We cannot 296 * prestart the thread to run the task because the task (probably) 297 * shouldn't be run yet.) If the pool is shut down while the task 298 * is being added, cancel and remove it if required by state and 299 * run-after-shutdown parameters. 300 * 301 * @param task the task 302 */ delayedExecute(RunnableScheduledFuture<?> task)303 private void delayedExecute(RunnableScheduledFuture<?> task) { 304 if (isShutdown()) 305 reject(task); 306 else { 307 super.getQueue().add(task); 308 if (isShutdown() && 309 !canRunInCurrentRunState(task.isPeriodic()) && 310 remove(task)) 311 task.cancel(false); 312 else 313 ensurePrestart(); 314 } 315 } 316 317 /** 318 * Requeues a periodic task unless current run state precludes it. 319 * Same idea as delayedExecute except drops task rather than rejecting. 320 * 321 * @param task the task 322 */ reExecutePeriodic(RunnableScheduledFuture<?> task)323 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 324 if (canRunInCurrentRunState(true)) { 325 super.getQueue().add(task); 326 if (!canRunInCurrentRunState(true) && remove(task)) 327 task.cancel(false); 328 else 329 ensurePrestart(); 330 } 331 } 332 333 /** 334 * Cancels and clears the queue of all tasks that should not be run 335 * due to shutdown policy. Invoked within super.shutdown. 336 */ onShutdown()337 @Override void onShutdown() { 338 BlockingQueue<Runnable> q = super.getQueue(); 339 boolean keepDelayed = 340 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 341 boolean keepPeriodic = 342 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 343 if (!keepDelayed && !keepPeriodic) { 344 for (Object e : q.toArray()) 345 if (e instanceof RunnableScheduledFuture<?>) 346 ((RunnableScheduledFuture<?>) e).cancel(false); 347 q.clear(); 348 } 349 else { 350 // Traverse snapshot to avoid iterator exceptions 351 for (Object e : q.toArray()) { 352 if (e instanceof RunnableScheduledFuture) { 353 RunnableScheduledFuture<?> t = 354 (RunnableScheduledFuture<?>)e; 355 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || 356 t.isCancelled()) { // also remove if already cancelled 357 if (q.remove(t)) 358 t.cancel(false); 359 } 360 } 361 } 362 } 363 tryTerminate(); 364 } 365 366 /** 367 * Modifies or replaces the task used to execute a runnable. 368 * This method can be used to override the concrete 369 * class used for managing internal tasks. 370 * The default implementation simply returns the given task. 371 * 372 * @param runnable the submitted Runnable 373 * @param task the task created to execute the runnable 374 * @param <V> the type of the task's result 375 * @return a task that can execute the runnable 376 * @since 1.6 377 */ decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)378 protected <V> RunnableScheduledFuture<V> decorateTask( 379 Runnable runnable, RunnableScheduledFuture<V> task) { 380 return task; 381 } 382 383 /** 384 * Modifies or replaces the task used to execute a callable. 385 * This method can be used to override the concrete 386 * class used for managing internal tasks. 387 * The default implementation simply returns the given task. 388 * 389 * @param callable the submitted Callable 390 * @param task the task created to execute the callable 391 * @param <V> the type of the task's result 392 * @return a task that can execute the callable 393 * @since 1.6 394 */ decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)395 protected <V> RunnableScheduledFuture<V> decorateTask( 396 Callable<V> callable, RunnableScheduledFuture<V> task) { 397 return task; 398 } 399 400 /** 401 * The default keep-alive time for pool threads. 402 * 403 * Normally, this value is unused because all pool threads will be 404 * core threads, but if a user creates a pool with a corePoolSize 405 * of zero (against our advice), we keep a thread alive as long as 406 * there are queued tasks. If the keep alive time is zero (the 407 * historic value), we end up hot-spinning in getTask, wasting a 408 * CPU. But on the other hand, if we set the value too high, and 409 * users create a one-shot pool which they don't cleanly shutdown, 410 * the pool's non-daemon threads will prevent JVM termination. A 411 * small but non-zero value (relative to a JVM's lifetime) seems 412 * best. 413 */ 414 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L; 415 416 /** 417 * Creates a new {@code ScheduledThreadPoolExecutor} with the 418 * given core pool size. 419 * 420 * @param corePoolSize the number of threads to keep in the pool, even 421 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 422 * @throws IllegalArgumentException if {@code corePoolSize < 0} 423 */ ScheduledThreadPoolExecutor(int corePoolSize)424 public ScheduledThreadPoolExecutor(int corePoolSize) { 425 super(corePoolSize, Integer.MAX_VALUE, 426 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 427 new DelayedWorkQueue()); 428 } 429 430 /** 431 * Creates a new {@code ScheduledThreadPoolExecutor} with the 432 * given initial parameters. 433 * 434 * @param corePoolSize the number of threads to keep in the pool, even 435 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 436 * @param threadFactory the factory to use when the executor 437 * creates a new thread 438 * @throws IllegalArgumentException if {@code corePoolSize < 0} 439 * @throws NullPointerException if {@code threadFactory} is null 440 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)441 public ScheduledThreadPoolExecutor(int corePoolSize, 442 ThreadFactory threadFactory) { 443 super(corePoolSize, Integer.MAX_VALUE, 444 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 445 new DelayedWorkQueue(), threadFactory); 446 } 447 448 /** 449 * Creates a new {@code ScheduledThreadPoolExecutor} with the 450 * given initial parameters. 451 * 452 * @param corePoolSize the number of threads to keep in the pool, even 453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 454 * @param handler the handler to use when execution is blocked 455 * because the thread bounds and queue capacities are reached 456 * @throws IllegalArgumentException if {@code corePoolSize < 0} 457 * @throws NullPointerException if {@code handler} is null 458 */ ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)459 public ScheduledThreadPoolExecutor(int corePoolSize, 460 RejectedExecutionHandler handler) { 461 super(corePoolSize, Integer.MAX_VALUE, 462 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 463 new DelayedWorkQueue(), handler); 464 } 465 466 /** 467 * Creates a new {@code ScheduledThreadPoolExecutor} with the 468 * given initial parameters. 469 * 470 * @param corePoolSize the number of threads to keep in the pool, even 471 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 472 * @param threadFactory the factory to use when the executor 473 * creates a new thread 474 * @param handler the handler to use when execution is blocked 475 * because the thread bounds and queue capacities are reached 476 * @throws IllegalArgumentException if {@code corePoolSize < 0} 477 * @throws NullPointerException if {@code threadFactory} or 478 * {@code handler} is null 479 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)480 public ScheduledThreadPoolExecutor(int corePoolSize, 481 ThreadFactory threadFactory, 482 RejectedExecutionHandler handler) { 483 super(corePoolSize, Integer.MAX_VALUE, 484 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 485 new DelayedWorkQueue(), threadFactory, handler); 486 } 487 488 /** 489 * Returns the nanoTime-based trigger time of a delayed action. 490 */ triggerTime(long delay, TimeUnit unit)491 private long triggerTime(long delay, TimeUnit unit) { 492 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 493 } 494 495 /** 496 * Returns the nanoTime-based trigger time of a delayed action. 497 */ triggerTime(long delay)498 long triggerTime(long delay) { 499 return System.nanoTime() + 500 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 501 } 502 503 /** 504 * Constrains the values of all delays in the queue to be within 505 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 506 * This may occur if a task is eligible to be dequeued, but has 507 * not yet been, while some other task is added with a delay of 508 * Long.MAX_VALUE. 509 */ overflowFree(long delay)510 private long overflowFree(long delay) { 511 Delayed head = (Delayed) super.getQueue().peek(); 512 if (head != null) { 513 long headDelay = head.getDelay(NANOSECONDS); 514 if (headDelay < 0 && (delay - headDelay < 0)) 515 delay = Long.MAX_VALUE + headDelay; 516 } 517 return delay; 518 } 519 520 /** 521 * @throws RejectedExecutionException {@inheritDoc} 522 * @throws NullPointerException {@inheritDoc} 523 */ schedule(Runnable command, long delay, TimeUnit unit)524 public ScheduledFuture<?> schedule(Runnable command, 525 long delay, 526 TimeUnit unit) { 527 if (command == null || unit == null) 528 throw new NullPointerException(); 529 RunnableScheduledFuture<Void> t = decorateTask(command, 530 new ScheduledFutureTask<Void>(command, null, 531 triggerTime(delay, unit), 532 sequencer.getAndIncrement())); 533 delayedExecute(t); 534 return t; 535 } 536 537 /** 538 * @throws RejectedExecutionException {@inheritDoc} 539 * @throws NullPointerException {@inheritDoc} 540 */ schedule(Callable<V> callable, long delay, TimeUnit unit)541 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 542 long delay, 543 TimeUnit unit) { 544 if (callable == null || unit == null) 545 throw new NullPointerException(); 546 RunnableScheduledFuture<V> t = decorateTask(callable, 547 new ScheduledFutureTask<V>(callable, 548 triggerTime(delay, unit), 549 sequencer.getAndIncrement())); 550 delayedExecute(t); 551 return t; 552 } 553 554 /** 555 * @throws RejectedExecutionException {@inheritDoc} 556 * @throws NullPointerException {@inheritDoc} 557 * @throws IllegalArgumentException {@inheritDoc} 558 */ scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)559 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 560 long initialDelay, 561 long period, 562 TimeUnit unit) { 563 if (command == null || unit == null) 564 throw new NullPointerException(); 565 if (period <= 0L) 566 throw new IllegalArgumentException(); 567 ScheduledFutureTask<Void> sft = 568 new ScheduledFutureTask<Void>(command, 569 null, 570 triggerTime(initialDelay, unit), 571 unit.toNanos(period), 572 sequencer.getAndIncrement()); 573 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 574 sft.outerTask = t; 575 delayedExecute(t); 576 return t; 577 } 578 579 /** 580 * @throws RejectedExecutionException {@inheritDoc} 581 * @throws NullPointerException {@inheritDoc} 582 * @throws IllegalArgumentException {@inheritDoc} 583 */ scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)584 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 585 long initialDelay, 586 long delay, 587 TimeUnit unit) { 588 if (command == null || unit == null) 589 throw new NullPointerException(); 590 if (delay <= 0L) 591 throw new IllegalArgumentException(); 592 ScheduledFutureTask<Void> sft = 593 new ScheduledFutureTask<Void>(command, 594 null, 595 triggerTime(initialDelay, unit), 596 -unit.toNanos(delay), 597 sequencer.getAndIncrement()); 598 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 599 sft.outerTask = t; 600 delayedExecute(t); 601 return t; 602 } 603 604 /** 605 * Executes {@code command} with zero required delay. 606 * This has effect equivalent to 607 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 608 * Note that inspections of the queue and of the list returned by 609 * {@code shutdownNow} will access the zero-delayed 610 * {@link ScheduledFuture}, not the {@code command} itself. 611 * 612 * <p>A consequence of the use of {@code ScheduledFuture} objects is 613 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 614 * called with a null second {@code Throwable} argument, even if the 615 * {@code command} terminated abruptly. Instead, the {@code Throwable} 616 * thrown by such a task can be obtained via {@link Future#get}. 617 * 618 * @throws RejectedExecutionException at discretion of 619 * {@code RejectedExecutionHandler}, if the task 620 * cannot be accepted for execution because the 621 * executor has been shut down 622 * @throws NullPointerException {@inheritDoc} 623 */ execute(Runnable command)624 public void execute(Runnable command) { 625 schedule(command, 0, NANOSECONDS); 626 } 627 628 // Override AbstractExecutorService methods 629 630 /** 631 * @throws RejectedExecutionException {@inheritDoc} 632 * @throws NullPointerException {@inheritDoc} 633 */ submit(Runnable task)634 public Future<?> submit(Runnable task) { 635 return schedule(task, 0, NANOSECONDS); 636 } 637 638 /** 639 * @throws RejectedExecutionException {@inheritDoc} 640 * @throws NullPointerException {@inheritDoc} 641 */ submit(Runnable task, T result)642 public <T> Future<T> submit(Runnable task, T result) { 643 return schedule(Executors.callable(task, result), 0, NANOSECONDS); 644 } 645 646 /** 647 * @throws RejectedExecutionException {@inheritDoc} 648 * @throws NullPointerException {@inheritDoc} 649 */ submit(Callable<T> task)650 public <T> Future<T> submit(Callable<T> task) { 651 return schedule(task, 0, NANOSECONDS); 652 } 653 654 /** 655 * Sets the policy on whether to continue executing existing 656 * periodic tasks even when this executor has been {@code shutdown}. 657 * In this case, these tasks will only terminate upon 658 * {@code shutdownNow} or after setting the policy to 659 * {@code false} when already shutdown. 660 * This value is by default {@code false}. 661 * 662 * @param value if {@code true}, continue after shutdown, else don't 663 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 664 */ setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)665 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 666 continueExistingPeriodicTasksAfterShutdown = value; 667 if (!value && isShutdown()) 668 onShutdown(); 669 } 670 671 /** 672 * Gets the policy on whether to continue executing existing 673 * periodic tasks even when this executor has been {@code shutdown}. 674 * In this case, these tasks will only terminate upon 675 * {@code shutdownNow} or after setting the policy to 676 * {@code false} when already shutdown. 677 * This value is by default {@code false}. 678 * 679 * @return {@code true} if will continue after shutdown 680 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 681 */ getContinueExistingPeriodicTasksAfterShutdownPolicy()682 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 683 return continueExistingPeriodicTasksAfterShutdown; 684 } 685 686 /** 687 * Sets the policy on whether to execute existing delayed 688 * tasks even when this executor has been {@code shutdown}. 689 * In this case, these tasks will only terminate upon 690 * {@code shutdownNow}, or after setting the policy to 691 * {@code false} when already shutdown. 692 * This value is by default {@code true}. 693 * 694 * @param value if {@code true}, execute after shutdown, else don't 695 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 696 */ setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)697 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 698 executeExistingDelayedTasksAfterShutdown = value; 699 if (!value && isShutdown()) 700 onShutdown(); 701 } 702 703 /** 704 * Gets the policy on whether to execute existing delayed 705 * tasks even when this executor has been {@code shutdown}. 706 * In this case, these tasks will only terminate upon 707 * {@code shutdownNow}, or after setting the policy to 708 * {@code false} when already shutdown. 709 * This value is by default {@code true}. 710 * 711 * @return {@code true} if will execute after shutdown 712 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 713 */ getExecuteExistingDelayedTasksAfterShutdownPolicy()714 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 715 return executeExistingDelayedTasksAfterShutdown; 716 } 717 718 /** 719 * Sets the policy on whether cancelled tasks should be immediately 720 * removed from the work queue at time of cancellation. This value is 721 * by default {@code false}. 722 * 723 * @param value if {@code true}, remove on cancellation, else don't 724 * @see #getRemoveOnCancelPolicy 725 * @since 1.7 726 */ setRemoveOnCancelPolicy(boolean value)727 public void setRemoveOnCancelPolicy(boolean value) { 728 removeOnCancel = value; 729 } 730 731 /** 732 * Gets the policy on whether cancelled tasks should be immediately 733 * removed from the work queue at time of cancellation. This value is 734 * by default {@code false}. 735 * 736 * @return {@code true} if cancelled tasks are immediately removed 737 * from the queue 738 * @see #setRemoveOnCancelPolicy 739 * @since 1.7 740 */ getRemoveOnCancelPolicy()741 public boolean getRemoveOnCancelPolicy() { 742 return removeOnCancel; 743 } 744 745 /** 746 * Initiates an orderly shutdown in which previously submitted 747 * tasks are executed, but no new tasks will be accepted. 748 * Invocation has no additional effect if already shut down. 749 * 750 * <p>This method does not wait for previously submitted tasks to 751 * complete execution. Use {@link #awaitTermination awaitTermination} 752 * to do that. 753 * 754 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 755 * has been set {@code false}, existing delayed tasks whose delays 756 * have not yet elapsed are cancelled. And unless the {@code 757 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 758 * {@code true}, future executions of existing periodic tasks will 759 * be cancelled. 760 */ 761 // android-note: Removed "throws SecurityException" doc. shutdown()762 public void shutdown() { 763 super.shutdown(); 764 } 765 766 /** 767 * Attempts to stop all actively executing tasks, halts the 768 * processing of waiting tasks, and returns a list of the tasks 769 * that were awaiting execution. These tasks are drained (removed) 770 * from the task queue upon return from this method. 771 * 772 * <p>This method does not wait for actively executing tasks to 773 * terminate. Use {@link #awaitTermination awaitTermination} to 774 * do that. 775 * 776 * <p>There are no guarantees beyond best-effort attempts to stop 777 * processing actively executing tasks. This implementation 778 * interrupts tasks via {@link Thread#interrupt}; any task that 779 * fails to respond to interrupts may never terminate. 780 * 781 * @return list of tasks that never commenced execution. 782 * Each element of this list is a {@link ScheduledFuture}. 783 * For tasks submitted via one of the {@code schedule} 784 * methods, the element will be identical to the returned 785 * {@code ScheduledFuture}. For tasks submitted using 786 * {@link #execute execute}, the element will be a 787 * zero-delay {@code ScheduledFuture}. 788 */ 789 // android-note: Removed "throws SecurityException" doc. shutdownNow()790 public List<Runnable> shutdownNow() { 791 return super.shutdownNow(); 792 } 793 794 /** 795 * Returns the task queue used by this executor. Access to the 796 * task queue is intended primarily for debugging and monitoring. 797 * This queue may be in active use. Retrieving the task queue 798 * does not prevent queued tasks from executing. 799 * 800 * <p>Each element of this queue is a {@link ScheduledFuture}. 801 * For tasks submitted via one of the {@code schedule} methods, the 802 * element will be identical to the returned {@code ScheduledFuture}. 803 * For tasks submitted using {@link #execute execute}, the element 804 * will be a zero-delay {@code ScheduledFuture}. 805 * 806 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse 807 * tasks in the order in which they will execute. 808 * 809 * @return the task queue 810 */ getQueue()811 public BlockingQueue<Runnable> getQueue() { 812 return super.getQueue(); 813 } 814 815 /** 816 * Specialized delay queue. To mesh with TPE declarations, this 817 * class must be declared as a BlockingQueue<Runnable> even though 818 * it can only hold RunnableScheduledFutures. 819 */ 820 static class DelayedWorkQueue extends AbstractQueue<Runnable> 821 implements BlockingQueue<Runnable> { 822 823 /* 824 * A DelayedWorkQueue is based on a heap-based data structure 825 * like those in DelayQueue and PriorityQueue, except that 826 * every ScheduledFutureTask also records its index into the 827 * heap array. This eliminates the need to find a task upon 828 * cancellation, greatly speeding up removal (down from O(n) 829 * to O(log n)), and reducing garbage retention that would 830 * otherwise occur by waiting for the element to rise to top 831 * before clearing. But because the queue may also hold 832 * RunnableScheduledFutures that are not ScheduledFutureTasks, 833 * we are not guaranteed to have such indices available, in 834 * which case we fall back to linear search. (We expect that 835 * most tasks will not be decorated, and that the faster cases 836 * will be much more common.) 837 * 838 * All heap operations must record index changes -- mainly 839 * within siftUp and siftDown. Upon removal, a task's 840 * heapIndex is set to -1. Note that ScheduledFutureTasks can 841 * appear at most once in the queue (this need not be true for 842 * other kinds of tasks or work queues), so are uniquely 843 * identified by heapIndex. 844 */ 845 846 private static final int INITIAL_CAPACITY = 16; 847 private RunnableScheduledFuture<?>[] queue = 848 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 849 private final ReentrantLock lock = new ReentrantLock(); 850 private int size; 851 852 /** 853 * Thread designated to wait for the task at the head of the 854 * queue. This variant of the Leader-Follower pattern 855 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 856 * minimize unnecessary timed waiting. When a thread becomes 857 * the leader, it waits only for the next delay to elapse, but 858 * other threads await indefinitely. The leader thread must 859 * signal some other thread before returning from take() or 860 * poll(...), unless some other thread becomes leader in the 861 * interim. Whenever the head of the queue is replaced with a 862 * task with an earlier expiration time, the leader field is 863 * invalidated by being reset to null, and some waiting 864 * thread, but not necessarily the current leader, is 865 * signalled. So waiting threads must be prepared to acquire 866 * and lose leadership while waiting. 867 */ 868 private Thread leader; 869 870 /** 871 * Condition signalled when a newer task becomes available at the 872 * head of the queue or a new thread may need to become leader. 873 */ 874 private final Condition available = lock.newCondition(); 875 876 /** 877 * Sets f's heapIndex if it is a ScheduledFutureTask. 878 */ setIndex(RunnableScheduledFuture<?> f, int idx)879 private void setIndex(RunnableScheduledFuture<?> f, int idx) { 880 if (f instanceof ScheduledFutureTask) 881 ((ScheduledFutureTask)f).heapIndex = idx; 882 } 883 884 /** 885 * Sifts element added at bottom up to its heap-ordered spot. 886 * Call only when holding lock. 887 */ siftUp(int k, RunnableScheduledFuture<?> key)888 private void siftUp(int k, RunnableScheduledFuture<?> key) { 889 while (k > 0) { 890 int parent = (k - 1) >>> 1; 891 RunnableScheduledFuture<?> e = queue[parent]; 892 if (key.compareTo(e) >= 0) 893 break; 894 queue[k] = e; 895 setIndex(e, k); 896 k = parent; 897 } 898 queue[k] = key; 899 setIndex(key, k); 900 } 901 902 /** 903 * Sifts element added at top down to its heap-ordered spot. 904 * Call only when holding lock. 905 */ siftDown(int k, RunnableScheduledFuture<?> key)906 private void siftDown(int k, RunnableScheduledFuture<?> key) { 907 int half = size >>> 1; 908 while (k < half) { 909 int child = (k << 1) + 1; 910 RunnableScheduledFuture<?> c = queue[child]; 911 int right = child + 1; 912 if (right < size && c.compareTo(queue[right]) > 0) 913 c = queue[child = right]; 914 if (key.compareTo(c) <= 0) 915 break; 916 queue[k] = c; 917 setIndex(c, k); 918 k = child; 919 } 920 queue[k] = key; 921 setIndex(key, k); 922 } 923 924 /** 925 * Resizes the heap array. Call only when holding lock. 926 */ grow()927 private void grow() { 928 int oldCapacity = queue.length; 929 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 930 if (newCapacity < 0) // overflow 931 newCapacity = Integer.MAX_VALUE; 932 queue = Arrays.copyOf(queue, newCapacity); 933 } 934 935 /** 936 * Finds index of given object, or -1 if absent. 937 */ indexOf(Object x)938 private int indexOf(Object x) { 939 if (x != null) { 940 if (x instanceof ScheduledFutureTask) { 941 int i = ((ScheduledFutureTask) x).heapIndex; 942 // Sanity check; x could conceivably be a 943 // ScheduledFutureTask from some other pool. 944 if (i >= 0 && i < size && queue[i] == x) 945 return i; 946 } else { 947 for (int i = 0; i < size; i++) 948 if (x.equals(queue[i])) 949 return i; 950 } 951 } 952 return -1; 953 } 954 contains(Object x)955 public boolean contains(Object x) { 956 final ReentrantLock lock = this.lock; 957 lock.lock(); 958 try { 959 return indexOf(x) != -1; 960 } finally { 961 lock.unlock(); 962 } 963 } 964 remove(Object x)965 public boolean remove(Object x) { 966 final ReentrantLock lock = this.lock; 967 lock.lock(); 968 try { 969 int i = indexOf(x); 970 if (i < 0) 971 return false; 972 973 setIndex(queue[i], -1); 974 int s = --size; 975 RunnableScheduledFuture<?> replacement = queue[s]; 976 queue[s] = null; 977 if (s != i) { 978 siftDown(i, replacement); 979 if (queue[i] == replacement) 980 siftUp(i, replacement); 981 } 982 return true; 983 } finally { 984 lock.unlock(); 985 } 986 } 987 size()988 public int size() { 989 final ReentrantLock lock = this.lock; 990 lock.lock(); 991 try { 992 return size; 993 } finally { 994 lock.unlock(); 995 } 996 } 997 isEmpty()998 public boolean isEmpty() { 999 return size() == 0; 1000 } 1001 remainingCapacity()1002 public int remainingCapacity() { 1003 return Integer.MAX_VALUE; 1004 } 1005 peek()1006 public RunnableScheduledFuture<?> peek() { 1007 final ReentrantLock lock = this.lock; 1008 lock.lock(); 1009 try { 1010 return queue[0]; 1011 } finally { 1012 lock.unlock(); 1013 } 1014 } 1015 offer(Runnable x)1016 public boolean offer(Runnable x) { 1017 if (x == null) 1018 throw new NullPointerException(); 1019 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 1020 final ReentrantLock lock = this.lock; 1021 lock.lock(); 1022 try { 1023 int i = size; 1024 if (i >= queue.length) 1025 grow(); 1026 size = i + 1; 1027 if (i == 0) { 1028 queue[0] = e; 1029 setIndex(e, 0); 1030 } else { 1031 siftUp(i, e); 1032 } 1033 if (queue[0] == e) { 1034 leader = null; 1035 available.signal(); 1036 } 1037 } finally { 1038 lock.unlock(); 1039 } 1040 return true; 1041 } 1042 put(Runnable e)1043 public void put(Runnable e) { 1044 offer(e); 1045 } 1046 add(Runnable e)1047 public boolean add(Runnable e) { 1048 return offer(e); 1049 } 1050 offer(Runnable e, long timeout, TimeUnit unit)1051 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1052 return offer(e); 1053 } 1054 1055 /** 1056 * Performs common bookkeeping for poll and take: Replaces 1057 * first element with last and sifts it down. Call only when 1058 * holding lock. 1059 * @param f the task to remove and return 1060 */ finishPoll(RunnableScheduledFuture<?> f)1061 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 1062 int s = --size; 1063 RunnableScheduledFuture<?> x = queue[s]; 1064 queue[s] = null; 1065 if (s != 0) 1066 siftDown(0, x); 1067 setIndex(f, -1); 1068 return f; 1069 } 1070 poll()1071 public RunnableScheduledFuture<?> poll() { 1072 final ReentrantLock lock = this.lock; 1073 lock.lock(); 1074 try { 1075 RunnableScheduledFuture<?> first = queue[0]; 1076 return (first == null || first.getDelay(NANOSECONDS) > 0) 1077 ? null 1078 : finishPoll(first); 1079 } finally { 1080 lock.unlock(); 1081 } 1082 } 1083 take()1084 public RunnableScheduledFuture<?> take() throws InterruptedException { 1085 final ReentrantLock lock = this.lock; 1086 lock.lockInterruptibly(); 1087 try { 1088 for (;;) { 1089 RunnableScheduledFuture<?> first = queue[0]; 1090 if (first == null) 1091 available.await(); 1092 else { 1093 long delay = first.getDelay(NANOSECONDS); 1094 if (delay <= 0L) 1095 return finishPoll(first); 1096 first = null; // don't retain ref while waiting 1097 if (leader != null) 1098 available.await(); 1099 else { 1100 Thread thisThread = Thread.currentThread(); 1101 leader = thisThread; 1102 try { 1103 available.awaitNanos(delay); 1104 } finally { 1105 if (leader == thisThread) 1106 leader = null; 1107 } 1108 } 1109 } 1110 } 1111 } finally { 1112 if (leader == null && queue[0] != null) 1113 available.signal(); 1114 lock.unlock(); 1115 } 1116 } 1117 poll(long timeout, TimeUnit unit)1118 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 1119 throws InterruptedException { 1120 long nanos = unit.toNanos(timeout); 1121 final ReentrantLock lock = this.lock; 1122 lock.lockInterruptibly(); 1123 try { 1124 for (;;) { 1125 RunnableScheduledFuture<?> first = queue[0]; 1126 if (first == null) { 1127 if (nanos <= 0L) 1128 return null; 1129 else 1130 nanos = available.awaitNanos(nanos); 1131 } else { 1132 long delay = first.getDelay(NANOSECONDS); 1133 if (delay <= 0L) 1134 return finishPoll(first); 1135 if (nanos <= 0L) 1136 return null; 1137 first = null; // don't retain ref while waiting 1138 if (nanos < delay || leader != null) 1139 nanos = available.awaitNanos(nanos); 1140 else { 1141 Thread thisThread = Thread.currentThread(); 1142 leader = thisThread; 1143 try { 1144 long timeLeft = available.awaitNanos(delay); 1145 nanos -= delay - timeLeft; 1146 } finally { 1147 if (leader == thisThread) 1148 leader = null; 1149 } 1150 } 1151 } 1152 } 1153 } finally { 1154 if (leader == null && queue[0] != null) 1155 available.signal(); 1156 lock.unlock(); 1157 } 1158 } 1159 clear()1160 public void clear() { 1161 final ReentrantLock lock = this.lock; 1162 lock.lock(); 1163 try { 1164 for (int i = 0; i < size; i++) { 1165 RunnableScheduledFuture<?> t = queue[i]; 1166 if (t != null) { 1167 queue[i] = null; 1168 setIndex(t, -1); 1169 } 1170 } 1171 size = 0; 1172 } finally { 1173 lock.unlock(); 1174 } 1175 } 1176 1177 /** 1178 * Returns first element only if it is expired. 1179 * Used only by drainTo. Call only when holding lock. 1180 */ peekExpired()1181 private RunnableScheduledFuture<?> peekExpired() { 1182 // assert lock.isHeldByCurrentThread(); 1183 RunnableScheduledFuture<?> first = queue[0]; 1184 return (first == null || first.getDelay(NANOSECONDS) > 0) ? 1185 null : first; 1186 } 1187 drainTo(Collection<? super Runnable> c)1188 public int drainTo(Collection<? super Runnable> c) { 1189 if (c == null) 1190 throw new NullPointerException(); 1191 if (c == this) 1192 throw new IllegalArgumentException(); 1193 final ReentrantLock lock = this.lock; 1194 lock.lock(); 1195 try { 1196 RunnableScheduledFuture<?> first; 1197 int n = 0; 1198 while ((first = peekExpired()) != null) { 1199 c.add(first); // In this order, in case add() throws. 1200 finishPoll(first); 1201 ++n; 1202 } 1203 return n; 1204 } finally { 1205 lock.unlock(); 1206 } 1207 } 1208 drainTo(Collection<? super Runnable> c, int maxElements)1209 public int drainTo(Collection<? super Runnable> c, int maxElements) { 1210 if (c == null) 1211 throw new NullPointerException(); 1212 if (c == this) 1213 throw new IllegalArgumentException(); 1214 if (maxElements <= 0) 1215 return 0; 1216 final ReentrantLock lock = this.lock; 1217 lock.lock(); 1218 try { 1219 RunnableScheduledFuture<?> first; 1220 int n = 0; 1221 while (n < maxElements && (first = peekExpired()) != null) { 1222 c.add(first); // In this order, in case add() throws. 1223 finishPoll(first); 1224 ++n; 1225 } 1226 return n; 1227 } finally { 1228 lock.unlock(); 1229 } 1230 } 1231 toArray()1232 public Object[] toArray() { 1233 final ReentrantLock lock = this.lock; 1234 lock.lock(); 1235 try { 1236 return Arrays.copyOf(queue, size, Object[].class); 1237 } finally { 1238 lock.unlock(); 1239 } 1240 } 1241 1242 @SuppressWarnings("unchecked") toArray(T[] a)1243 public <T> T[] toArray(T[] a) { 1244 final ReentrantLock lock = this.lock; 1245 lock.lock(); 1246 try { 1247 if (a.length < size) 1248 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1249 System.arraycopy(queue, 0, a, 0, size); 1250 if (a.length > size) 1251 a[size] = null; 1252 return a; 1253 } finally { 1254 lock.unlock(); 1255 } 1256 } 1257 iterator()1258 public Iterator<Runnable> iterator() { 1259 return new Itr(Arrays.copyOf(queue, size)); 1260 } 1261 1262 /** 1263 * Snapshot iterator that works off copy of underlying q array. 1264 */ 1265 private class Itr implements Iterator<Runnable> { 1266 final RunnableScheduledFuture<?>[] array; 1267 int cursor; // index of next element to return; initially 0 1268 int lastRet = -1; // index of last element returned; -1 if no such 1269 Itr(RunnableScheduledFuture<?>[] array)1270 Itr(RunnableScheduledFuture<?>[] array) { 1271 this.array = array; 1272 } 1273 hasNext()1274 public boolean hasNext() { 1275 return cursor < array.length; 1276 } 1277 next()1278 public Runnable next() { 1279 if (cursor >= array.length) 1280 throw new NoSuchElementException(); 1281 lastRet = cursor; 1282 return array[cursor++]; 1283 } 1284 remove()1285 public void remove() { 1286 if (lastRet < 0) 1287 throw new IllegalStateException(); 1288 DelayedWorkQueue.this.remove(array[lastRet]); 1289 lastRet = -1; 1290 } 1291 } 1292 } 1293 } 1294