1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import static java.util.concurrent.TimeUnit.MILLISECONDS; 39 import static java.util.concurrent.TimeUnit.NANOSECONDS; 40 41 import java.util.AbstractQueue; 42 import java.util.Arrays; 43 import java.util.Collection; 44 import java.util.Iterator; 45 import java.util.List; 46 import java.util.NoSuchElementException; 47 import java.util.concurrent.atomic.AtomicLong; 48 import java.util.concurrent.locks.Condition; 49 import java.util.concurrent.locks.ReentrantLock; 50 51 // BEGIN android-note 52 // omit class-level docs on setRemoveOnCancelPolicy() 53 // END android-note 54 55 /** 56 * A {@link ThreadPoolExecutor} that can additionally schedule 57 * commands to run after a given delay, or to execute periodically. 58 * This class is preferable to {@link java.util.Timer} when multiple 59 * worker threads are needed, or when the additional flexibility or 60 * capabilities of {@link ThreadPoolExecutor} (which this class 61 * extends) are required. 62 * 63 * <p>Delayed tasks execute no sooner than they are enabled, but 64 * without any real-time guarantees about when, after they are 65 * enabled, they will commence. Tasks scheduled for exactly the same 66 * execution time are enabled in first-in-first-out (FIFO) order of 67 * submission. 68 * 69 * <p>When a submitted task is cancelled before it is run, execution 70 * is suppressed. By default, such a cancelled task is not 71 * automatically removed from the work queue until its delay elapses. 72 * While this enables further inspection and monitoring, it may also 73 * cause unbounded retention of cancelled tasks. 74 * 75 * <p>Successive executions of a periodic task scheduled via 76 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or 77 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay} 78 * do not overlap. While different executions may be performed by 79 * different threads, the effects of prior executions 80 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 81 * those of subsequent ones. 82 * 83 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 84 * of the inherited tuning methods are not useful for it. In 85 * particular, because it acts as a fixed-sized pool using 86 * {@code corePoolSize} threads and an unbounded queue, adjustments 87 * to {@code maximumPoolSize} have no useful effect. Additionally, it 88 * is almost never a good idea to set {@code corePoolSize} to zero or 89 * use {@code allowCoreThreadTimeOut} because this may leave the pool 90 * without threads to handle tasks once they become eligible to run. 91 * 92 * <p><b>Extension notes:</b> This class overrides the 93 * {@link ThreadPoolExecutor#execute(Runnable) execute} and 94 * {@link AbstractExecutorService#submit(Runnable) submit} 95 * methods to generate internal {@link ScheduledFuture} objects to 96 * control per-task delays and scheduling. To preserve 97 * functionality, any further overrides of these methods in 98 * subclasses must invoke superclass versions, which effectively 99 * disables additional task customization. However, this class 100 * provides alternative protected extension method 101 * {@code decorateTask} (one version each for {@code Runnable} and 102 * {@code Callable}) that can be used to customize the concrete task 103 * types used to execute commands entered via {@code execute}, 104 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 105 * and {@code scheduleWithFixedDelay}. By default, a 106 * {@code ScheduledThreadPoolExecutor} uses a task type extending 107 * {@link FutureTask}. However, this may be modified or replaced using 108 * subclasses of the form: 109 * 110 * <pre> {@code 111 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 112 * 113 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 114 * 115 * protected <V> RunnableScheduledFuture<V> decorateTask( 116 * Runnable r, RunnableScheduledFuture<V> task) { 117 * return new CustomTask<V>(r, task); 118 * } 119 * 120 * protected <V> RunnableScheduledFuture<V> decorateTask( 121 * Callable<V> c, RunnableScheduledFuture<V> task) { 122 * return new CustomTask<V>(c, task); 123 * } 124 * // ... add constructors, etc. 125 * }}</pre> 126 * 127 * @since 1.5 128 * @author Doug Lea 129 */ 130 public class ScheduledThreadPoolExecutor 131 extends ThreadPoolExecutor 132 implements ScheduledExecutorService { 133 134 /* 135 * This class specializes ThreadPoolExecutor implementation by 136 * 137 * 1. Using a custom task type ScheduledFutureTask, even for tasks 138 * that don't require scheduling because they are submitted 139 * using ExecutorService rather than ScheduledExecutorService 140 * methods, which are treated as tasks with a delay of zero. 141 * 142 * 2. Using a custom queue (DelayedWorkQueue), a variant of 143 * unbounded DelayQueue. The lack of capacity constraint and 144 * the fact that corePoolSize and maximumPoolSize are 145 * effectively identical simplifies some execution mechanics 146 * (see delayedExecute) compared to ThreadPoolExecutor. 147 * 148 * 3. Supporting optional run-after-shutdown parameters, which 149 * leads to overrides of shutdown methods to remove and cancel 150 * tasks that should NOT be run after shutdown, as well as 151 * different recheck logic when task (re)submission overlaps 152 * with a shutdown. 153 * 154 * 4. Task decoration methods to allow interception and 155 * instrumentation, which are needed because subclasses cannot 156 * otherwise override submit methods to get this effect. These 157 * don't have any impact on pool control logic though. 158 */ 159 160 /** 161 * False if should cancel/suppress periodic tasks on shutdown. 162 */ 163 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 164 165 /** 166 * False if should cancel non-periodic tasks on shutdown. 167 */ 168 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 169 170 /** 171 * True if ScheduledFutureTask.cancel should remove from queue. 172 */ 173 volatile boolean removeOnCancel; 174 175 /** 176 * Sequence number to break scheduling ties, and in turn to 177 * guarantee FIFO order among tied entries. 178 */ 179 private static final AtomicLong sequencer = new AtomicLong(); 180 181 private class ScheduledFutureTask<V> 182 extends FutureTask<V> implements RunnableScheduledFuture<V> { 183 184 /** Sequence number to break ties FIFO */ 185 private final long sequenceNumber; 186 187 /** The nanoTime-based time when the task is enabled to execute. */ 188 private volatile long time; 189 190 /** 191 * Period for repeating tasks, in nanoseconds. 192 * A positive value indicates fixed-rate execution. 193 * A negative value indicates fixed-delay execution. 194 * A value of 0 indicates a non-repeating (one-shot) task. 195 */ 196 private final long period; 197 198 /** The actual task to be re-enqueued by reExecutePeriodic */ 199 RunnableScheduledFuture<V> outerTask = this; 200 201 /** 202 * Index into delay queue, to support faster cancellation. 203 */ 204 int heapIndex; 205 206 /** 207 * Creates a one-shot action with given nanoTime-based trigger time. 208 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)209 ScheduledFutureTask(Runnable r, V result, long triggerTime, 210 long sequenceNumber) { 211 super(r, result); 212 this.time = triggerTime; 213 this.period = 0; 214 this.sequenceNumber = sequenceNumber; 215 } 216 217 /** 218 * Creates a periodic action with given nanoTime-based initial 219 * trigger time and period. 220 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)221 ScheduledFutureTask(Runnable r, V result, long triggerTime, 222 long period, long sequenceNumber) { 223 super(r, result); 224 this.time = triggerTime; 225 this.period = period; 226 this.sequenceNumber = sequenceNumber; 227 } 228 229 /** 230 * Creates a one-shot action with given nanoTime-based trigger time. 231 */ ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)232 ScheduledFutureTask(Callable<V> callable, long triggerTime, 233 long sequenceNumber) { 234 super(callable); 235 this.time = triggerTime; 236 this.period = 0; 237 this.sequenceNumber = sequenceNumber; 238 } 239 getDelay(TimeUnit unit)240 public long getDelay(TimeUnit unit) { 241 return unit.convert(time - System.nanoTime(), NANOSECONDS); 242 } 243 compareTo(Delayed other)244 public int compareTo(Delayed other) { 245 if (other == this) // compare zero if same object 246 return 0; 247 if (other instanceof ScheduledFutureTask) { 248 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 249 long diff = time - x.time; 250 if (diff < 0) 251 return -1; 252 else if (diff > 0) 253 return 1; 254 else if (sequenceNumber < x.sequenceNumber) 255 return -1; 256 else 257 return 1; 258 } 259 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 260 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 261 } 262 263 /** 264 * Returns {@code true} if this is a periodic (not a one-shot) action. 265 * 266 * @return {@code true} if periodic 267 */ isPeriodic()268 public boolean isPeriodic() { 269 return period != 0; 270 } 271 272 /** 273 * Sets the next time to run for a periodic task. 274 */ setNextRunTime()275 private void setNextRunTime() { 276 long p = period; 277 if (p > 0) 278 time += p; 279 else 280 time = triggerTime(-p); 281 } 282 cancel(boolean mayInterruptIfRunning)283 public boolean cancel(boolean mayInterruptIfRunning) { 284 // The racy read of heapIndex below is benign: 285 // if heapIndex < 0, then OOTA guarantees that we have surely 286 // been removed; else we recheck under lock in remove() 287 boolean cancelled = super.cancel(mayInterruptIfRunning); 288 if (cancelled && removeOnCancel && heapIndex >= 0) 289 remove(this); 290 return cancelled; 291 } 292 293 /** 294 * Overrides FutureTask version so as to reset/requeue if periodic. 295 */ run()296 public void run() { 297 boolean periodic = isPeriodic(); 298 if (!canRunInCurrentRunState(periodic)) 299 cancel(false); 300 else if (!periodic) 301 super.run(); 302 else if (super.runAndReset()) { 303 setNextRunTime(); 304 reExecutePeriodic(outerTask); 305 } 306 } 307 } 308 309 /** 310 * Returns true if can run a task given current run state 311 * and run-after-shutdown parameters. 312 * 313 * @param periodic true if this task periodic, false if delayed 314 */ canRunInCurrentRunState(boolean periodic)315 boolean canRunInCurrentRunState(boolean periodic) { 316 return isRunningOrShutdown(periodic ? 317 continueExistingPeriodicTasksAfterShutdown : 318 executeExistingDelayedTasksAfterShutdown); 319 } 320 321 /** 322 * Main execution method for delayed or periodic tasks. If pool 323 * is shut down, rejects the task. Otherwise adds task to queue 324 * and starts a thread, if necessary, to run it. (We cannot 325 * prestart the thread to run the task because the task (probably) 326 * shouldn't be run yet.) If the pool is shut down while the task 327 * is being added, cancel and remove it if required by state and 328 * run-after-shutdown parameters. 329 * 330 * @param task the task 331 */ delayedExecute(RunnableScheduledFuture<?> task)332 private void delayedExecute(RunnableScheduledFuture<?> task) { 333 if (isShutdown()) 334 reject(task); 335 else { 336 super.getQueue().add(task); 337 if (isShutdown() && 338 !canRunInCurrentRunState(task.isPeriodic()) && 339 remove(task)) 340 task.cancel(false); 341 else 342 ensurePrestart(); 343 } 344 } 345 346 /** 347 * Requeues a periodic task unless current run state precludes it. 348 * Same idea as delayedExecute except drops task rather than rejecting. 349 * 350 * @param task the task 351 */ reExecutePeriodic(RunnableScheduledFuture<?> task)352 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 353 if (canRunInCurrentRunState(true)) { 354 super.getQueue().add(task); 355 if (!canRunInCurrentRunState(true) && remove(task)) 356 task.cancel(false); 357 else 358 ensurePrestart(); 359 } 360 } 361 362 /** 363 * Cancels and clears the queue of all tasks that should not be run 364 * due to shutdown policy. Invoked within super.shutdown. 365 */ onShutdown()366 @Override void onShutdown() { 367 BlockingQueue<Runnable> q = super.getQueue(); 368 boolean keepDelayed = 369 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 370 boolean keepPeriodic = 371 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 372 if (!keepDelayed && !keepPeriodic) { 373 for (Object e : q.toArray()) 374 if (e instanceof RunnableScheduledFuture<?>) 375 ((RunnableScheduledFuture<?>) e).cancel(false); 376 q.clear(); 377 } 378 else { 379 // Traverse snapshot to avoid iterator exceptions 380 for (Object e : q.toArray()) { 381 if (e instanceof RunnableScheduledFuture) { 382 RunnableScheduledFuture<?> t = 383 (RunnableScheduledFuture<?>)e; 384 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || 385 t.isCancelled()) { // also remove if already cancelled 386 if (q.remove(t)) 387 t.cancel(false); 388 } 389 } 390 } 391 } 392 tryTerminate(); 393 } 394 395 /** 396 * Modifies or replaces the task used to execute a runnable. 397 * This method can be used to override the concrete 398 * class used for managing internal tasks. 399 * The default implementation simply returns the given task. 400 * 401 * @param runnable the submitted Runnable 402 * @param task the task created to execute the runnable 403 * @param <V> the type of the task's result 404 * @return a task that can execute the runnable 405 * @since 1.6 406 */ decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)407 protected <V> RunnableScheduledFuture<V> decorateTask( 408 Runnable runnable, RunnableScheduledFuture<V> task) { 409 return task; 410 } 411 412 /** 413 * Modifies or replaces the task used to execute a callable. 414 * This method can be used to override the concrete 415 * class used for managing internal tasks. 416 * The default implementation simply returns the given task. 417 * 418 * @param callable the submitted Callable 419 * @param task the task created to execute the callable 420 * @param <V> the type of the task's result 421 * @return a task that can execute the callable 422 * @since 1.6 423 */ decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)424 protected <V> RunnableScheduledFuture<V> decorateTask( 425 Callable<V> callable, RunnableScheduledFuture<V> task) { 426 return task; 427 } 428 429 /** 430 * The default keep-alive time for pool threads. 431 * 432 * Normally, this value is unused because all pool threads will be 433 * core threads, but if a user creates a pool with a corePoolSize 434 * of zero (against our advice), we keep a thread alive as long as 435 * there are queued tasks. If the keep alive time is zero (the 436 * historic value), we end up hot-spinning in getTask, wasting a 437 * CPU. But on the other hand, if we set the value too high, and 438 * users create a one-shot pool which they don't cleanly shutdown, 439 * the pool's non-daemon threads will prevent JVM termination. A 440 * small but non-zero value (relative to a JVM's lifetime) seems 441 * best. 442 */ 443 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L; 444 445 /** 446 * Creates a new {@code ScheduledThreadPoolExecutor} with the 447 * given core pool size. 448 * 449 * @param corePoolSize the number of threads to keep in the pool, even 450 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 451 * @throws IllegalArgumentException if {@code corePoolSize < 0} 452 */ ScheduledThreadPoolExecutor(int corePoolSize)453 public ScheduledThreadPoolExecutor(int corePoolSize) { 454 super(corePoolSize, Integer.MAX_VALUE, 455 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 456 new DelayedWorkQueue()); 457 } 458 459 /** 460 * Creates a new {@code ScheduledThreadPoolExecutor} with the 461 * given initial parameters. 462 * 463 * @param corePoolSize the number of threads to keep in the pool, even 464 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 465 * @param threadFactory the factory to use when the executor 466 * creates a new thread 467 * @throws IllegalArgumentException if {@code corePoolSize < 0} 468 * @throws NullPointerException if {@code threadFactory} is null 469 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)470 public ScheduledThreadPoolExecutor(int corePoolSize, 471 ThreadFactory threadFactory) { 472 super(corePoolSize, Integer.MAX_VALUE, 473 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 474 new DelayedWorkQueue(), threadFactory); 475 } 476 477 /** 478 * Creates a new {@code ScheduledThreadPoolExecutor} with the 479 * given initial parameters. 480 * 481 * @param corePoolSize the number of threads to keep in the pool, even 482 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 483 * @param handler the handler to use when execution is blocked 484 * because the thread bounds and queue capacities are reached 485 * @throws IllegalArgumentException if {@code corePoolSize < 0} 486 * @throws NullPointerException if {@code handler} is null 487 */ ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)488 public ScheduledThreadPoolExecutor(int corePoolSize, 489 RejectedExecutionHandler handler) { 490 super(corePoolSize, Integer.MAX_VALUE, 491 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 492 new DelayedWorkQueue(), handler); 493 } 494 495 /** 496 * Creates a new {@code ScheduledThreadPoolExecutor} with the 497 * given initial parameters. 498 * 499 * @param corePoolSize the number of threads to keep in the pool, even 500 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 501 * @param threadFactory the factory to use when the executor 502 * creates a new thread 503 * @param handler the handler to use when execution is blocked 504 * because the thread bounds and queue capacities are reached 505 * @throws IllegalArgumentException if {@code corePoolSize < 0} 506 * @throws NullPointerException if {@code threadFactory} or 507 * {@code handler} is null 508 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)509 public ScheduledThreadPoolExecutor(int corePoolSize, 510 ThreadFactory threadFactory, 511 RejectedExecutionHandler handler) { 512 super(corePoolSize, Integer.MAX_VALUE, 513 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 514 new DelayedWorkQueue(), threadFactory, handler); 515 } 516 517 /** 518 * Returns the nanoTime-based trigger time of a delayed action. 519 */ triggerTime(long delay, TimeUnit unit)520 private long triggerTime(long delay, TimeUnit unit) { 521 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 522 } 523 524 /** 525 * Returns the nanoTime-based trigger time of a delayed action. 526 */ triggerTime(long delay)527 long triggerTime(long delay) { 528 return System.nanoTime() + 529 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 530 } 531 532 /** 533 * Constrains the values of all delays in the queue to be within 534 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 535 * This may occur if a task is eligible to be dequeued, but has 536 * not yet been, while some other task is added with a delay of 537 * Long.MAX_VALUE. 538 */ overflowFree(long delay)539 private long overflowFree(long delay) { 540 Delayed head = (Delayed) super.getQueue().peek(); 541 if (head != null) { 542 long headDelay = head.getDelay(NANOSECONDS); 543 if (headDelay < 0 && (delay - headDelay < 0)) 544 delay = Long.MAX_VALUE + headDelay; 545 } 546 return delay; 547 } 548 549 /** 550 * @throws RejectedExecutionException {@inheritDoc} 551 * @throws NullPointerException {@inheritDoc} 552 */ schedule(Runnable command, long delay, TimeUnit unit)553 public ScheduledFuture<?> schedule(Runnable command, 554 long delay, 555 TimeUnit unit) { 556 if (command == null || unit == null) 557 throw new NullPointerException(); 558 RunnableScheduledFuture<Void> t = decorateTask(command, 559 new ScheduledFutureTask<Void>(command, null, 560 triggerTime(delay, unit), 561 sequencer.getAndIncrement())); 562 delayedExecute(t); 563 return t; 564 } 565 566 /** 567 * @throws RejectedExecutionException {@inheritDoc} 568 * @throws NullPointerException {@inheritDoc} 569 */ schedule(Callable<V> callable, long delay, TimeUnit unit)570 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 571 long delay, 572 TimeUnit unit) { 573 if (callable == null || unit == null) 574 throw new NullPointerException(); 575 RunnableScheduledFuture<V> t = decorateTask(callable, 576 new ScheduledFutureTask<V>(callable, 577 triggerTime(delay, unit), 578 sequencer.getAndIncrement())); 579 delayedExecute(t); 580 return t; 581 } 582 583 /** 584 * @throws RejectedExecutionException {@inheritDoc} 585 * @throws NullPointerException {@inheritDoc} 586 * @throws IllegalArgumentException {@inheritDoc} 587 */ scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)588 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 589 long initialDelay, 590 long period, 591 TimeUnit unit) { 592 if (command == null || unit == null) 593 throw new NullPointerException(); 594 if (period <= 0L) 595 throw new IllegalArgumentException(); 596 ScheduledFutureTask<Void> sft = 597 new ScheduledFutureTask<Void>(command, 598 null, 599 triggerTime(initialDelay, unit), 600 unit.toNanos(period), 601 sequencer.getAndIncrement()); 602 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 603 sft.outerTask = t; 604 delayedExecute(t); 605 return t; 606 } 607 608 /** 609 * @throws RejectedExecutionException {@inheritDoc} 610 * @throws NullPointerException {@inheritDoc} 611 * @throws IllegalArgumentException {@inheritDoc} 612 */ scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)613 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 614 long initialDelay, 615 long delay, 616 TimeUnit unit) { 617 if (command == null || unit == null) 618 throw new NullPointerException(); 619 if (delay <= 0L) 620 throw new IllegalArgumentException(); 621 ScheduledFutureTask<Void> sft = 622 new ScheduledFutureTask<Void>(command, 623 null, 624 triggerTime(initialDelay, unit), 625 -unit.toNanos(delay), 626 sequencer.getAndIncrement()); 627 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 628 sft.outerTask = t; 629 delayedExecute(t); 630 return t; 631 } 632 633 /** 634 * Executes {@code command} with zero required delay. 635 * This has effect equivalent to 636 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 637 * Note that inspections of the queue and of the list returned by 638 * {@code shutdownNow} will access the zero-delayed 639 * {@link ScheduledFuture}, not the {@code command} itself. 640 * 641 * <p>A consequence of the use of {@code ScheduledFuture} objects is 642 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 643 * called with a null second {@code Throwable} argument, even if the 644 * {@code command} terminated abruptly. Instead, the {@code Throwable} 645 * thrown by such a task can be obtained via {@link Future#get}. 646 * 647 * @throws RejectedExecutionException at discretion of 648 * {@code RejectedExecutionHandler}, if the task 649 * cannot be accepted for execution because the 650 * executor has been shut down 651 * @throws NullPointerException {@inheritDoc} 652 */ execute(Runnable command)653 public void execute(Runnable command) { 654 schedule(command, 0, NANOSECONDS); 655 } 656 657 // Override AbstractExecutorService methods 658 659 /** 660 * @throws RejectedExecutionException {@inheritDoc} 661 * @throws NullPointerException {@inheritDoc} 662 */ submit(Runnable task)663 public Future<?> submit(Runnable task) { 664 return schedule(task, 0, NANOSECONDS); 665 } 666 667 /** 668 * @throws RejectedExecutionException {@inheritDoc} 669 * @throws NullPointerException {@inheritDoc} 670 */ submit(Runnable task, T result)671 public <T> Future<T> submit(Runnable task, T result) { 672 return schedule(Executors.callable(task, result), 0, NANOSECONDS); 673 } 674 675 /** 676 * @throws RejectedExecutionException {@inheritDoc} 677 * @throws NullPointerException {@inheritDoc} 678 */ submit(Callable<T> task)679 public <T> Future<T> submit(Callable<T> task) { 680 return schedule(task, 0, NANOSECONDS); 681 } 682 683 /** 684 * Sets the policy on whether to continue executing existing 685 * periodic tasks even when this executor has been {@code shutdown}. 686 * In this case, these tasks will only terminate upon 687 * {@code shutdownNow} or after setting the policy to 688 * {@code false} when already shutdown. 689 * This value is by default {@code false}. 690 * 691 * @param value if {@code true}, continue after shutdown, else don't 692 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 693 */ setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)694 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 695 continueExistingPeriodicTasksAfterShutdown = value; 696 if (!value && isShutdown()) 697 onShutdown(); 698 } 699 700 /** 701 * Gets the policy on whether to continue executing existing 702 * periodic tasks even when this executor has been {@code shutdown}. 703 * In this case, these tasks will only terminate upon 704 * {@code shutdownNow} or after setting the policy to 705 * {@code false} when already shutdown. 706 * This value is by default {@code false}. 707 * 708 * @return {@code true} if will continue after shutdown 709 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 710 */ getContinueExistingPeriodicTasksAfterShutdownPolicy()711 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 712 return continueExistingPeriodicTasksAfterShutdown; 713 } 714 715 /** 716 * Sets the policy on whether to execute existing delayed 717 * tasks even when this executor has been {@code shutdown}. 718 * In this case, these tasks will only terminate upon 719 * {@code shutdownNow}, or after setting the policy to 720 * {@code false} when already shutdown. 721 * This value is by default {@code true}. 722 * 723 * @param value if {@code true}, execute after shutdown, else don't 724 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 725 */ setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)726 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 727 executeExistingDelayedTasksAfterShutdown = value; 728 if (!value && isShutdown()) 729 onShutdown(); 730 } 731 732 /** 733 * Gets the policy on whether to execute existing delayed 734 * tasks even when this executor has been {@code shutdown}. 735 * In this case, these tasks will only terminate upon 736 * {@code shutdownNow}, or after setting the policy to 737 * {@code false} when already shutdown. 738 * This value is by default {@code true}. 739 * 740 * @return {@code true} if will execute after shutdown 741 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 742 */ getExecuteExistingDelayedTasksAfterShutdownPolicy()743 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 744 return executeExistingDelayedTasksAfterShutdown; 745 } 746 747 /** 748 * Sets the policy on whether cancelled tasks should be immediately 749 * removed from the work queue at time of cancellation. This value is 750 * by default {@code false}. 751 * 752 * @param value if {@code true}, remove on cancellation, else don't 753 * @see #getRemoveOnCancelPolicy 754 * @since 1.7 755 */ setRemoveOnCancelPolicy(boolean value)756 public void setRemoveOnCancelPolicy(boolean value) { 757 removeOnCancel = value; 758 } 759 760 /** 761 * Gets the policy on whether cancelled tasks should be immediately 762 * removed from the work queue at time of cancellation. This value is 763 * by default {@code false}. 764 * 765 * @return {@code true} if cancelled tasks are immediately removed 766 * from the queue 767 * @see #setRemoveOnCancelPolicy 768 * @since 1.7 769 */ getRemoveOnCancelPolicy()770 public boolean getRemoveOnCancelPolicy() { 771 return removeOnCancel; 772 } 773 774 /** 775 * Initiates an orderly shutdown in which previously submitted 776 * tasks are executed, but no new tasks will be accepted. 777 * Invocation has no additional effect if already shut down. 778 * 779 * <p>This method does not wait for previously submitted tasks to 780 * complete execution. Use {@link #awaitTermination awaitTermination} 781 * to do that. 782 * 783 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 784 * has been set {@code false}, existing delayed tasks whose delays 785 * have not yet elapsed are cancelled. And unless the {@code 786 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 787 * {@code true}, future executions of existing periodic tasks will 788 * be cancelled. 789 */ 790 // android-note: Removed "throws SecurityException" doc. shutdown()791 public void shutdown() { 792 super.shutdown(); 793 } 794 795 /** 796 * Attempts to stop all actively executing tasks, halts the 797 * processing of waiting tasks, and returns a list of the tasks 798 * that were awaiting execution. These tasks are drained (removed) 799 * from the task queue upon return from this method. 800 * 801 * <p>This method does not wait for actively executing tasks to 802 * terminate. Use {@link #awaitTermination awaitTermination} to 803 * do that. 804 * 805 * <p>There are no guarantees beyond best-effort attempts to stop 806 * processing actively executing tasks. This implementation 807 * interrupts tasks via {@link Thread#interrupt}; any task that 808 * fails to respond to interrupts may never terminate. 809 * 810 * @return list of tasks that never commenced execution. 811 * Each element of this list is a {@link ScheduledFuture}. 812 * For tasks submitted via one of the {@code schedule} 813 * methods, the element will be identical to the returned 814 * {@code ScheduledFuture}. For tasks submitted using 815 * {@link #execute execute}, the element will be a 816 * zero-delay {@code ScheduledFuture}. 817 */ 818 // android-note: Removed "throws SecurityException" doc. shutdownNow()819 public List<Runnable> shutdownNow() { 820 return super.shutdownNow(); 821 } 822 823 /** 824 * Returns the task queue used by this executor. Access to the 825 * task queue is intended primarily for debugging and monitoring. 826 * This queue may be in active use. Retrieving the task queue 827 * does not prevent queued tasks from executing. 828 * 829 * <p>Each element of this queue is a {@link ScheduledFuture}. 830 * For tasks submitted via one of the {@code schedule} methods, the 831 * element will be identical to the returned {@code ScheduledFuture}. 832 * For tasks submitted using {@link #execute execute}, the element 833 * will be a zero-delay {@code ScheduledFuture}. 834 * 835 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse 836 * tasks in the order in which they will execute. 837 * 838 * @return the task queue 839 */ getQueue()840 public BlockingQueue<Runnable> getQueue() { 841 return super.getQueue(); 842 } 843 844 /** 845 * Specialized delay queue. To mesh with TPE declarations, this 846 * class must be declared as a BlockingQueue<Runnable> even though 847 * it can only hold RunnableScheduledFutures. 848 */ 849 static class DelayedWorkQueue extends AbstractQueue<Runnable> 850 implements BlockingQueue<Runnable> { 851 852 /* 853 * A DelayedWorkQueue is based on a heap-based data structure 854 * like those in DelayQueue and PriorityQueue, except that 855 * every ScheduledFutureTask also records its index into the 856 * heap array. This eliminates the need to find a task upon 857 * cancellation, greatly speeding up removal (down from O(n) 858 * to O(log n)), and reducing garbage retention that would 859 * otherwise occur by waiting for the element to rise to top 860 * before clearing. But because the queue may also hold 861 * RunnableScheduledFutures that are not ScheduledFutureTasks, 862 * we are not guaranteed to have such indices available, in 863 * which case we fall back to linear search. (We expect that 864 * most tasks will not be decorated, and that the faster cases 865 * will be much more common.) 866 * 867 * All heap operations must record index changes -- mainly 868 * within siftUp and siftDown. Upon removal, a task's 869 * heapIndex is set to -1. Note that ScheduledFutureTasks can 870 * appear at most once in the queue (this need not be true for 871 * other kinds of tasks or work queues), so are uniquely 872 * identified by heapIndex. 873 */ 874 875 private static final int INITIAL_CAPACITY = 16; 876 private RunnableScheduledFuture<?>[] queue = 877 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 878 private final ReentrantLock lock = new ReentrantLock(); 879 private int size; 880 881 /** 882 * Thread designated to wait for the task at the head of the 883 * queue. This variant of the Leader-Follower pattern 884 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 885 * minimize unnecessary timed waiting. When a thread becomes 886 * the leader, it waits only for the next delay to elapse, but 887 * other threads await indefinitely. The leader thread must 888 * signal some other thread before returning from take() or 889 * poll(...), unless some other thread becomes leader in the 890 * interim. Whenever the head of the queue is replaced with a 891 * task with an earlier expiration time, the leader field is 892 * invalidated by being reset to null, and some waiting 893 * thread, but not necessarily the current leader, is 894 * signalled. So waiting threads must be prepared to acquire 895 * and lose leadership while waiting. 896 */ 897 private Thread leader; 898 899 /** 900 * Condition signalled when a newer task becomes available at the 901 * head of the queue or a new thread may need to become leader. 902 */ 903 private final Condition available = lock.newCondition(); 904 905 /** 906 * Sets f's heapIndex if it is a ScheduledFutureTask. 907 */ setIndex(RunnableScheduledFuture<?> f, int idx)908 private void setIndex(RunnableScheduledFuture<?> f, int idx) { 909 if (f instanceof ScheduledFutureTask) 910 ((ScheduledFutureTask)f).heapIndex = idx; 911 } 912 913 /** 914 * Sifts element added at bottom up to its heap-ordered spot. 915 * Call only when holding lock. 916 */ siftUp(int k, RunnableScheduledFuture<?> key)917 private void siftUp(int k, RunnableScheduledFuture<?> key) { 918 while (k > 0) { 919 int parent = (k - 1) >>> 1; 920 RunnableScheduledFuture<?> e = queue[parent]; 921 if (key.compareTo(e) >= 0) 922 break; 923 queue[k] = e; 924 setIndex(e, k); 925 k = parent; 926 } 927 queue[k] = key; 928 setIndex(key, k); 929 } 930 931 /** 932 * Sifts element added at top down to its heap-ordered spot. 933 * Call only when holding lock. 934 */ siftDown(int k, RunnableScheduledFuture<?> key)935 private void siftDown(int k, RunnableScheduledFuture<?> key) { 936 int half = size >>> 1; 937 while (k < half) { 938 int child = (k << 1) + 1; 939 RunnableScheduledFuture<?> c = queue[child]; 940 int right = child + 1; 941 if (right < size && c.compareTo(queue[right]) > 0) 942 c = queue[child = right]; 943 if (key.compareTo(c) <= 0) 944 break; 945 queue[k] = c; 946 setIndex(c, k); 947 k = child; 948 } 949 queue[k] = key; 950 setIndex(key, k); 951 } 952 953 /** 954 * Resizes the heap array. Call only when holding lock. 955 */ grow()956 private void grow() { 957 int oldCapacity = queue.length; 958 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 959 if (newCapacity < 0) // overflow 960 newCapacity = Integer.MAX_VALUE; 961 queue = Arrays.copyOf(queue, newCapacity); 962 } 963 964 /** 965 * Finds index of given object, or -1 if absent. 966 */ indexOf(Object x)967 private int indexOf(Object x) { 968 if (x != null) { 969 if (x instanceof ScheduledFutureTask) { 970 int i = ((ScheduledFutureTask) x).heapIndex; 971 // Sanity check; x could conceivably be a 972 // ScheduledFutureTask from some other pool. 973 if (i >= 0 && i < size && queue[i] == x) 974 return i; 975 } else { 976 for (int i = 0; i < size; i++) 977 if (x.equals(queue[i])) 978 return i; 979 } 980 } 981 return -1; 982 } 983 contains(Object x)984 public boolean contains(Object x) { 985 final ReentrantLock lock = this.lock; 986 lock.lock(); 987 try { 988 return indexOf(x) != -1; 989 } finally { 990 lock.unlock(); 991 } 992 } 993 remove(Object x)994 public boolean remove(Object x) { 995 final ReentrantLock lock = this.lock; 996 lock.lock(); 997 try { 998 int i = indexOf(x); 999 if (i < 0) 1000 return false; 1001 1002 setIndex(queue[i], -1); 1003 int s = --size; 1004 RunnableScheduledFuture<?> replacement = queue[s]; 1005 queue[s] = null; 1006 if (s != i) { 1007 siftDown(i, replacement); 1008 if (queue[i] == replacement) 1009 siftUp(i, replacement); 1010 } 1011 return true; 1012 } finally { 1013 lock.unlock(); 1014 } 1015 } 1016 size()1017 public int size() { 1018 final ReentrantLock lock = this.lock; 1019 lock.lock(); 1020 try { 1021 return size; 1022 } finally { 1023 lock.unlock(); 1024 } 1025 } 1026 isEmpty()1027 public boolean isEmpty() { 1028 return size() == 0; 1029 } 1030 remainingCapacity()1031 public int remainingCapacity() { 1032 return Integer.MAX_VALUE; 1033 } 1034 peek()1035 public RunnableScheduledFuture<?> peek() { 1036 final ReentrantLock lock = this.lock; 1037 lock.lock(); 1038 try { 1039 return queue[0]; 1040 } finally { 1041 lock.unlock(); 1042 } 1043 } 1044 offer(Runnable x)1045 public boolean offer(Runnable x) { 1046 if (x == null) 1047 throw new NullPointerException(); 1048 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 1049 final ReentrantLock lock = this.lock; 1050 lock.lock(); 1051 try { 1052 int i = size; 1053 if (i >= queue.length) 1054 grow(); 1055 size = i + 1; 1056 if (i == 0) { 1057 queue[0] = e; 1058 setIndex(e, 0); 1059 } else { 1060 siftUp(i, e); 1061 } 1062 if (queue[0] == e) { 1063 leader = null; 1064 available.signal(); 1065 } 1066 } finally { 1067 lock.unlock(); 1068 } 1069 return true; 1070 } 1071 put(Runnable e)1072 public void put(Runnable e) { 1073 offer(e); 1074 } 1075 add(Runnable e)1076 public boolean add(Runnable e) { 1077 return offer(e); 1078 } 1079 offer(Runnable e, long timeout, TimeUnit unit)1080 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1081 return offer(e); 1082 } 1083 1084 /** 1085 * Performs common bookkeeping for poll and take: Replaces 1086 * first element with last and sifts it down. Call only when 1087 * holding lock. 1088 * @param f the task to remove and return 1089 */ finishPoll(RunnableScheduledFuture<?> f)1090 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 1091 int s = --size; 1092 RunnableScheduledFuture<?> x = queue[s]; 1093 queue[s] = null; 1094 if (s != 0) 1095 siftDown(0, x); 1096 setIndex(f, -1); 1097 return f; 1098 } 1099 poll()1100 public RunnableScheduledFuture<?> poll() { 1101 final ReentrantLock lock = this.lock; 1102 lock.lock(); 1103 try { 1104 RunnableScheduledFuture<?> first = queue[0]; 1105 return (first == null || first.getDelay(NANOSECONDS) > 0) 1106 ? null 1107 : finishPoll(first); 1108 } finally { 1109 lock.unlock(); 1110 } 1111 } 1112 take()1113 public RunnableScheduledFuture<?> take() throws InterruptedException { 1114 final ReentrantLock lock = this.lock; 1115 lock.lockInterruptibly(); 1116 try { 1117 for (;;) { 1118 RunnableScheduledFuture<?> first = queue[0]; 1119 if (first == null) 1120 available.await(); 1121 else { 1122 long delay = first.getDelay(NANOSECONDS); 1123 if (delay <= 0L) 1124 return finishPoll(first); 1125 first = null; // don't retain ref while waiting 1126 if (leader != null) 1127 available.await(); 1128 else { 1129 Thread thisThread = Thread.currentThread(); 1130 leader = thisThread; 1131 try { 1132 available.awaitNanos(delay); 1133 } finally { 1134 if (leader == thisThread) 1135 leader = null; 1136 } 1137 } 1138 } 1139 } 1140 } finally { 1141 if (leader == null && queue[0] != null) 1142 available.signal(); 1143 lock.unlock(); 1144 } 1145 } 1146 poll(long timeout, TimeUnit unit)1147 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 1148 throws InterruptedException { 1149 long nanos = unit.toNanos(timeout); 1150 final ReentrantLock lock = this.lock; 1151 lock.lockInterruptibly(); 1152 try { 1153 for (;;) { 1154 RunnableScheduledFuture<?> first = queue[0]; 1155 if (first == null) { 1156 if (nanos <= 0L) 1157 return null; 1158 else 1159 nanos = available.awaitNanos(nanos); 1160 } else { 1161 long delay = first.getDelay(NANOSECONDS); 1162 if (delay <= 0L) 1163 return finishPoll(first); 1164 if (nanos <= 0L) 1165 return null; 1166 first = null; // don't retain ref while waiting 1167 if (nanos < delay || leader != null) 1168 nanos = available.awaitNanos(nanos); 1169 else { 1170 Thread thisThread = Thread.currentThread(); 1171 leader = thisThread; 1172 try { 1173 long timeLeft = available.awaitNanos(delay); 1174 nanos -= delay - timeLeft; 1175 } finally { 1176 if (leader == thisThread) 1177 leader = null; 1178 } 1179 } 1180 } 1181 } 1182 } finally { 1183 if (leader == null && queue[0] != null) 1184 available.signal(); 1185 lock.unlock(); 1186 } 1187 } 1188 clear()1189 public void clear() { 1190 final ReentrantLock lock = this.lock; 1191 lock.lock(); 1192 try { 1193 for (int i = 0; i < size; i++) { 1194 RunnableScheduledFuture<?> t = queue[i]; 1195 if (t != null) { 1196 queue[i] = null; 1197 setIndex(t, -1); 1198 } 1199 } 1200 size = 0; 1201 } finally { 1202 lock.unlock(); 1203 } 1204 } 1205 1206 /** 1207 * Returns first element only if it is expired. 1208 * Used only by drainTo. Call only when holding lock. 1209 */ peekExpired()1210 private RunnableScheduledFuture<?> peekExpired() { 1211 // assert lock.isHeldByCurrentThread(); 1212 RunnableScheduledFuture<?> first = queue[0]; 1213 return (first == null || first.getDelay(NANOSECONDS) > 0) ? 1214 null : first; 1215 } 1216 drainTo(Collection<? super Runnable> c)1217 public int drainTo(Collection<? super Runnable> c) { 1218 if (c == null) 1219 throw new NullPointerException(); 1220 if (c == this) 1221 throw new IllegalArgumentException(); 1222 final ReentrantLock lock = this.lock; 1223 lock.lock(); 1224 try { 1225 RunnableScheduledFuture<?> first; 1226 int n = 0; 1227 while ((first = peekExpired()) != null) { 1228 c.add(first); // In this order, in case add() throws. 1229 finishPoll(first); 1230 ++n; 1231 } 1232 return n; 1233 } finally { 1234 lock.unlock(); 1235 } 1236 } 1237 drainTo(Collection<? super Runnable> c, int maxElements)1238 public int drainTo(Collection<? super Runnable> c, int maxElements) { 1239 if (c == null) 1240 throw new NullPointerException(); 1241 if (c == this) 1242 throw new IllegalArgumentException(); 1243 if (maxElements <= 0) 1244 return 0; 1245 final ReentrantLock lock = this.lock; 1246 lock.lock(); 1247 try { 1248 RunnableScheduledFuture<?> first; 1249 int n = 0; 1250 while (n < maxElements && (first = peekExpired()) != null) { 1251 c.add(first); // In this order, in case add() throws. 1252 finishPoll(first); 1253 ++n; 1254 } 1255 return n; 1256 } finally { 1257 lock.unlock(); 1258 } 1259 } 1260 toArray()1261 public Object[] toArray() { 1262 final ReentrantLock lock = this.lock; 1263 lock.lock(); 1264 try { 1265 return Arrays.copyOf(queue, size, Object[].class); 1266 } finally { 1267 lock.unlock(); 1268 } 1269 } 1270 1271 @SuppressWarnings("unchecked") toArray(T[] a)1272 public <T> T[] toArray(T[] a) { 1273 final ReentrantLock lock = this.lock; 1274 lock.lock(); 1275 try { 1276 if (a.length < size) 1277 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1278 System.arraycopy(queue, 0, a, 0, size); 1279 if (a.length > size) 1280 a[size] = null; 1281 return a; 1282 } finally { 1283 lock.unlock(); 1284 } 1285 } 1286 iterator()1287 public Iterator<Runnable> iterator() { 1288 return new Itr(Arrays.copyOf(queue, size)); 1289 } 1290 1291 /** 1292 * Snapshot iterator that works off copy of underlying q array. 1293 */ 1294 private class Itr implements Iterator<Runnable> { 1295 final RunnableScheduledFuture<?>[] array; 1296 int cursor; // index of next element to return; initially 0 1297 int lastRet = -1; // index of last element returned; -1 if no such 1298 Itr(RunnableScheduledFuture<?>[] array)1299 Itr(RunnableScheduledFuture<?>[] array) { 1300 this.array = array; 1301 } 1302 hasNext()1303 public boolean hasNext() { 1304 return cursor < array.length; 1305 } 1306 next()1307 public Runnable next() { 1308 if (cursor >= array.length) 1309 throw new NoSuchElementException(); 1310 lastRet = cursor; 1311 return array[cursor++]; 1312 } 1313 remove()1314 public void remove() { 1315 if (lastRet < 0) 1316 throw new IllegalStateException(); 1317 DelayedWorkQueue.this.remove(array[lastRet]); 1318 lastRet = -1; 1319 } 1320 } 1321 } 1322 } 1323