1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 import static java.util.concurrent.TimeUnit.NANOSECONDS; 9 import java.util.concurrent.atomic.AtomicLong; 10 import java.util.concurrent.locks.Condition; 11 import java.util.concurrent.locks.ReentrantLock; 12 import java.util.*; 13 14 // BEGIN android-note 15 // omit class-level docs on setRemoveOnCancelPolicy() 16 // END android-note 17 18 /** 19 * A {@link ThreadPoolExecutor} that can additionally schedule 20 * commands to run after a given delay, or to execute 21 * periodically. This class is preferable to {@link java.util.Timer} 22 * when multiple worker threads are needed, or when the additional 23 * flexibility or capabilities of {@link ThreadPoolExecutor} (which 24 * this class extends) are required. 25 * 26 * <p>Delayed tasks execute no sooner than they are enabled, but 27 * without any real-time guarantees about when, after they are 28 * enabled, they will commence. Tasks scheduled for exactly the same 29 * execution time are enabled in first-in-first-out (FIFO) order of 30 * submission. 31 * 32 * <p>When a submitted task is cancelled before it is run, execution 33 * is suppressed. By default, such a cancelled task is not 34 * automatically removed from the work queue until its delay 35 * elapses. While this enables further inspection and monitoring, it 36 * may also cause unbounded retention of cancelled tasks. 37 * 38 * <p>Successive executions of a task scheduled via 39 * {@code scheduleAtFixedRate} or 40 * {@code scheduleWithFixedDelay} do not overlap. While different 41 * executions may be performed by different threads, the effects of 42 * prior executions <a 43 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 44 * those of subsequent ones. 45 * 46 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few 47 * of the inherited tuning methods are not useful for it. In 48 * particular, because it acts as a fixed-sized pool using 49 * {@code corePoolSize} threads and an unbounded queue, adjustments 50 * to {@code maximumPoolSize} have no useful effect. Additionally, it 51 * is almost never a good idea to set {@code corePoolSize} to zero or 52 * use {@code allowCoreThreadTimeOut} because this may leave the pool 53 * without threads to handle tasks once they become eligible to run. 54 * 55 * <p><b>Extension notes:</b> This class overrides the 56 * {@link ThreadPoolExecutor#execute(Runnable) execute} and 57 * {@link AbstractExecutorService#submit(Runnable) submit} 58 * methods to generate internal {@link ScheduledFuture} objects to 59 * control per-task delays and scheduling. To preserve 60 * functionality, any further overrides of these methods in 61 * subclasses must invoke superclass versions, which effectively 62 * disables additional task customization. However, this class 63 * provides alternative protected extension method 64 * {@code decorateTask} (one version each for {@code Runnable} and 65 * {@code Callable}) that can be used to customize the concrete task 66 * types used to execute commands entered via {@code execute}, 67 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 68 * and {@code scheduleWithFixedDelay}. By default, a 69 * {@code ScheduledThreadPoolExecutor} uses a task type extending 70 * {@link FutureTask}. However, this may be modified or replaced using 71 * subclasses of the form: 72 * 73 * <pre> {@code 74 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 75 * 76 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 77 * 78 * protected <V> RunnableScheduledFuture<V> decorateTask( 79 * Runnable r, RunnableScheduledFuture<V> task) { 80 * return new CustomTask<V>(r, task); 81 * } 82 * 83 * protected <V> RunnableScheduledFuture<V> decorateTask( 84 * Callable<V> c, RunnableScheduledFuture<V> task) { 85 * return new CustomTask<V>(c, task); 86 * } 87 * // ... add constructors, etc. 88 * }}</pre> 89 * 90 * @since 1.5 91 * @author Doug Lea 92 */ 93 public class ScheduledThreadPoolExecutor 94 extends ThreadPoolExecutor 95 implements ScheduledExecutorService { 96 97 /* 98 * This class specializes ThreadPoolExecutor implementation by 99 * 100 * 1. Using a custom task type, ScheduledFutureTask for 101 * tasks, even those that don't require scheduling (i.e., 102 * those submitted using ExecutorService execute, not 103 * ScheduledExecutorService methods) which are treated as 104 * delayed tasks with a delay of zero. 105 * 106 * 2. Using a custom queue (DelayedWorkQueue), a variant of 107 * unbounded DelayQueue. The lack of capacity constraint and 108 * the fact that corePoolSize and maximumPoolSize are 109 * effectively identical simplifies some execution mechanics 110 * (see delayedExecute) compared to ThreadPoolExecutor. 111 * 112 * 3. Supporting optional run-after-shutdown parameters, which 113 * leads to overrides of shutdown methods to remove and cancel 114 * tasks that should NOT be run after shutdown, as well as 115 * different recheck logic when task (re)submission overlaps 116 * with a shutdown. 117 * 118 * 4. Task decoration methods to allow interception and 119 * instrumentation, which are needed because subclasses cannot 120 * otherwise override submit methods to get this effect. These 121 * don't have any impact on pool control logic though. 122 */ 123 124 /** 125 * False if should cancel/suppress periodic tasks on shutdown. 126 */ 127 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 128 129 /** 130 * False if should cancel non-periodic tasks on shutdown. 131 */ 132 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 133 134 /** 135 * True if ScheduledFutureTask.cancel should remove from queue 136 */ 137 private volatile boolean removeOnCancel = false; 138 139 /** 140 * Sequence number to break scheduling ties, and in turn to 141 * guarantee FIFO order among tied entries. 142 */ 143 private static final AtomicLong sequencer = new AtomicLong(); 144 145 /** 146 * Returns current nanosecond time. 147 */ now()148 final long now() { 149 return System.nanoTime(); 150 } 151 152 private class ScheduledFutureTask<V> 153 extends FutureTask<V> implements RunnableScheduledFuture<V> { 154 155 /** Sequence number to break ties FIFO */ 156 private final long sequenceNumber; 157 158 /** The time the task is enabled to execute in nanoTime units */ 159 private long time; 160 161 /** 162 * Period in nanoseconds for repeating tasks. A positive 163 * value indicates fixed-rate execution. A negative value 164 * indicates fixed-delay execution. A value of 0 indicates a 165 * non-repeating task. 166 */ 167 private final long period; 168 169 /** The actual task to be re-enqueued by reExecutePeriodic */ 170 RunnableScheduledFuture<V> outerTask = this; 171 172 /** 173 * Index into delay queue, to support faster cancellation. 174 */ 175 int heapIndex; 176 177 /** 178 * Creates a one-shot action with given nanoTime-based trigger time. 179 */ ScheduledFutureTask(Runnable r, V result, long ns)180 ScheduledFutureTask(Runnable r, V result, long ns) { 181 super(r, result); 182 this.time = ns; 183 this.period = 0; 184 this.sequenceNumber = sequencer.getAndIncrement(); 185 } 186 187 /** 188 * Creates a periodic action with given nano time and period. 189 */ ScheduledFutureTask(Runnable r, V result, long ns, long period)190 ScheduledFutureTask(Runnable r, V result, long ns, long period) { 191 super(r, result); 192 this.time = ns; 193 this.period = period; 194 this.sequenceNumber = sequencer.getAndIncrement(); 195 } 196 197 /** 198 * Creates a one-shot action with given nanoTime-based trigger time. 199 */ ScheduledFutureTask(Callable<V> callable, long ns)200 ScheduledFutureTask(Callable<V> callable, long ns) { 201 super(callable); 202 this.time = ns; 203 this.period = 0; 204 this.sequenceNumber = sequencer.getAndIncrement(); 205 } 206 getDelay(TimeUnit unit)207 public long getDelay(TimeUnit unit) { 208 return unit.convert(time - now(), NANOSECONDS); 209 } 210 compareTo(Delayed other)211 public int compareTo(Delayed other) { 212 if (other == this) // compare zero if same object 213 return 0; 214 if (other instanceof ScheduledFutureTask) { 215 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 216 long diff = time - x.time; 217 if (diff < 0) 218 return -1; 219 else if (diff > 0) 220 return 1; 221 else if (sequenceNumber < x.sequenceNumber) 222 return -1; 223 else 224 return 1; 225 } 226 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 227 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 228 } 229 230 /** 231 * Returns {@code true} if this is a periodic (not a one-shot) action. 232 * 233 * @return {@code true} if periodic 234 */ isPeriodic()235 public boolean isPeriodic() { 236 return period != 0; 237 } 238 239 /** 240 * Sets the next time to run for a periodic task. 241 */ setNextRunTime()242 private void setNextRunTime() { 243 long p = period; 244 if (p > 0) 245 time += p; 246 else 247 time = triggerTime(-p); 248 } 249 cancel(boolean mayInterruptIfRunning)250 public boolean cancel(boolean mayInterruptIfRunning) { 251 boolean cancelled = super.cancel(mayInterruptIfRunning); 252 if (cancelled && removeOnCancel && heapIndex >= 0) 253 remove(this); 254 return cancelled; 255 } 256 257 /** 258 * Overrides FutureTask version so as to reset/requeue if periodic. 259 */ run()260 public void run() { 261 boolean periodic = isPeriodic(); 262 if (!canRunInCurrentRunState(periodic)) 263 cancel(false); 264 else if (!periodic) 265 ScheduledFutureTask.super.run(); 266 else if (ScheduledFutureTask.super.runAndReset()) { 267 setNextRunTime(); 268 reExecutePeriodic(outerTask); 269 } 270 } 271 } 272 273 /** 274 * Returns true if can run a task given current run state 275 * and run-after-shutdown parameters. 276 * 277 * @param periodic true if this task periodic, false if delayed 278 */ canRunInCurrentRunState(boolean periodic)279 boolean canRunInCurrentRunState(boolean periodic) { 280 return isRunningOrShutdown(periodic ? 281 continueExistingPeriodicTasksAfterShutdown : 282 executeExistingDelayedTasksAfterShutdown); 283 } 284 285 /** 286 * Main execution method for delayed or periodic tasks. If pool 287 * is shut down, rejects the task. Otherwise adds task to queue 288 * and starts a thread, if necessary, to run it. (We cannot 289 * prestart the thread to run the task because the task (probably) 290 * shouldn't be run yet.) If the pool is shut down while the task 291 * is being added, cancel and remove it if required by state and 292 * run-after-shutdown parameters. 293 * 294 * @param task the task 295 */ delayedExecute(RunnableScheduledFuture<?> task)296 private void delayedExecute(RunnableScheduledFuture<?> task) { 297 if (isShutdown()) 298 reject(task); 299 else { 300 super.getQueue().add(task); 301 if (isShutdown() && 302 !canRunInCurrentRunState(task.isPeriodic()) && 303 remove(task)) 304 task.cancel(false); 305 else 306 ensurePrestart(); 307 } 308 } 309 310 /** 311 * Requeues a periodic task unless current run state precludes it. 312 * Same idea as delayedExecute except drops task rather than rejecting. 313 * 314 * @param task the task 315 */ reExecutePeriodic(RunnableScheduledFuture<?> task)316 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 317 if (canRunInCurrentRunState(true)) { 318 super.getQueue().add(task); 319 if (!canRunInCurrentRunState(true) && remove(task)) 320 task.cancel(false); 321 else 322 ensurePrestart(); 323 } 324 } 325 326 /** 327 * Cancels and clears the queue of all tasks that should not be run 328 * due to shutdown policy. Invoked within super.shutdown. 329 */ onShutdown()330 @Override void onShutdown() { 331 BlockingQueue<Runnable> q = super.getQueue(); 332 boolean keepDelayed = 333 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 334 boolean keepPeriodic = 335 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 336 if (!keepDelayed && !keepPeriodic) { 337 for (Object e : q.toArray()) 338 if (e instanceof RunnableScheduledFuture<?>) 339 ((RunnableScheduledFuture<?>) e).cancel(false); 340 q.clear(); 341 } 342 else { 343 // Traverse snapshot to avoid iterator exceptions 344 for (Object e : q.toArray()) { 345 if (e instanceof RunnableScheduledFuture) { 346 RunnableScheduledFuture<?> t = 347 (RunnableScheduledFuture<?>)e; 348 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || 349 t.isCancelled()) { // also remove if already cancelled 350 if (q.remove(t)) 351 t.cancel(false); 352 } 353 } 354 } 355 } 356 tryTerminate(); 357 } 358 359 /** 360 * Modifies or replaces the task used to execute a runnable. 361 * This method can be used to override the concrete 362 * class used for managing internal tasks. 363 * The default implementation simply returns the given task. 364 * 365 * @param runnable the submitted Runnable 366 * @param task the task created to execute the runnable 367 * @return a task that can execute the runnable 368 * @since 1.6 369 */ decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)370 protected <V> RunnableScheduledFuture<V> decorateTask( 371 Runnable runnable, RunnableScheduledFuture<V> task) { 372 return task; 373 } 374 375 /** 376 * Modifies or replaces the task used to execute a callable. 377 * This method can be used to override the concrete 378 * class used for managing internal tasks. 379 * The default implementation simply returns the given task. 380 * 381 * @param callable the submitted Callable 382 * @param task the task created to execute the callable 383 * @return a task that can execute the callable 384 * @since 1.6 385 */ decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)386 protected <V> RunnableScheduledFuture<V> decorateTask( 387 Callable<V> callable, RunnableScheduledFuture<V> task) { 388 return task; 389 } 390 391 /** 392 * Creates a new {@code ScheduledThreadPoolExecutor} with the 393 * given core pool size. 394 * 395 * @param corePoolSize the number of threads to keep in the pool, even 396 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 397 * @throws IllegalArgumentException if {@code corePoolSize < 0} 398 */ ScheduledThreadPoolExecutor(int corePoolSize)399 public ScheduledThreadPoolExecutor(int corePoolSize) { 400 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 401 new DelayedWorkQueue()); 402 } 403 404 /** 405 * Creates a new {@code ScheduledThreadPoolExecutor} with the 406 * given initial parameters. 407 * 408 * @param corePoolSize the number of threads to keep in the pool, even 409 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 410 * @param threadFactory the factory to use when the executor 411 * creates a new thread 412 * @throws IllegalArgumentException if {@code corePoolSize < 0} 413 * @throws NullPointerException if {@code threadFactory} is null 414 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)415 public ScheduledThreadPoolExecutor(int corePoolSize, 416 ThreadFactory threadFactory) { 417 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 418 new DelayedWorkQueue(), threadFactory); 419 } 420 421 /** 422 * Creates a new ScheduledThreadPoolExecutor with the given 423 * initial parameters. 424 * 425 * @param corePoolSize the number of threads to keep in the pool, even 426 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 427 * @param handler the handler to use when execution is blocked 428 * because the thread bounds and queue capacities are reached 429 * @throws IllegalArgumentException if {@code corePoolSize < 0} 430 * @throws NullPointerException if {@code handler} is null 431 */ ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)432 public ScheduledThreadPoolExecutor(int corePoolSize, 433 RejectedExecutionHandler handler) { 434 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 435 new DelayedWorkQueue(), handler); 436 } 437 438 /** 439 * Creates a new ScheduledThreadPoolExecutor with the given 440 * initial parameters. 441 * 442 * @param corePoolSize the number of threads to keep in the pool, even 443 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 444 * @param threadFactory the factory to use when the executor 445 * creates a new thread 446 * @param handler the handler to use when execution is blocked 447 * because the thread bounds and queue capacities are reached 448 * @throws IllegalArgumentException if {@code corePoolSize < 0} 449 * @throws NullPointerException if {@code threadFactory} or 450 * {@code handler} is null 451 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)452 public ScheduledThreadPoolExecutor(int corePoolSize, 453 ThreadFactory threadFactory, 454 RejectedExecutionHandler handler) { 455 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 456 new DelayedWorkQueue(), threadFactory, handler); 457 } 458 459 /** 460 * Returns the trigger time of a delayed action. 461 */ triggerTime(long delay, TimeUnit unit)462 private long triggerTime(long delay, TimeUnit unit) { 463 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 464 } 465 466 /** 467 * Returns the trigger time of a delayed action. 468 */ triggerTime(long delay)469 long triggerTime(long delay) { 470 return now() + 471 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 472 } 473 474 /** 475 * Constrains the values of all delays in the queue to be within 476 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 477 * This may occur if a task is eligible to be dequeued, but has 478 * not yet been, while some other task is added with a delay of 479 * Long.MAX_VALUE. 480 */ overflowFree(long delay)481 private long overflowFree(long delay) { 482 Delayed head = (Delayed) super.getQueue().peek(); 483 if (head != null) { 484 long headDelay = head.getDelay(NANOSECONDS); 485 if (headDelay < 0 && (delay - headDelay < 0)) 486 delay = Long.MAX_VALUE + headDelay; 487 } 488 return delay; 489 } 490 491 /** 492 * @throws RejectedExecutionException {@inheritDoc} 493 * @throws NullPointerException {@inheritDoc} 494 */ schedule(Runnable command, long delay, TimeUnit unit)495 public ScheduledFuture<?> schedule(Runnable command, 496 long delay, 497 TimeUnit unit) { 498 if (command == null || unit == null) 499 throw new NullPointerException(); 500 RunnableScheduledFuture<?> t = decorateTask(command, 501 new ScheduledFutureTask<Void>(command, null, 502 triggerTime(delay, unit))); 503 delayedExecute(t); 504 return t; 505 } 506 507 /** 508 * @throws RejectedExecutionException {@inheritDoc} 509 * @throws NullPointerException {@inheritDoc} 510 */ schedule(Callable<V> callable, long delay, TimeUnit unit)511 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 512 long delay, 513 TimeUnit unit) { 514 if (callable == null || unit == null) 515 throw new NullPointerException(); 516 RunnableScheduledFuture<V> t = decorateTask(callable, 517 new ScheduledFutureTask<V>(callable, 518 triggerTime(delay, unit))); 519 delayedExecute(t); 520 return t; 521 } 522 523 /** 524 * @throws RejectedExecutionException {@inheritDoc} 525 * @throws NullPointerException {@inheritDoc} 526 * @throws IllegalArgumentException {@inheritDoc} 527 */ scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)528 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 529 long initialDelay, 530 long period, 531 TimeUnit unit) { 532 if (command == null || unit == null) 533 throw new NullPointerException(); 534 if (period <= 0) 535 throw new IllegalArgumentException(); 536 ScheduledFutureTask<Void> sft = 537 new ScheduledFutureTask<Void>(command, 538 null, 539 triggerTime(initialDelay, unit), 540 unit.toNanos(period)); 541 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 542 sft.outerTask = t; 543 delayedExecute(t); 544 return t; 545 } 546 547 /** 548 * @throws RejectedExecutionException {@inheritDoc} 549 * @throws NullPointerException {@inheritDoc} 550 * @throws IllegalArgumentException {@inheritDoc} 551 */ scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)552 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 553 long initialDelay, 554 long delay, 555 TimeUnit unit) { 556 if (command == null || unit == null) 557 throw new NullPointerException(); 558 if (delay <= 0) 559 throw new IllegalArgumentException(); 560 ScheduledFutureTask<Void> sft = 561 new ScheduledFutureTask<Void>(command, 562 null, 563 triggerTime(initialDelay, unit), 564 unit.toNanos(-delay)); 565 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 566 sft.outerTask = t; 567 delayedExecute(t); 568 return t; 569 } 570 571 /** 572 * Executes {@code command} with zero required delay. 573 * This has effect equivalent to 574 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 575 * Note that inspections of the queue and of the list returned by 576 * {@code shutdownNow} will access the zero-delayed 577 * {@link ScheduledFuture}, not the {@code command} itself. 578 * 579 * <p>A consequence of the use of {@code ScheduledFuture} objects is 580 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 581 * called with a null second {@code Throwable} argument, even if the 582 * {@code command} terminated abruptly. Instead, the {@code Throwable} 583 * thrown by such a task can be obtained via {@link Future#get}. 584 * 585 * @throws RejectedExecutionException at discretion of 586 * {@code RejectedExecutionHandler}, if the task 587 * cannot be accepted for execution because the 588 * executor has been shut down 589 * @throws NullPointerException {@inheritDoc} 590 */ execute(Runnable command)591 public void execute(Runnable command) { 592 schedule(command, 0, NANOSECONDS); 593 } 594 595 // Override AbstractExecutorService methods 596 597 /** 598 * @throws RejectedExecutionException {@inheritDoc} 599 * @throws NullPointerException {@inheritDoc} 600 */ submit(Runnable task)601 public Future<?> submit(Runnable task) { 602 return schedule(task, 0, NANOSECONDS); 603 } 604 605 /** 606 * @throws RejectedExecutionException {@inheritDoc} 607 * @throws NullPointerException {@inheritDoc} 608 */ submit(Runnable task, T result)609 public <T> Future<T> submit(Runnable task, T result) { 610 return schedule(Executors.callable(task, result), 0, NANOSECONDS); 611 } 612 613 /** 614 * @throws RejectedExecutionException {@inheritDoc} 615 * @throws NullPointerException {@inheritDoc} 616 */ submit(Callable<T> task)617 public <T> Future<T> submit(Callable<T> task) { 618 return schedule(task, 0, NANOSECONDS); 619 } 620 621 /** 622 * Sets the policy on whether to continue executing existing 623 * periodic tasks even when this executor has been {@code shutdown}. 624 * In this case, these tasks will only terminate upon 625 * {@code shutdownNow} or after setting the policy to 626 * {@code false} when already shutdown. 627 * This value is by default {@code false}. 628 * 629 * @param value if {@code true}, continue after shutdown, else don't 630 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 631 */ setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)632 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 633 continueExistingPeriodicTasksAfterShutdown = value; 634 if (!value && isShutdown()) 635 onShutdown(); 636 } 637 638 /** 639 * Gets the policy on whether to continue executing existing 640 * periodic tasks even when this executor has been {@code shutdown}. 641 * In this case, these tasks will only terminate upon 642 * {@code shutdownNow} or after setting the policy to 643 * {@code false} when already shutdown. 644 * This value is by default {@code false}. 645 * 646 * @return {@code true} if will continue after shutdown 647 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 648 */ getContinueExistingPeriodicTasksAfterShutdownPolicy()649 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 650 return continueExistingPeriodicTasksAfterShutdown; 651 } 652 653 /** 654 * Sets the policy on whether to execute existing delayed 655 * tasks even when this executor has been {@code shutdown}. 656 * In this case, these tasks will only terminate upon 657 * {@code shutdownNow}, or after setting the policy to 658 * {@code false} when already shutdown. 659 * This value is by default {@code true}. 660 * 661 * @param value if {@code true}, execute after shutdown, else don't 662 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 663 */ setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)664 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 665 executeExistingDelayedTasksAfterShutdown = value; 666 if (!value && isShutdown()) 667 onShutdown(); 668 } 669 670 /** 671 * Gets the policy on whether to execute existing delayed 672 * tasks even when this executor has been {@code shutdown}. 673 * In this case, these tasks will only terminate upon 674 * {@code shutdownNow}, or after setting the policy to 675 * {@code false} when already shutdown. 676 * This value is by default {@code true}. 677 * 678 * @return {@code true} if will execute after shutdown 679 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 680 */ getExecuteExistingDelayedTasksAfterShutdownPolicy()681 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 682 return executeExistingDelayedTasksAfterShutdown; 683 } 684 685 /** 686 * Sets the policy on whether cancelled tasks should be immediately 687 * removed from the work queue at time of cancellation. This value is 688 * by default {@code false}. 689 * 690 * @param value if {@code true}, remove on cancellation, else don't 691 * @see #getRemoveOnCancelPolicy 692 * @since 1.7 693 */ setRemoveOnCancelPolicy(boolean value)694 public void setRemoveOnCancelPolicy(boolean value) { 695 removeOnCancel = value; 696 } 697 698 /** 699 * Gets the policy on whether cancelled tasks should be immediately 700 * removed from the work queue at time of cancellation. This value is 701 * by default {@code false}. 702 * 703 * @return {@code true} if cancelled tasks are immediately removed 704 * from the queue 705 * @see #setRemoveOnCancelPolicy 706 * @since 1.7 707 */ getRemoveOnCancelPolicy()708 public boolean getRemoveOnCancelPolicy() { 709 return removeOnCancel; 710 } 711 712 /** 713 * Initiates an orderly shutdown in which previously submitted 714 * tasks are executed, but no new tasks will be accepted. 715 * Invocation has no additional effect if already shut down. 716 * 717 * <p>This method does not wait for previously submitted tasks to 718 * complete execution. Use {@link #awaitTermination awaitTermination} 719 * to do that. 720 * 721 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 722 * has been set {@code false}, existing delayed tasks whose delays 723 * have not yet elapsed are cancelled. And unless the {@code 724 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 725 * {@code true}, future executions of existing periodic tasks will 726 * be cancelled. 727 */ shutdown()728 public void shutdown() { 729 super.shutdown(); 730 } 731 732 /** 733 * Attempts to stop all actively executing tasks, halts the 734 * processing of waiting tasks, and returns a list of the tasks 735 * that were awaiting execution. 736 * 737 * <p>This method does not wait for actively executing tasks to 738 * terminate. Use {@link #awaitTermination awaitTermination} to 739 * do that. 740 * 741 * <p>There are no guarantees beyond best-effort attempts to stop 742 * processing actively executing tasks. This implementation 743 * cancels tasks via {@link Thread#interrupt}, so any task that 744 * fails to respond to interrupts may never terminate. 745 * 746 * @return list of tasks that never commenced execution. 747 * Each element of this list is a {@link ScheduledFuture}, 748 * including those tasks submitted using {@code execute}, 749 * which are for scheduling purposes used as the basis of a 750 * zero-delay {@code ScheduledFuture}. 751 */ shutdownNow()752 public List<Runnable> shutdownNow() { 753 return super.shutdownNow(); 754 } 755 756 /** 757 * Returns the task queue used by this executor. Each element of 758 * this queue is a {@link ScheduledFuture}, including those 759 * tasks submitted using {@code execute} which are for scheduling 760 * purposes used as the basis of a zero-delay 761 * {@code ScheduledFuture}. Iteration over this queue is 762 * <em>not</em> guaranteed to traverse tasks in the order in 763 * which they will execute. 764 * 765 * @return the task queue 766 */ getQueue()767 public BlockingQueue<Runnable> getQueue() { 768 return super.getQueue(); 769 } 770 771 /** 772 * Specialized delay queue. To mesh with TPE declarations, this 773 * class must be declared as a BlockingQueue<Runnable> even though 774 * it can only hold RunnableScheduledFutures. 775 */ 776 static class DelayedWorkQueue extends AbstractQueue<Runnable> 777 implements BlockingQueue<Runnable> { 778 779 /* 780 * A DelayedWorkQueue is based on a heap-based data structure 781 * like those in DelayQueue and PriorityQueue, except that 782 * every ScheduledFutureTask also records its index into the 783 * heap array. This eliminates the need to find a task upon 784 * cancellation, greatly speeding up removal (down from O(n) 785 * to O(log n)), and reducing garbage retention that would 786 * otherwise occur by waiting for the element to rise to top 787 * before clearing. But because the queue may also hold 788 * RunnableScheduledFutures that are not ScheduledFutureTasks, 789 * we are not guaranteed to have such indices available, in 790 * which case we fall back to linear search. (We expect that 791 * most tasks will not be decorated, and that the faster cases 792 * will be much more common.) 793 * 794 * All heap operations must record index changes -- mainly 795 * within siftUp and siftDown. Upon removal, a task's 796 * heapIndex is set to -1. Note that ScheduledFutureTasks can 797 * appear at most once in the queue (this need not be true for 798 * other kinds of tasks or work queues), so are uniquely 799 * identified by heapIndex. 800 */ 801 802 private static final int INITIAL_CAPACITY = 16; 803 private RunnableScheduledFuture<?>[] queue = 804 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 805 private final ReentrantLock lock = new ReentrantLock(); 806 private int size = 0; 807 808 /** 809 * Thread designated to wait for the task at the head of the 810 * queue. This variant of the Leader-Follower pattern 811 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 812 * minimize unnecessary timed waiting. When a thread becomes 813 * the leader, it waits only for the next delay to elapse, but 814 * other threads await indefinitely. The leader thread must 815 * signal some other thread before returning from take() or 816 * poll(...), unless some other thread becomes leader in the 817 * interim. Whenever the head of the queue is replaced with a 818 * task with an earlier expiration time, the leader field is 819 * invalidated by being reset to null, and some waiting 820 * thread, but not necessarily the current leader, is 821 * signalled. So waiting threads must be prepared to acquire 822 * and lose leadership while waiting. 823 */ 824 private Thread leader = null; 825 826 /** 827 * Condition signalled when a newer task becomes available at the 828 * head of the queue or a new thread may need to become leader. 829 */ 830 private final Condition available = lock.newCondition(); 831 832 /** 833 * Sets f's heapIndex if it is a ScheduledFutureTask. 834 */ setIndex(RunnableScheduledFuture<?> f, int idx)835 private void setIndex(RunnableScheduledFuture<?> f, int idx) { 836 if (f instanceof ScheduledFutureTask) 837 ((ScheduledFutureTask)f).heapIndex = idx; 838 } 839 840 /** 841 * Sifts element added at bottom up to its heap-ordered spot. 842 * Call only when holding lock. 843 */ siftUp(int k, RunnableScheduledFuture<?> key)844 private void siftUp(int k, RunnableScheduledFuture<?> key) { 845 while (k > 0) { 846 int parent = (k - 1) >>> 1; 847 RunnableScheduledFuture<?> e = queue[parent]; 848 if (key.compareTo(e) >= 0) 849 break; 850 queue[k] = e; 851 setIndex(e, k); 852 k = parent; 853 } 854 queue[k] = key; 855 setIndex(key, k); 856 } 857 858 /** 859 * Sifts element added at top down to its heap-ordered spot. 860 * Call only when holding lock. 861 */ siftDown(int k, RunnableScheduledFuture<?> key)862 private void siftDown(int k, RunnableScheduledFuture<?> key) { 863 int half = size >>> 1; 864 while (k < half) { 865 int child = (k << 1) + 1; 866 RunnableScheduledFuture<?> c = queue[child]; 867 int right = child + 1; 868 if (right < size && c.compareTo(queue[right]) > 0) 869 c = queue[child = right]; 870 if (key.compareTo(c) <= 0) 871 break; 872 queue[k] = c; 873 setIndex(c, k); 874 k = child; 875 } 876 queue[k] = key; 877 setIndex(key, k); 878 } 879 880 /** 881 * Resizes the heap array. Call only when holding lock. 882 */ grow()883 private void grow() { 884 int oldCapacity = queue.length; 885 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 886 if (newCapacity < 0) // overflow 887 newCapacity = Integer.MAX_VALUE; 888 queue = Arrays.copyOf(queue, newCapacity); 889 } 890 891 /** 892 * Finds index of given object, or -1 if absent. 893 */ indexOf(Object x)894 private int indexOf(Object x) { 895 if (x != null) { 896 if (x instanceof ScheduledFutureTask) { 897 int i = ((ScheduledFutureTask) x).heapIndex; 898 // Sanity check; x could conceivably be a 899 // ScheduledFutureTask from some other pool. 900 if (i >= 0 && i < size && queue[i] == x) 901 return i; 902 } else { 903 for (int i = 0; i < size; i++) 904 if (x.equals(queue[i])) 905 return i; 906 } 907 } 908 return -1; 909 } 910 contains(Object x)911 public boolean contains(Object x) { 912 final ReentrantLock lock = this.lock; 913 lock.lock(); 914 try { 915 return indexOf(x) != -1; 916 } finally { 917 lock.unlock(); 918 } 919 } 920 remove(Object x)921 public boolean remove(Object x) { 922 final ReentrantLock lock = this.lock; 923 lock.lock(); 924 try { 925 int i = indexOf(x); 926 if (i < 0) 927 return false; 928 929 setIndex(queue[i], -1); 930 int s = --size; 931 RunnableScheduledFuture<?> replacement = queue[s]; 932 queue[s] = null; 933 if (s != i) { 934 siftDown(i, replacement); 935 if (queue[i] == replacement) 936 siftUp(i, replacement); 937 } 938 return true; 939 } finally { 940 lock.unlock(); 941 } 942 } 943 size()944 public int size() { 945 final ReentrantLock lock = this.lock; 946 lock.lock(); 947 try { 948 return size; 949 } finally { 950 lock.unlock(); 951 } 952 } 953 isEmpty()954 public boolean isEmpty() { 955 return size() == 0; 956 } 957 remainingCapacity()958 public int remainingCapacity() { 959 return Integer.MAX_VALUE; 960 } 961 peek()962 public RunnableScheduledFuture<?> peek() { 963 final ReentrantLock lock = this.lock; 964 lock.lock(); 965 try { 966 return queue[0]; 967 } finally { 968 lock.unlock(); 969 } 970 } 971 offer(Runnable x)972 public boolean offer(Runnable x) { 973 if (x == null) 974 throw new NullPointerException(); 975 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 976 final ReentrantLock lock = this.lock; 977 lock.lock(); 978 try { 979 int i = size; 980 if (i >= queue.length) 981 grow(); 982 size = i + 1; 983 if (i == 0) { 984 queue[0] = e; 985 setIndex(e, 0); 986 } else { 987 siftUp(i, e); 988 } 989 if (queue[0] == e) { 990 leader = null; 991 available.signal(); 992 } 993 } finally { 994 lock.unlock(); 995 } 996 return true; 997 } 998 put(Runnable e)999 public void put(Runnable e) { 1000 offer(e); 1001 } 1002 add(Runnable e)1003 public boolean add(Runnable e) { 1004 return offer(e); 1005 } 1006 offer(Runnable e, long timeout, TimeUnit unit)1007 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1008 return offer(e); 1009 } 1010 1011 /** 1012 * Performs common bookkeeping for poll and take: Replaces 1013 * first element with last and sifts it down. Call only when 1014 * holding lock. 1015 * @param f the task to remove and return 1016 */ finishPoll(RunnableScheduledFuture<?> f)1017 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 1018 int s = --size; 1019 RunnableScheduledFuture<?> x = queue[s]; 1020 queue[s] = null; 1021 if (s != 0) 1022 siftDown(0, x); 1023 setIndex(f, -1); 1024 return f; 1025 } 1026 poll()1027 public RunnableScheduledFuture<?> poll() { 1028 final ReentrantLock lock = this.lock; 1029 lock.lock(); 1030 try { 1031 RunnableScheduledFuture<?> first = queue[0]; 1032 if (first == null || first.getDelay(NANOSECONDS) > 0) 1033 return null; 1034 else 1035 return finishPoll(first); 1036 } finally { 1037 lock.unlock(); 1038 } 1039 } 1040 take()1041 public RunnableScheduledFuture<?> take() throws InterruptedException { 1042 final ReentrantLock lock = this.lock; 1043 lock.lockInterruptibly(); 1044 try { 1045 for (;;) { 1046 RunnableScheduledFuture<?> first = queue[0]; 1047 if (first == null) 1048 available.await(); 1049 else { 1050 long delay = first.getDelay(NANOSECONDS); 1051 if (delay <= 0) 1052 return finishPoll(first); 1053 first = null; // don't retain ref while waiting 1054 if (leader != null) 1055 available.await(); 1056 else { 1057 Thread thisThread = Thread.currentThread(); 1058 leader = thisThread; 1059 try { 1060 available.awaitNanos(delay); 1061 } finally { 1062 if (leader == thisThread) 1063 leader = null; 1064 } 1065 } 1066 } 1067 } 1068 } finally { 1069 if (leader == null && queue[0] != null) 1070 available.signal(); 1071 lock.unlock(); 1072 } 1073 } 1074 poll(long timeout, TimeUnit unit)1075 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 1076 throws InterruptedException { 1077 long nanos = unit.toNanos(timeout); 1078 final ReentrantLock lock = this.lock; 1079 lock.lockInterruptibly(); 1080 try { 1081 for (;;) { 1082 RunnableScheduledFuture<?> first = queue[0]; 1083 if (first == null) { 1084 if (nanos <= 0) 1085 return null; 1086 else 1087 nanos = available.awaitNanos(nanos); 1088 } else { 1089 long delay = first.getDelay(NANOSECONDS); 1090 if (delay <= 0) 1091 return finishPoll(first); 1092 if (nanos <= 0) 1093 return null; 1094 first = null; // don't retain ref while waiting 1095 if (nanos < delay || leader != null) 1096 nanos = available.awaitNanos(nanos); 1097 else { 1098 Thread thisThread = Thread.currentThread(); 1099 leader = thisThread; 1100 try { 1101 long timeLeft = available.awaitNanos(delay); 1102 nanos -= delay - timeLeft; 1103 } finally { 1104 if (leader == thisThread) 1105 leader = null; 1106 } 1107 } 1108 } 1109 } 1110 } finally { 1111 if (leader == null && queue[0] != null) 1112 available.signal(); 1113 lock.unlock(); 1114 } 1115 } 1116 clear()1117 public void clear() { 1118 final ReentrantLock lock = this.lock; 1119 lock.lock(); 1120 try { 1121 for (int i = 0; i < size; i++) { 1122 RunnableScheduledFuture<?> t = queue[i]; 1123 if (t != null) { 1124 queue[i] = null; 1125 setIndex(t, -1); 1126 } 1127 } 1128 size = 0; 1129 } finally { 1130 lock.unlock(); 1131 } 1132 } 1133 1134 /** 1135 * Returns first element only if it is expired. 1136 * Used only by drainTo. Call only when holding lock. 1137 */ peekExpired()1138 private RunnableScheduledFuture<?> peekExpired() { 1139 // assert lock.isHeldByCurrentThread(); 1140 RunnableScheduledFuture<?> first = queue[0]; 1141 return (first == null || first.getDelay(NANOSECONDS) > 0) ? 1142 null : first; 1143 } 1144 drainTo(Collection<? super Runnable> c)1145 public int drainTo(Collection<? super Runnable> c) { 1146 if (c == null) 1147 throw new NullPointerException(); 1148 if (c == this) 1149 throw new IllegalArgumentException(); 1150 final ReentrantLock lock = this.lock; 1151 lock.lock(); 1152 try { 1153 RunnableScheduledFuture<?> first; 1154 int n = 0; 1155 while ((first = peekExpired()) != null) { 1156 c.add(first); // In this order, in case add() throws. 1157 finishPoll(first); 1158 ++n; 1159 } 1160 return n; 1161 } finally { 1162 lock.unlock(); 1163 } 1164 } 1165 drainTo(Collection<? super Runnable> c, int maxElements)1166 public int drainTo(Collection<? super Runnable> c, int maxElements) { 1167 if (c == null) 1168 throw new NullPointerException(); 1169 if (c == this) 1170 throw new IllegalArgumentException(); 1171 if (maxElements <= 0) 1172 return 0; 1173 final ReentrantLock lock = this.lock; 1174 lock.lock(); 1175 try { 1176 RunnableScheduledFuture<?> first; 1177 int n = 0; 1178 while (n < maxElements && (first = peekExpired()) != null) { 1179 c.add(first); // In this order, in case add() throws. 1180 finishPoll(first); 1181 ++n; 1182 } 1183 return n; 1184 } finally { 1185 lock.unlock(); 1186 } 1187 } 1188 toArray()1189 public Object[] toArray() { 1190 final ReentrantLock lock = this.lock; 1191 lock.lock(); 1192 try { 1193 return Arrays.copyOf(queue, size, Object[].class); 1194 } finally { 1195 lock.unlock(); 1196 } 1197 } 1198 1199 @SuppressWarnings("unchecked") toArray(T[] a)1200 public <T> T[] toArray(T[] a) { 1201 final ReentrantLock lock = this.lock; 1202 lock.lock(); 1203 try { 1204 if (a.length < size) 1205 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1206 System.arraycopy(queue, 0, a, 0, size); 1207 if (a.length > size) 1208 a[size] = null; 1209 return a; 1210 } finally { 1211 lock.unlock(); 1212 } 1213 } 1214 iterator()1215 public Iterator<Runnable> iterator() { 1216 return new Itr(Arrays.copyOf(queue, size)); 1217 } 1218 1219 /** 1220 * Snapshot iterator that works off copy of underlying q array. 1221 */ 1222 private class Itr implements Iterator<Runnable> { 1223 final RunnableScheduledFuture[] array; 1224 int cursor = 0; // index of next element to return 1225 int lastRet = -1; // index of last element, or -1 if no such 1226 Itr(RunnableScheduledFuture[] array)1227 Itr(RunnableScheduledFuture[] array) { 1228 this.array = array; 1229 } 1230 hasNext()1231 public boolean hasNext() { 1232 return cursor < array.length; 1233 } 1234 next()1235 public Runnable next() { 1236 if (cursor >= array.length) 1237 throw new NoSuchElementException(); 1238 lastRet = cursor; 1239 return array[cursor++]; 1240 } 1241 remove()1242 public void remove() { 1243 if (lastRet < 0) 1244 throw new IllegalStateException(); 1245 DelayedWorkQueue.this.remove(array[lastRet]); 1246 lastRet = -1; 1247 } 1248 } 1249 } 1250 } 1251