/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import static java.util.concurrent.TimeUnit.MILLISECONDS; 39 import static java.util.concurrent.TimeUnit.NANOSECONDS; 40 41 import java.util.AbstractQueue; 42 import java.util.Arrays; 43 import java.util.Collection; 44 import java.util.Iterator; 45 import java.util.List; 46 import java.util.NoSuchElementException; 47 import java.util.Objects; 48 import java.util.concurrent.atomic.AtomicLong; 49 import java.util.concurrent.locks.Condition; 50 import java.util.concurrent.locks.ReentrantLock; 51 52 // BEGIN android-note 53 // omit class-level docs on setRemoveOnCancelPolicy() 54 // END android-note 55 56 /** 57 * A {@link ThreadPoolExecutor} that can additionally schedule 58 * commands to run after a given delay, or to execute periodically. 59 * This class is preferable to {@link java.util.Timer} when multiple 60 * worker threads are needed, or when the additional flexibility or 61 * capabilities of {@link ThreadPoolExecutor} (which this class 62 * extends) are required. 63 * 64 * <p>Delayed tasks execute no sooner than they are enabled, but 65 * without any real-time guarantees about when, after they are 66 * enabled, they will commence. Tasks scheduled for exactly the same 67 * execution time are enabled in first-in-first-out (FIFO) order of 68 * submission. 69 * 70 * <p>When a submitted task is cancelled before it is run, execution 71 * is suppressed. By default, such a cancelled task is not 72 * automatically removed from the work queue until its delay elapses. 73 * While this enables further inspection and monitoring, it may also 74 * cause unbounded retention of cancelled tasks. 
*
 * <p>Successive executions of a periodic task scheduled via
 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
 * do not overlap. While different executions may be performed by
 * different threads, the effects of prior executions
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those of subsequent ones.
 *
 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
 * of the inherited tuning methods are not useful for it. In
 * particular, because it acts as a fixed-sized pool using
 * {@code corePoolSize} threads and an unbounded queue, adjustments
 * to {@code maximumPoolSize} have no useful effect. Additionally, it
 * is almost never a good idea to set {@code corePoolSize} to zero or
 * use {@code allowCoreThreadTimeOut} because this may leave the pool
 * without threads to handle tasks once they become eligible to run.
 *
 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
 * this class uses {@link Executors#defaultThreadFactory} as the
 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
 * as the default rejected execution handler.
 *
 * <p><b>Extension notes:</b> This class overrides the
 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
 * {@link AbstractExecutorService#submit(Runnable) submit}
 * methods to generate internal {@link ScheduledFuture} objects to
 * control per-task delays and scheduling. To preserve
 * functionality, any further overrides of these methods in
 * subclasses must invoke superclass versions, which effectively
 * disables additional task customization.
However, this class 106 * provides alternative protected extension method 107 * {@code decorateTask} (one version each for {@code Runnable} and 108 * {@code Callable}) that can be used to customize the concrete task 109 * types used to execute commands entered via {@code execute}, 110 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, 111 * and {@code scheduleWithFixedDelay}. By default, a 112 * {@code ScheduledThreadPoolExecutor} uses a task type extending 113 * {@link FutureTask}. However, this may be modified or replaced using 114 * subclasses of the form: 115 * 116 * <pre> {@code 117 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { 118 * 119 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } 120 * 121 * protected <V> RunnableScheduledFuture<V> decorateTask( 122 * Runnable r, RunnableScheduledFuture<V> task) { 123 * return new CustomTask<V>(r, task); 124 * } 125 * 126 * protected <V> RunnableScheduledFuture<V> decorateTask( 127 * Callable<V> c, RunnableScheduledFuture<V> task) { 128 * return new CustomTask<V>(c, task); 129 * } 130 * // ... add constructors, etc. 131 * }}</pre> 132 * 133 * @since 1.5 134 * @author Doug Lea 135 */ 136 public class ScheduledThreadPoolExecutor 137 extends ThreadPoolExecutor 138 implements ScheduledExecutorService { 139 140 /* 141 * This class specializes ThreadPoolExecutor implementation by 142 * 143 * 1. Using a custom task type ScheduledFutureTask, even for tasks 144 * that don't require scheduling because they are submitted 145 * using ExecutorService rather than ScheduledExecutorService 146 * methods, which are treated as tasks with a delay of zero. 147 * 148 * 2. Using a custom queue (DelayedWorkQueue), a variant of 149 * unbounded DelayQueue. The lack of capacity constraint and 150 * the fact that corePoolSize and maximumPoolSize are 151 * effectively identical simplifies some execution mechanics 152 * (see delayedExecute) compared to ThreadPoolExecutor. 
153 * 154 * 3. Supporting optional run-after-shutdown parameters, which 155 * leads to overrides of shutdown methods to remove and cancel 156 * tasks that should NOT be run after shutdown, as well as 157 * different recheck logic when task (re)submission overlaps 158 * with a shutdown. 159 * 160 * 4. Task decoration methods to allow interception and 161 * instrumentation, which are needed because subclasses cannot 162 * otherwise override submit methods to get this effect. These 163 * don't have any impact on pool control logic though. 164 */ 165 166 /** 167 * False if should cancel/suppress periodic tasks on shutdown. 168 */ 169 private volatile boolean continueExistingPeriodicTasksAfterShutdown; 170 171 // Android-changed: Preserving behaviour on expired tasks (b/202927404) 172 /** 173 * False if should cancel non-periodic tasks on shutdown. 174 */ 175 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; 176 177 /** 178 * True if ScheduledFutureTask.cancel should remove from queue. 179 */ 180 volatile boolean removeOnCancel; 181 182 /** 183 * Sequence number to break scheduling ties, and in turn to 184 * guarantee FIFO order among tied entries. 185 */ 186 private static final AtomicLong sequencer = new AtomicLong(); 187 188 private class ScheduledFutureTask<V> 189 extends FutureTask<V> implements RunnableScheduledFuture<V> { 190 191 /** Sequence number to break ties FIFO */ 192 private final long sequenceNumber; 193 194 /** The nanoTime-based time when the task is enabled to execute. */ 195 private volatile long time; 196 197 /** 198 * Period for repeating tasks, in nanoseconds. 199 * A positive value indicates fixed-rate execution. 200 * A negative value indicates fixed-delay execution. 201 * A value of 0 indicates a non-repeating (one-shot) task. 
202 */ 203 private final long period; 204 205 /** The actual task to be re-enqueued by reExecutePeriodic */ 206 RunnableScheduledFuture<V> outerTask = this; 207 208 /** 209 * Index into delay queue, to support faster cancellation. 210 */ 211 int heapIndex; 212 213 /** 214 * Creates a one-shot action with given nanoTime-based trigger time. 215 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)216 ScheduledFutureTask(Runnable r, V result, long triggerTime, 217 long sequenceNumber) { 218 super(r, result); 219 this.time = triggerTime; 220 this.period = 0; 221 this.sequenceNumber = sequenceNumber; 222 } 223 224 /** 225 * Creates a periodic action with given nanoTime-based initial 226 * trigger time and period. 227 */ ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)228 ScheduledFutureTask(Runnable r, V result, long triggerTime, 229 long period, long sequenceNumber) { 230 super(r, result); 231 this.time = triggerTime; 232 this.period = period; 233 this.sequenceNumber = sequenceNumber; 234 } 235 236 /** 237 * Creates a one-shot action with given nanoTime-based trigger time. 
238 */ ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)239 ScheduledFutureTask(Callable<V> callable, long triggerTime, 240 long sequenceNumber) { 241 super(callable); 242 this.time = triggerTime; 243 this.period = 0; 244 this.sequenceNumber = sequenceNumber; 245 } 246 getDelay(TimeUnit unit)247 public long getDelay(TimeUnit unit) { 248 return unit.convert(time - System.nanoTime(), NANOSECONDS); 249 } 250 compareTo(Delayed other)251 public int compareTo(Delayed other) { 252 if (other == this) // compare zero if same object 253 return 0; 254 if (other instanceof ScheduledFutureTask) { 255 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 256 long diff = time - x.time; 257 if (diff < 0) 258 return -1; 259 else if (diff > 0) 260 return 1; 261 else if (sequenceNumber < x.sequenceNumber) 262 return -1; 263 else 264 return 1; 265 } 266 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 267 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 268 } 269 270 /** 271 * Returns {@code true} if this is a periodic (not a one-shot) action. 272 * 273 * @return {@code true} if periodic 274 */ isPeriodic()275 public boolean isPeriodic() { 276 return period != 0; 277 } 278 279 /** 280 * Sets the next time to run for a periodic task. 281 */ setNextRunTime()282 private void setNextRunTime() { 283 long p = period; 284 if (p > 0) 285 time += p; 286 else 287 time = triggerTime(-p); 288 } 289 cancel(boolean mayInterruptIfRunning)290 public boolean cancel(boolean mayInterruptIfRunning) { 291 // The racy read of heapIndex below is benign: 292 // if heapIndex < 0, then OOTA guarantees that we have surely 293 // been removed; else we recheck under lock in remove() 294 boolean cancelled = super.cancel(mayInterruptIfRunning); 295 if (cancelled && removeOnCancel && heapIndex >= 0) 296 remove(this); 297 return cancelled; 298 } 299 300 /** 301 * Overrides FutureTask version so as to reset/requeue if periodic. 
302 */ run()303 public void run() { 304 if (!canRunInCurrentRunState(this)) 305 cancel(false); 306 else if (!isPeriodic()) 307 super.run(); 308 else if (super.runAndReset()) { 309 setNextRunTime(); 310 reExecutePeriodic(outerTask); 311 } 312 } 313 } 314 315 /** 316 * Returns true if can run a task given current run state and 317 * run-after-shutdown parameters. 318 */ canRunInCurrentRunState(RunnableScheduledFuture<?> task)319 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) { 320 if (!isShutdown()) 321 return true; 322 if (isStopped()) 323 return false; 324 return task.isPeriodic() 325 ? continueExistingPeriodicTasksAfterShutdown 326 : (executeExistingDelayedTasksAfterShutdown 327 // Android-changed: Preserving behaviour on expired tasks (b/202927404) 328 // || task.getDelay(NANOSECONDS) <= 0); 329 ); 330 } 331 332 /** 333 * Main execution method for delayed or periodic tasks. If pool 334 * is shut down, rejects the task. Otherwise adds task to queue 335 * and starts a thread, if necessary, to run it. (We cannot 336 * prestart the thread to run the task because the task (probably) 337 * shouldn't be run yet.) If the pool is shut down while the task 338 * is being added, cancel and remove it if required by state and 339 * run-after-shutdown parameters. 340 * 341 * @param task the task 342 */ delayedExecute(RunnableScheduledFuture<?> task)343 private void delayedExecute(RunnableScheduledFuture<?> task) { 344 if (isShutdown()) 345 reject(task); 346 else { 347 super.getQueue().add(task); 348 if (!canRunInCurrentRunState(task) && remove(task)) 349 task.cancel(false); 350 else 351 ensurePrestart(); 352 } 353 } 354 355 /** 356 * Requeues a periodic task unless current run state precludes it. 357 * Same idea as delayedExecute except drops task rather than rejecting. 
358 * 359 * @param task the task 360 */ reExecutePeriodic(RunnableScheduledFuture<?> task)361 void reExecutePeriodic(RunnableScheduledFuture<?> task) { 362 if (canRunInCurrentRunState(task)) { 363 super.getQueue().add(task); 364 if (canRunInCurrentRunState(task) || !remove(task)) { 365 ensurePrestart(); 366 return; 367 } 368 } 369 task.cancel(false); 370 } 371 372 /** 373 * Cancels and clears the queue of all tasks that should not be run 374 * due to shutdown policy. Invoked within super.shutdown. 375 */ onShutdown()376 @Override void onShutdown() { 377 BlockingQueue<Runnable> q = super.getQueue(); 378 boolean keepDelayed = 379 getExecuteExistingDelayedTasksAfterShutdownPolicy(); 380 boolean keepPeriodic = 381 getContinueExistingPeriodicTasksAfterShutdownPolicy(); 382 // Traverse snapshot to avoid iterator exceptions 383 // TODO: implement and use efficient removeIf 384 // super.getQueue().removeIf(...); 385 for (Object e : q.toArray()) { 386 if (e instanceof RunnableScheduledFuture) { 387 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; 388 if ((t.isPeriodic() 389 ? !keepPeriodic 390 // Android-changed: Preserving behaviour on expired tasks (b/202927404) 391 // : (!keepDelayed && t.getDelay(NANOSECONDS) > 0)) 392 : !keepDelayed) 393 || t.isCancelled()) { // also remove if already cancelled 394 if (q.remove(t)) 395 t.cancel(false); 396 } 397 } 398 } 399 tryTerminate(); 400 } 401 402 /** 403 * Modifies or replaces the task used to execute a runnable. 404 * This method can be used to override the concrete 405 * class used for managing internal tasks. 406 * The default implementation simply returns the given task. 
407 * 408 * @param runnable the submitted Runnable 409 * @param task the task created to execute the runnable 410 * @param <V> the type of the task's result 411 * @return a task that can execute the runnable 412 * @since 1.6 413 */ decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)414 protected <V> RunnableScheduledFuture<V> decorateTask( 415 Runnable runnable, RunnableScheduledFuture<V> task) { 416 return task; 417 } 418 419 /** 420 * Modifies or replaces the task used to execute a callable. 421 * This method can be used to override the concrete 422 * class used for managing internal tasks. 423 * The default implementation simply returns the given task. 424 * 425 * @param callable the submitted Callable 426 * @param task the task created to execute the callable 427 * @param <V> the type of the task's result 428 * @return a task that can execute the callable 429 * @since 1.6 430 */ decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)431 protected <V> RunnableScheduledFuture<V> decorateTask( 432 Callable<V> callable, RunnableScheduledFuture<V> task) { 433 return task; 434 } 435 436 /** 437 * The default keep-alive time for pool threads. 438 * 439 * Normally, this value is unused because all pool threads will be 440 * core threads, but if a user creates a pool with a corePoolSize 441 * of zero (against our advice), we keep a thread alive as long as 442 * there are queued tasks. If the keep alive time is zero (the 443 * historic value), we end up hot-spinning in getTask, wasting a 444 * CPU. But on the other hand, if we set the value too high, and 445 * users create a one-shot pool which they don't cleanly shutdown, 446 * the pool's non-daemon threads will prevent JVM termination. A 447 * small but non-zero value (relative to a JVM's lifetime) seems 448 * best. 449 */ 450 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L; 451 452 /** 453 * Creates a new {@code ScheduledThreadPoolExecutor} with the 454 * given core pool size. 
455 * 456 * @param corePoolSize the number of threads to keep in the pool, even 457 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 458 * @throws IllegalArgumentException if {@code corePoolSize < 0} 459 */ ScheduledThreadPoolExecutor(int corePoolSize)460 public ScheduledThreadPoolExecutor(int corePoolSize) { 461 super(corePoolSize, Integer.MAX_VALUE, 462 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 463 new DelayedWorkQueue()); 464 } 465 466 /** 467 * Creates a new {@code ScheduledThreadPoolExecutor} with the 468 * given initial parameters. 469 * 470 * @param corePoolSize the number of threads to keep in the pool, even 471 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 472 * @param threadFactory the factory to use when the executor 473 * creates a new thread 474 * @throws IllegalArgumentException if {@code corePoolSize < 0} 475 * @throws NullPointerException if {@code threadFactory} is null 476 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)477 public ScheduledThreadPoolExecutor(int corePoolSize, 478 ThreadFactory threadFactory) { 479 super(corePoolSize, Integer.MAX_VALUE, 480 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 481 new DelayedWorkQueue(), threadFactory); 482 } 483 484 /** 485 * Creates a new {@code ScheduledThreadPoolExecutor} with the 486 * given initial parameters. 
487 * 488 * @param corePoolSize the number of threads to keep in the pool, even 489 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 490 * @param handler the handler to use when execution is blocked 491 * because the thread bounds and queue capacities are reached 492 * @throws IllegalArgumentException if {@code corePoolSize < 0} 493 * @throws NullPointerException if {@code handler} is null 494 */ ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)495 public ScheduledThreadPoolExecutor(int corePoolSize, 496 RejectedExecutionHandler handler) { 497 super(corePoolSize, Integer.MAX_VALUE, 498 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 499 new DelayedWorkQueue(), handler); 500 } 501 502 /** 503 * Creates a new {@code ScheduledThreadPoolExecutor} with the 504 * given initial parameters. 505 * 506 * @param corePoolSize the number of threads to keep in the pool, even 507 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 508 * @param threadFactory the factory to use when the executor 509 * creates a new thread 510 * @param handler the handler to use when execution is blocked 511 * because the thread bounds and queue capacities are reached 512 * @throws IllegalArgumentException if {@code corePoolSize < 0} 513 * @throws NullPointerException if {@code threadFactory} or 514 * {@code handler} is null 515 */ ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)516 public ScheduledThreadPoolExecutor(int corePoolSize, 517 ThreadFactory threadFactory, 518 RejectedExecutionHandler handler) { 519 super(corePoolSize, Integer.MAX_VALUE, 520 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, 521 new DelayedWorkQueue(), threadFactory, handler); 522 } 523 524 /** 525 * Returns the nanoTime-based trigger time of a delayed action. 526 */ triggerTime(long delay, TimeUnit unit)527 private long triggerTime(long delay, TimeUnit unit) { 528 return triggerTime(unit.toNanos((delay < 0) ? 
0 : delay)); 529 } 530 531 /** 532 * Returns the nanoTime-based trigger time of a delayed action. 533 */ triggerTime(long delay)534 long triggerTime(long delay) { 535 return System.nanoTime() + 536 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 537 } 538 539 /** 540 * Constrains the values of all delays in the queue to be within 541 * Long.MAX_VALUE of each other, to avoid overflow in compareTo. 542 * This may occur if a task is eligible to be dequeued, but has 543 * not yet been, while some other task is added with a delay of 544 * Long.MAX_VALUE. 545 */ overflowFree(long delay)546 private long overflowFree(long delay) { 547 Delayed head = (Delayed) super.getQueue().peek(); 548 if (head != null) { 549 long headDelay = head.getDelay(NANOSECONDS); 550 if (headDelay < 0 && (delay - headDelay < 0)) 551 delay = Long.MAX_VALUE + headDelay; 552 } 553 return delay; 554 } 555 556 /** 557 * @throws RejectedExecutionException {@inheritDoc} 558 * @throws NullPointerException {@inheritDoc} 559 */ schedule(Runnable command, long delay, TimeUnit unit)560 public ScheduledFuture<?> schedule(Runnable command, 561 long delay, 562 TimeUnit unit) { 563 if (command == null || unit == null) 564 throw new NullPointerException(); 565 RunnableScheduledFuture<Void> t = decorateTask(command, 566 new ScheduledFutureTask<Void>(command, null, 567 triggerTime(delay, unit), 568 sequencer.getAndIncrement())); 569 delayedExecute(t); 570 return t; 571 } 572 573 /** 574 * @throws RejectedExecutionException {@inheritDoc} 575 * @throws NullPointerException {@inheritDoc} 576 */ schedule(Callable<V> callable, long delay, TimeUnit unit)577 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 578 long delay, 579 TimeUnit unit) { 580 if (callable == null || unit == null) 581 throw new NullPointerException(); 582 RunnableScheduledFuture<V> t = decorateTask(callable, 583 new ScheduledFutureTask<V>(callable, 584 triggerTime(delay, unit), 585 sequencer.getAndIncrement())); 586 
delayedExecute(t); 587 return t; 588 } 589 590 /** 591 * Submits a periodic action that becomes enabled first after the 592 * given initial delay, and subsequently with the given period; 593 * that is, executions will commence after 594 * {@code initialDelay}, then {@code initialDelay + period}, then 595 * {@code initialDelay + 2 * period}, and so on. 596 * 597 * <p>The sequence of task executions continues indefinitely until 598 * one of the following exceptional completions occur: 599 * <ul> 600 * <li>The task is {@linkplain Future#cancel explicitly cancelled} 601 * via the returned future. 602 * <li>Method {@link #shutdown} is called and the {@linkplain 603 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on 604 * whether to continue after shutdown} is not set true, or method 605 * {@link #shutdownNow} is called; also resulting in task 606 * cancellation. 607 * <li>An execution of the task throws an exception. In this case 608 * calling {@link Future#get() get} on the returned future will throw 609 * {@link ExecutionException}, holding the exception as its cause. 610 * </ul> 611 * Subsequent executions are suppressed. Subsequent calls to 612 * {@link Future#isDone isDone()} on the returned future will 613 * return {@code true}. 614 * 615 * <p>If any execution of this task takes longer than its period, then 616 * subsequent executions may start late, but will not concurrently 617 * execute. 
618 * 619 * @throws RejectedExecutionException {@inheritDoc} 620 * @throws NullPointerException {@inheritDoc} 621 * @throws IllegalArgumentException {@inheritDoc} 622 */ scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)623 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 624 long initialDelay, 625 long period, 626 TimeUnit unit) { 627 if (command == null || unit == null) 628 throw new NullPointerException(); 629 if (period <= 0L) 630 throw new IllegalArgumentException(); 631 ScheduledFutureTask<Void> sft = 632 new ScheduledFutureTask<Void>(command, 633 null, 634 triggerTime(initialDelay, unit), 635 unit.toNanos(period), 636 sequencer.getAndIncrement()); 637 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 638 sft.outerTask = t; 639 delayedExecute(t); 640 return t; 641 } 642 643 /** 644 * Submits a periodic action that becomes enabled first after the 645 * given initial delay, and subsequently with the given delay 646 * between the termination of one execution and the commencement of 647 * the next. 648 * 649 * <p>The sequence of task executions continues indefinitely until 650 * one of the following exceptional completions occur: 651 * <ul> 652 * <li>The task is {@linkplain Future#cancel explicitly cancelled} 653 * via the returned future. 654 * <li>Method {@link #shutdown} is called and the {@linkplain 655 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on 656 * whether to continue after shutdown} is not set true, or method 657 * {@link #shutdownNow} is called; also resulting in task 658 * cancellation. 659 * <li>An execution of the task throws an exception. In this case 660 * calling {@link Future#get() get} on the returned future will throw 661 * {@link ExecutionException}, holding the exception as its cause. 662 * </ul> 663 * Subsequent executions are suppressed. Subsequent calls to 664 * {@link Future#isDone isDone()} on the returned future will 665 * return {@code true}. 
666 * 667 * @throws RejectedExecutionException {@inheritDoc} 668 * @throws NullPointerException {@inheritDoc} 669 * @throws IllegalArgumentException {@inheritDoc} 670 */ scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)671 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 672 long initialDelay, 673 long delay, 674 TimeUnit unit) { 675 if (command == null || unit == null) 676 throw new NullPointerException(); 677 if (delay <= 0L) 678 throw new IllegalArgumentException(); 679 ScheduledFutureTask<Void> sft = 680 new ScheduledFutureTask<Void>(command, 681 null, 682 triggerTime(initialDelay, unit), 683 -unit.toNanos(delay), 684 sequencer.getAndIncrement()); 685 RunnableScheduledFuture<Void> t = decorateTask(command, sft); 686 sft.outerTask = t; 687 delayedExecute(t); 688 return t; 689 } 690 691 /** 692 * Executes {@code command} with zero required delay. 693 * This has effect equivalent to 694 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}. 695 * Note that inspections of the queue and of the list returned by 696 * {@code shutdownNow} will access the zero-delayed 697 * {@link ScheduledFuture}, not the {@code command} itself. 698 * 699 * <p>A consequence of the use of {@code ScheduledFuture} objects is 700 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always 701 * called with a null second {@code Throwable} argument, even if the 702 * {@code command} terminated abruptly. Instead, the {@code Throwable} 703 * thrown by such a task can be obtained via {@link Future#get}. 
704 * 705 * @throws RejectedExecutionException at discretion of 706 * {@code RejectedExecutionHandler}, if the task 707 * cannot be accepted for execution because the 708 * executor has been shut down 709 * @throws NullPointerException {@inheritDoc} 710 */ execute(Runnable command)711 public void execute(Runnable command) { 712 schedule(command, 0, NANOSECONDS); 713 } 714 715 // Override AbstractExecutorService methods 716 717 /** 718 * @throws RejectedExecutionException {@inheritDoc} 719 * @throws NullPointerException {@inheritDoc} 720 */ submit(Runnable task)721 public Future<?> submit(Runnable task) { 722 return schedule(task, 0, NANOSECONDS); 723 } 724 725 /** 726 * @throws RejectedExecutionException {@inheritDoc} 727 * @throws NullPointerException {@inheritDoc} 728 */ submit(Runnable task, T result)729 public <T> Future<T> submit(Runnable task, T result) { 730 return schedule(Executors.callable(task, result), 0, NANOSECONDS); 731 } 732 733 /** 734 * @throws RejectedExecutionException {@inheritDoc} 735 * @throws NullPointerException {@inheritDoc} 736 */ submit(Callable<T> task)737 public <T> Future<T> submit(Callable<T> task) { 738 return schedule(task, 0, NANOSECONDS); 739 } 740 741 /** 742 * Sets the policy on whether to continue executing existing 743 * periodic tasks even when this executor has been {@code shutdown}. 744 * In this case, executions will continue until {@code shutdownNow} 745 * or the policy is set to {@code false} when already shutdown. 746 * This value is by default {@code false}. 
747 * 748 * @param value if {@code true}, continue after shutdown, else don't 749 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy 750 */ setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)751 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { 752 continueExistingPeriodicTasksAfterShutdown = value; 753 if (!value && isShutdown()) 754 onShutdown(); 755 } 756 757 /** 758 * Gets the policy on whether to continue executing existing 759 * periodic tasks even when this executor has been {@code shutdown}. 760 * In this case, executions will continue until {@code shutdownNow} 761 * or the policy is set to {@code false} when already shutdown. 762 * This value is by default {@code false}. 763 * 764 * @return {@code true} if will continue after shutdown 765 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy 766 */ getContinueExistingPeriodicTasksAfterShutdownPolicy()767 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { 768 return continueExistingPeriodicTasksAfterShutdown; 769 } 770 771 /** 772 * Sets the policy on whether to execute existing delayed 773 * tasks even when this executor has been {@code shutdown}. 774 * In this case, these tasks will only terminate upon 775 * {@code shutdownNow}, or after setting the policy to 776 * {@code false} when already shutdown. 777 * This value is by default {@code true}. 778 * 779 * @param value if {@code true}, execute after shutdown, else don't 780 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy 781 */ setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)782 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { 783 executeExistingDelayedTasksAfterShutdown = value; 784 if (!value && isShutdown()) 785 onShutdown(); 786 } 787 788 /** 789 * Gets the policy on whether to execute existing delayed 790 * tasks even when this executor has been {@code shutdown}. 
791 * In this case, these tasks will only terminate upon 792 * {@code shutdownNow}, or after setting the policy to 793 * {@code false} when already shutdown. 794 * This value is by default {@code true}. 795 * 796 * @return {@code true} if will execute after shutdown 797 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy 798 */ getExecuteExistingDelayedTasksAfterShutdownPolicy()799 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { 800 return executeExistingDelayedTasksAfterShutdown; 801 } 802 803 /** 804 * Sets the policy on whether cancelled tasks should be immediately 805 * removed from the work queue at time of cancellation. This value is 806 * by default {@code false}. 807 * 808 * @param value if {@code true}, remove on cancellation, else don't 809 * @see #getRemoveOnCancelPolicy 810 * @since 1.7 811 */ setRemoveOnCancelPolicy(boolean value)812 public void setRemoveOnCancelPolicy(boolean value) { 813 removeOnCancel = value; 814 } 815 816 /** 817 * Gets the policy on whether cancelled tasks should be immediately 818 * removed from the work queue at time of cancellation. This value is 819 * by default {@code false}. 820 * 821 * @return {@code true} if cancelled tasks are immediately removed 822 * from the queue 823 * @see #setRemoveOnCancelPolicy 824 * @since 1.7 825 */ getRemoveOnCancelPolicy()826 public boolean getRemoveOnCancelPolicy() { 827 return removeOnCancel; 828 } 829 830 /** 831 * Initiates an orderly shutdown in which previously submitted 832 * tasks are executed, but no new tasks will be accepted. 833 * Invocation has no additional effect if already shut down. 834 * 835 * <p>This method does not wait for previously submitted tasks to 836 * complete execution. Use {@link #awaitTermination awaitTermination} 837 * to do that. 838 * 839 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} 840 * has been set {@code false}, existing delayed tasks whose delays 841 * have not yet elapsed are cancelled. 
And unless the {@code 842 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set 843 * {@code true}, future executions of existing periodic tasks will 844 * be cancelled. 845 */ 846 // android-note: Removed "throws SecurityException" doc. shutdown()847 public void shutdown() { 848 super.shutdown(); 849 } 850 851 /** 852 * Attempts to stop all actively executing tasks, halts the 853 * processing of waiting tasks, and returns a list of the tasks 854 * that were awaiting execution. These tasks are drained (removed) 855 * from the task queue upon return from this method. 856 * 857 * <p>This method does not wait for actively executing tasks to 858 * terminate. Use {@link #awaitTermination awaitTermination} to 859 * do that. 860 * 861 * <p>There are no guarantees beyond best-effort attempts to stop 862 * processing actively executing tasks. This implementation 863 * interrupts tasks via {@link Thread#interrupt}; any task that 864 * fails to respond to interrupts may never terminate. 865 * 866 * @return list of tasks that never commenced execution. 867 * Each element of this list is a {@link ScheduledFuture}. 868 * For tasks submitted via one of the {@code schedule} 869 * methods, the element will be identical to the returned 870 * {@code ScheduledFuture}. For tasks submitted using 871 * {@link #execute execute}, the element will be a 872 * zero-delay {@code ScheduledFuture}. 873 */ 874 // android-note: Removed "throws SecurityException" doc. shutdownNow()875 public List<Runnable> shutdownNow() { 876 return super.shutdownNow(); 877 } 878 879 /** 880 * Returns the task queue used by this executor. Access to the 881 * task queue is intended primarily for debugging and monitoring. 882 * This queue may be in active use. Retrieving the task queue 883 * does not prevent queued tasks from executing. 884 * 885 * <p>Each element of this queue is a {@link ScheduledFuture}. 
886 * For tasks submitted via one of the {@code schedule} methods, the 887 * element will be identical to the returned {@code ScheduledFuture}. 888 * For tasks submitted using {@link #execute execute}, the element 889 * will be a zero-delay {@code ScheduledFuture}. 890 * 891 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse 892 * tasks in the order in which they will execute. 893 * 894 * @return the task queue 895 */ getQueue()896 public BlockingQueue<Runnable> getQueue() { 897 return super.getQueue(); 898 } 899 900 /** 901 * Specialized delay queue. To mesh with TPE declarations, this 902 * class must be declared as a BlockingQueue<Runnable> even though 903 * it can only hold RunnableScheduledFutures. 904 */ 905 static class DelayedWorkQueue extends AbstractQueue<Runnable> 906 implements BlockingQueue<Runnable> { 907 908 /* 909 * A DelayedWorkQueue is based on a heap-based data structure 910 * like those in DelayQueue and PriorityQueue, except that 911 * every ScheduledFutureTask also records its index into the 912 * heap array. This eliminates the need to find a task upon 913 * cancellation, greatly speeding up removal (down from O(n) 914 * to O(log n)), and reducing garbage retention that would 915 * otherwise occur by waiting for the element to rise to top 916 * before clearing. But because the queue may also hold 917 * RunnableScheduledFutures that are not ScheduledFutureTasks, 918 * we are not guaranteed to have such indices available, in 919 * which case we fall back to linear search. (We expect that 920 * most tasks will not be decorated, and that the faster cases 921 * will be much more common.) 922 * 923 * All heap operations must record index changes -- mainly 924 * within siftUp and siftDown. Upon removal, a task's 925 * heapIndex is set to -1. Note that ScheduledFutureTasks can 926 * appear at most once in the queue (this need not be true for 927 * other kinds of tasks or work queues), so are uniquely 928 * identified by heapIndex. 
929 */ 930 931 private static final int INITIAL_CAPACITY = 16; 932 private RunnableScheduledFuture<?>[] queue = 933 new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; 934 private final ReentrantLock lock = new ReentrantLock(); 935 private int size; 936 937 /** 938 * Thread designated to wait for the task at the head of the 939 * queue. This variant of the Leader-Follower pattern 940 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 941 * minimize unnecessary timed waiting. When a thread becomes 942 * the leader, it waits only for the next delay to elapse, but 943 * other threads await indefinitely. The leader thread must 944 * signal some other thread before returning from take() or 945 * poll(...), unless some other thread becomes leader in the 946 * interim. Whenever the head of the queue is replaced with a 947 * task with an earlier expiration time, the leader field is 948 * invalidated by being reset to null, and some waiting 949 * thread, but not necessarily the current leader, is 950 * signalled. So waiting threads must be prepared to acquire 951 * and lose leadership while waiting. 952 */ 953 private Thread leader; 954 955 /** 956 * Condition signalled when a newer task becomes available at the 957 * head of the queue or a new thread may need to become leader. 958 */ 959 private final Condition available = lock.newCondition(); 960 961 /** 962 * Sets f's heapIndex if it is a ScheduledFutureTask. 963 */ setIndex(RunnableScheduledFuture<?> f, int idx)964 private static void setIndex(RunnableScheduledFuture<?> f, int idx) { 965 if (f instanceof ScheduledFutureTask) 966 ((ScheduledFutureTask)f).heapIndex = idx; 967 } 968 969 /** 970 * Sifts element added at bottom up to its heap-ordered spot. 971 * Call only when holding lock. 
972 */ siftUp(int k, RunnableScheduledFuture<?> key)973 private void siftUp(int k, RunnableScheduledFuture<?> key) { 974 while (k > 0) { 975 int parent = (k - 1) >>> 1; 976 RunnableScheduledFuture<?> e = queue[parent]; 977 if (key.compareTo(e) >= 0) 978 break; 979 queue[k] = e; 980 setIndex(e, k); 981 k = parent; 982 } 983 queue[k] = key; 984 setIndex(key, k); 985 } 986 987 /** 988 * Sifts element added at top down to its heap-ordered spot. 989 * Call only when holding lock. 990 */ siftDown(int k, RunnableScheduledFuture<?> key)991 private void siftDown(int k, RunnableScheduledFuture<?> key) { 992 int half = size >>> 1; 993 while (k < half) { 994 int child = (k << 1) + 1; 995 RunnableScheduledFuture<?> c = queue[child]; 996 int right = child + 1; 997 if (right < size && c.compareTo(queue[right]) > 0) 998 c = queue[child = right]; 999 if (key.compareTo(c) <= 0) 1000 break; 1001 queue[k] = c; 1002 setIndex(c, k); 1003 k = child; 1004 } 1005 queue[k] = key; 1006 setIndex(key, k); 1007 } 1008 1009 /** 1010 * Resizes the heap array. Call only when holding lock. 1011 */ grow()1012 private void grow() { 1013 int oldCapacity = queue.length; 1014 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% 1015 if (newCapacity < 0) // overflow 1016 newCapacity = Integer.MAX_VALUE; 1017 queue = Arrays.copyOf(queue, newCapacity); 1018 } 1019 1020 /** 1021 * Finds index of given object, or -1 if absent. 1022 */ indexOf(Object x)1023 private int indexOf(Object x) { 1024 if (x != null) { 1025 if (x instanceof ScheduledFutureTask) { 1026 int i = ((ScheduledFutureTask) x).heapIndex; 1027 // Sanity check; x could conceivably be a 1028 // ScheduledFutureTask from some other pool. 
1029 if (i >= 0 && i < size && queue[i] == x) 1030 return i; 1031 } else { 1032 for (int i = 0; i < size; i++) 1033 if (x.equals(queue[i])) 1034 return i; 1035 } 1036 } 1037 return -1; 1038 } 1039 contains(Object x)1040 public boolean contains(Object x) { 1041 final ReentrantLock lock = this.lock; 1042 lock.lock(); 1043 try { 1044 return indexOf(x) != -1; 1045 } finally { 1046 lock.unlock(); 1047 } 1048 } 1049 remove(Object x)1050 public boolean remove(Object x) { 1051 final ReentrantLock lock = this.lock; 1052 lock.lock(); 1053 try { 1054 int i = indexOf(x); 1055 if (i < 0) 1056 return false; 1057 1058 setIndex(queue[i], -1); 1059 int s = --size; 1060 RunnableScheduledFuture<?> replacement = queue[s]; 1061 queue[s] = null; 1062 if (s != i) { 1063 siftDown(i, replacement); 1064 if (queue[i] == replacement) 1065 siftUp(i, replacement); 1066 } 1067 return true; 1068 } finally { 1069 lock.unlock(); 1070 } 1071 } 1072 size()1073 public int size() { 1074 final ReentrantLock lock = this.lock; 1075 lock.lock(); 1076 try { 1077 return size; 1078 } finally { 1079 lock.unlock(); 1080 } 1081 } 1082 isEmpty()1083 public boolean isEmpty() { 1084 return size() == 0; 1085 } 1086 remainingCapacity()1087 public int remainingCapacity() { 1088 return Integer.MAX_VALUE; 1089 } 1090 peek()1091 public RunnableScheduledFuture<?> peek() { 1092 final ReentrantLock lock = this.lock; 1093 lock.lock(); 1094 try { 1095 return queue[0]; 1096 } finally { 1097 lock.unlock(); 1098 } 1099 } 1100 offer(Runnable x)1101 public boolean offer(Runnable x) { 1102 if (x == null) 1103 throw new NullPointerException(); 1104 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; 1105 final ReentrantLock lock = this.lock; 1106 lock.lock(); 1107 try { 1108 int i = size; 1109 if (i >= queue.length) 1110 grow(); 1111 size = i + 1; 1112 if (i == 0) { 1113 queue[0] = e; 1114 setIndex(e, 0); 1115 } else { 1116 siftUp(i, e); 1117 } 1118 if (queue[0] == e) { 1119 leader = null; 1120 available.signal(); 1121 } 
1122 } finally { 1123 lock.unlock(); 1124 } 1125 return true; 1126 } 1127 put(Runnable e)1128 public void put(Runnable e) { 1129 offer(e); 1130 } 1131 add(Runnable e)1132 public boolean add(Runnable e) { 1133 return offer(e); 1134 } 1135 offer(Runnable e, long timeout, TimeUnit unit)1136 public boolean offer(Runnable e, long timeout, TimeUnit unit) { 1137 return offer(e); 1138 } 1139 1140 /** 1141 * Performs common bookkeeping for poll and take: Replaces 1142 * first element with last and sifts it down. Call only when 1143 * holding lock. 1144 * @param f the task to remove and return 1145 */ finishPoll(RunnableScheduledFuture<?> f)1146 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { 1147 int s = --size; 1148 RunnableScheduledFuture<?> x = queue[s]; 1149 queue[s] = null; 1150 if (s != 0) 1151 siftDown(0, x); 1152 setIndex(f, -1); 1153 return f; 1154 } 1155 poll()1156 public RunnableScheduledFuture<?> poll() { 1157 final ReentrantLock lock = this.lock; 1158 lock.lock(); 1159 try { 1160 RunnableScheduledFuture<?> first = queue[0]; 1161 return (first == null || first.getDelay(NANOSECONDS) > 0) 1162 ? 
null 1163 : finishPoll(first); 1164 } finally { 1165 lock.unlock(); 1166 } 1167 } 1168 take()1169 public RunnableScheduledFuture<?> take() throws InterruptedException { 1170 final ReentrantLock lock = this.lock; 1171 lock.lockInterruptibly(); 1172 try { 1173 for (;;) { 1174 RunnableScheduledFuture<?> first = queue[0]; 1175 if (first == null) 1176 available.await(); 1177 else { 1178 long delay = first.getDelay(NANOSECONDS); 1179 if (delay <= 0L) 1180 return finishPoll(first); 1181 first = null; // don't retain ref while waiting 1182 if (leader != null) 1183 available.await(); 1184 else { 1185 Thread thisThread = Thread.currentThread(); 1186 leader = thisThread; 1187 try { 1188 available.awaitNanos(delay); 1189 } finally { 1190 if (leader == thisThread) 1191 leader = null; 1192 } 1193 } 1194 } 1195 } 1196 } finally { 1197 if (leader == null && queue[0] != null) 1198 available.signal(); 1199 lock.unlock(); 1200 } 1201 } 1202 poll(long timeout, TimeUnit unit)1203 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) 1204 throws InterruptedException { 1205 long nanos = unit.toNanos(timeout); 1206 final ReentrantLock lock = this.lock; 1207 lock.lockInterruptibly(); 1208 try { 1209 for (;;) { 1210 RunnableScheduledFuture<?> first = queue[0]; 1211 if (first == null) { 1212 if (nanos <= 0L) 1213 return null; 1214 else 1215 nanos = available.awaitNanos(nanos); 1216 } else { 1217 long delay = first.getDelay(NANOSECONDS); 1218 if (delay <= 0L) 1219 return finishPoll(first); 1220 if (nanos <= 0L) 1221 return null; 1222 first = null; // don't retain ref while waiting 1223 if (nanos < delay || leader != null) 1224 nanos = available.awaitNanos(nanos); 1225 else { 1226 Thread thisThread = Thread.currentThread(); 1227 leader = thisThread; 1228 try { 1229 long timeLeft = available.awaitNanos(delay); 1230 nanos -= delay - timeLeft; 1231 } finally { 1232 if (leader == thisThread) 1233 leader = null; 1234 } 1235 } 1236 } 1237 } 1238 } finally { 1239 if (leader == null && 
queue[0] != null) 1240 available.signal(); 1241 lock.unlock(); 1242 } 1243 } 1244 clear()1245 public void clear() { 1246 final ReentrantLock lock = this.lock; 1247 lock.lock(); 1248 try { 1249 for (int i = 0; i < size; i++) { 1250 RunnableScheduledFuture<?> t = queue[i]; 1251 if (t != null) { 1252 queue[i] = null; 1253 setIndex(t, -1); 1254 } 1255 } 1256 size = 0; 1257 } finally { 1258 lock.unlock(); 1259 } 1260 } 1261 drainTo(Collection<? super Runnable> c)1262 public int drainTo(Collection<? super Runnable> c) { 1263 return drainTo(c, Integer.MAX_VALUE); 1264 } 1265 drainTo(Collection<? super Runnable> c, int maxElements)1266 public int drainTo(Collection<? super Runnable> c, int maxElements) { 1267 Objects.requireNonNull(c); 1268 if (c == this) 1269 throw new IllegalArgumentException(); 1270 if (maxElements <= 0) 1271 return 0; 1272 final ReentrantLock lock = this.lock; 1273 lock.lock(); 1274 try { 1275 int n = 0; 1276 for (RunnableScheduledFuture<?> first; 1277 n < maxElements 1278 && (first = queue[0]) != null 1279 && first.getDelay(NANOSECONDS) <= 0;) { 1280 c.add(first); // In this order, in case add() throws. 
1281 finishPoll(first); 1282 ++n; 1283 } 1284 return n; 1285 } finally { 1286 lock.unlock(); 1287 } 1288 } 1289 toArray()1290 public Object[] toArray() { 1291 final ReentrantLock lock = this.lock; 1292 lock.lock(); 1293 try { 1294 return Arrays.copyOf(queue, size, Object[].class); 1295 } finally { 1296 lock.unlock(); 1297 } 1298 } 1299 1300 @SuppressWarnings("unchecked") toArray(T[] a)1301 public <T> T[] toArray(T[] a) { 1302 final ReentrantLock lock = this.lock; 1303 lock.lock(); 1304 try { 1305 if (a.length < size) 1306 return (T[]) Arrays.copyOf(queue, size, a.getClass()); 1307 System.arraycopy(queue, 0, a, 0, size); 1308 if (a.length > size) 1309 a[size] = null; 1310 return a; 1311 } finally { 1312 lock.unlock(); 1313 } 1314 } 1315 iterator()1316 public Iterator<Runnable> iterator() { 1317 final ReentrantLock lock = this.lock; 1318 lock.lock(); 1319 try { 1320 return new Itr(Arrays.copyOf(queue, size)); 1321 } finally { 1322 lock.unlock(); 1323 } 1324 } 1325 1326 /** 1327 * Snapshot iterator that works off copy of underlying q array. 1328 */ 1329 private class Itr implements Iterator<Runnable> { 1330 final RunnableScheduledFuture<?>[] array; 1331 int cursor; // index of next element to return; initially 0 1332 int lastRet = -1; // index of last element returned; -1 if no such 1333 Itr(RunnableScheduledFuture<?>[] array)1334 Itr(RunnableScheduledFuture<?>[] array) { 1335 this.array = array; 1336 } 1337 hasNext()1338 public boolean hasNext() { 1339 return cursor < array.length; 1340 } 1341 next()1342 public Runnable next() { 1343 if (cursor >= array.length) 1344 throw new NoSuchElementException(); 1345 return array[lastRet = cursor++]; 1346 } 1347 remove()1348 public void remove() { 1349 if (lastRet < 0) 1350 throw new IllegalStateException(); 1351 DelayedWorkQueue.this.remove(array[lastRet]); 1352 lastRet = -1; 1353 } 1354 } 1355 } 1356 } 1357