1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import static java.util.concurrent.TimeUnit.MILLISECONDS;
39 import static java.util.concurrent.TimeUnit.NANOSECONDS;
40 
41 import java.util.AbstractQueue;
42 import java.util.Arrays;
43 import java.util.Collection;
44 import java.util.Iterator;
45 import java.util.List;
46 import java.util.NoSuchElementException;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.locks.Condition;
49 import java.util.concurrent.locks.ReentrantLock;
50 
51 // BEGIN android-note
52 // omit class-level docs on setRemoveOnCancelPolicy()
53 // END android-note
54 
55 /**
56  * A {@link ThreadPoolExecutor} that can additionally schedule
57  * commands to run after a given delay, or to execute periodically.
58  * This class is preferable to {@link java.util.Timer} when multiple
59  * worker threads are needed, or when the additional flexibility or
60  * capabilities of {@link ThreadPoolExecutor} (which this class
61  * extends) are required.
62  *
63  * <p>Delayed tasks execute no sooner than they are enabled, but
64  * without any real-time guarantees about when, after they are
65  * enabled, they will commence. Tasks scheduled for exactly the same
66  * execution time are enabled in first-in-first-out (FIFO) order of
67  * submission.
68  *
69  * <p>When a submitted task is cancelled before it is run, execution
70  * is suppressed.  By default, such a cancelled task is not
71  * automatically removed from the work queue until its delay elapses.
72  * While this enables further inspection and monitoring, it may also
73  * cause unbounded retention of cancelled tasks.
74  *
75  * <p>Successive executions of a periodic task scheduled via
76  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
77  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
78  * do not overlap. While different executions may be performed by
79  * different threads, the effects of prior executions
80  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
81  * those of subsequent ones.
82  *
83  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
84  * of the inherited tuning methods are not useful for it. In
85  * particular, because it acts as a fixed-sized pool using
86  * {@code corePoolSize} threads and an unbounded queue, adjustments
87  * to {@code maximumPoolSize} have no useful effect. Additionally, it
88  * is almost never a good idea to set {@code corePoolSize} to zero or
89  * use {@code allowCoreThreadTimeOut} because this may leave the pool
90  * without threads to handle tasks once they become eligible to run.
91  *
92  * <p><b>Extension notes:</b> This class overrides the
93  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
94  * {@link AbstractExecutorService#submit(Runnable) submit}
95  * methods to generate internal {@link ScheduledFuture} objects to
96  * control per-task delays and scheduling.  To preserve
97  * functionality, any further overrides of these methods in
98  * subclasses must invoke superclass versions, which effectively
99  * disables additional task customization.  However, this class
100  * provides alternative protected extension method
101  * {@code decorateTask} (one version each for {@code Runnable} and
102  * {@code Callable}) that can be used to customize the concrete task
103  * types used to execute commands entered via {@code execute},
104  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
105  * and {@code scheduleWithFixedDelay}.  By default, a
106  * {@code ScheduledThreadPoolExecutor} uses a task type extending
107  * {@link FutureTask}. However, this may be modified or replaced using
108  * subclasses of the form:
109  *
110  * <pre> {@code
111  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
112  *
113  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
114  *
115  *   protected <V> RunnableScheduledFuture<V> decorateTask(
116  *                Runnable r, RunnableScheduledFuture<V> task) {
117  *       return new CustomTask<V>(r, task);
118  *   }
119  *
120  *   protected <V> RunnableScheduledFuture<V> decorateTask(
121  *                Callable<V> c, RunnableScheduledFuture<V> task) {
122  *       return new CustomTask<V>(c, task);
123  *   }
124  *   // ... add constructors, etc.
125  * }}</pre>
126  *
127  * @since 1.5
128  * @author Doug Lea
129  */
130 public class ScheduledThreadPoolExecutor
131         extends ThreadPoolExecutor
132         implements ScheduledExecutorService {
133 
134     /*
135      * This class specializes ThreadPoolExecutor implementation by
136      *
137      * 1. Using a custom task type ScheduledFutureTask, even for tasks
138      *    that don't require scheduling because they are submitted
139      *    using ExecutorService rather than ScheduledExecutorService
140      *    methods, which are treated as tasks with a delay of zero.
141      *
142      * 2. Using a custom queue (DelayedWorkQueue), a variant of
143      *    unbounded DelayQueue. The lack of capacity constraint and
144      *    the fact that corePoolSize and maximumPoolSize are
145      *    effectively identical simplifies some execution mechanics
146      *    (see delayedExecute) compared to ThreadPoolExecutor.
147      *
148      * 3. Supporting optional run-after-shutdown parameters, which
149      *    leads to overrides of shutdown methods to remove and cancel
150      *    tasks that should NOT be run after shutdown, as well as
151      *    different recheck logic when task (re)submission overlaps
152      *    with a shutdown.
153      *
154      * 4. Task decoration methods to allow interception and
155      *    instrumentation, which are needed because subclasses cannot
156      *    otherwise override submit methods to get this effect. These
157      *    don't have any impact on pool control logic though.
158      */
159 
160     /**
161      * False if should cancel/suppress periodic tasks on shutdown.
162      */
163     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
164 
165     /**
166      * False if should cancel non-periodic tasks on shutdown.
167      */
168     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
169 
170     /**
171      * True if ScheduledFutureTask.cancel should remove from queue.
172      */
173     volatile boolean removeOnCancel;
174 
175     /**
176      * Sequence number to break scheduling ties, and in turn to
177      * guarantee FIFO order among tied entries.
178      */
179     private static final AtomicLong sequencer = new AtomicLong();
180 
181     private class ScheduledFutureTask<V>
182             extends FutureTask<V> implements RunnableScheduledFuture<V> {
183 
184         /** Sequence number to break ties FIFO */
185         private final long sequenceNumber;
186 
187         /** The nanoTime-based time when the task is enabled to execute. */
188         private volatile long time;
189 
190         /**
191          * Period for repeating tasks, in nanoseconds.
192          * A positive value indicates fixed-rate execution.
193          * A negative value indicates fixed-delay execution.
194          * A value of 0 indicates a non-repeating (one-shot) task.
195          */
196         private final long period;
197 
198         /** The actual task to be re-enqueued by reExecutePeriodic */
199         RunnableScheduledFuture<V> outerTask = this;
200 
201         /**
202          * Index into delay queue, to support faster cancellation.
203          */
204         int heapIndex;
205 
206         /**
207          * Creates a one-shot action with given nanoTime-based trigger time.
208          */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)209         ScheduledFutureTask(Runnable r, V result, long triggerTime,
210                             long sequenceNumber) {
211             super(r, result);
212             this.time = triggerTime;
213             this.period = 0;
214             this.sequenceNumber = sequenceNumber;
215         }
216 
217         /**
218          * Creates a periodic action with given nanoTime-based initial
219          * trigger time and period.
220          */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)221         ScheduledFutureTask(Runnable r, V result, long triggerTime,
222                             long period, long sequenceNumber) {
223             super(r, result);
224             this.time = triggerTime;
225             this.period = period;
226             this.sequenceNumber = sequenceNumber;
227         }
228 
229         /**
230          * Creates a one-shot action with given nanoTime-based trigger time.
231          */
ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)232         ScheduledFutureTask(Callable<V> callable, long triggerTime,
233                             long sequenceNumber) {
234             super(callable);
235             this.time = triggerTime;
236             this.period = 0;
237             this.sequenceNumber = sequenceNumber;
238         }
239 
getDelay(TimeUnit unit)240         public long getDelay(TimeUnit unit) {
241             return unit.convert(time - System.nanoTime(), NANOSECONDS);
242         }
243 
compareTo(Delayed other)244         public int compareTo(Delayed other) {
245             if (other == this) // compare zero if same object
246                 return 0;
247             if (other instanceof ScheduledFutureTask) {
248                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
249                 long diff = time - x.time;
250                 if (diff < 0)
251                     return -1;
252                 else if (diff > 0)
253                     return 1;
254                 else if (sequenceNumber < x.sequenceNumber)
255                     return -1;
256                 else
257                     return 1;
258             }
259             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
260             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
261         }
262 
263         /**
264          * Returns {@code true} if this is a periodic (not a one-shot) action.
265          *
266          * @return {@code true} if periodic
267          */
isPeriodic()268         public boolean isPeriodic() {
269             return period != 0;
270         }
271 
272         /**
273          * Sets the next time to run for a periodic task.
274          */
setNextRunTime()275         private void setNextRunTime() {
276             long p = period;
277             if (p > 0)
278                 time += p;
279             else
280                 time = triggerTime(-p);
281         }
282 
cancel(boolean mayInterruptIfRunning)283         public boolean cancel(boolean mayInterruptIfRunning) {
284             // The racy read of heapIndex below is benign:
285             // if heapIndex < 0, then OOTA guarantees that we have surely
286             // been removed; else we recheck under lock in remove()
287             boolean cancelled = super.cancel(mayInterruptIfRunning);
288             if (cancelled && removeOnCancel && heapIndex >= 0)
289                 remove(this);
290             return cancelled;
291         }
292 
293         /**
294          * Overrides FutureTask version so as to reset/requeue if periodic.
295          */
run()296         public void run() {
297             boolean periodic = isPeriodic();
298             if (!canRunInCurrentRunState(periodic))
299                 cancel(false);
300             else if (!periodic)
301                 super.run();
302             else if (super.runAndReset()) {
303                 setNextRunTime();
304                 reExecutePeriodic(outerTask);
305             }
306         }
307     }
308 
309     /**
310      * Returns true if can run a task given current run state
311      * and run-after-shutdown parameters.
312      *
313      * @param periodic true if this task periodic, false if delayed
314      */
canRunInCurrentRunState(boolean periodic)315     boolean canRunInCurrentRunState(boolean periodic) {
316         return isRunningOrShutdown(periodic ?
317                                    continueExistingPeriodicTasksAfterShutdown :
318                                    executeExistingDelayedTasksAfterShutdown);
319     }
320 
321     /**
322      * Main execution method for delayed or periodic tasks.  If pool
323      * is shut down, rejects the task. Otherwise adds task to queue
324      * and starts a thread, if necessary, to run it.  (We cannot
325      * prestart the thread to run the task because the task (probably)
326      * shouldn't be run yet.)  If the pool is shut down while the task
327      * is being added, cancel and remove it if required by state and
328      * run-after-shutdown parameters.
329      *
330      * @param task the task
331      */
delayedExecute(RunnableScheduledFuture<?> task)332     private void delayedExecute(RunnableScheduledFuture<?> task) {
333         if (isShutdown())
334             reject(task);
335         else {
336             super.getQueue().add(task);
337             if (isShutdown() &&
338                 !canRunInCurrentRunState(task.isPeriodic()) &&
339                 remove(task))
340                 task.cancel(false);
341             else
342                 ensurePrestart();
343         }
344     }
345 
346     /**
347      * Requeues a periodic task unless current run state precludes it.
348      * Same idea as delayedExecute except drops task rather than rejecting.
349      *
350      * @param task the task
351      */
reExecutePeriodic(RunnableScheduledFuture<?> task)352     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
353         if (canRunInCurrentRunState(true)) {
354             super.getQueue().add(task);
355             if (!canRunInCurrentRunState(true) && remove(task))
356                 task.cancel(false);
357             else
358                 ensurePrestart();
359         }
360     }
361 
362     /**
363      * Cancels and clears the queue of all tasks that should not be run
364      * due to shutdown policy.  Invoked within super.shutdown.
365      */
onShutdown()366     @Override void onShutdown() {
367         BlockingQueue<Runnable> q = super.getQueue();
368         boolean keepDelayed =
369             getExecuteExistingDelayedTasksAfterShutdownPolicy();
370         boolean keepPeriodic =
371             getContinueExistingPeriodicTasksAfterShutdownPolicy();
372         if (!keepDelayed && !keepPeriodic) {
373             for (Object e : q.toArray())
374                 if (e instanceof RunnableScheduledFuture<?>)
375                     ((RunnableScheduledFuture<?>) e).cancel(false);
376             q.clear();
377         }
378         else {
379             // Traverse snapshot to avoid iterator exceptions
380             for (Object e : q.toArray()) {
381                 if (e instanceof RunnableScheduledFuture) {
382                     RunnableScheduledFuture<?> t =
383                         (RunnableScheduledFuture<?>)e;
384                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
385                         t.isCancelled()) { // also remove if already cancelled
386                         if (q.remove(t))
387                             t.cancel(false);
388                     }
389                 }
390             }
391         }
392         tryTerminate();
393     }
394 
395     /**
396      * Modifies or replaces the task used to execute a runnable.
397      * This method can be used to override the concrete
398      * class used for managing internal tasks.
399      * The default implementation simply returns the given task.
400      *
401      * @param runnable the submitted Runnable
402      * @param task the task created to execute the runnable
403      * @param <V> the type of the task's result
404      * @return a task that can execute the runnable
405      * @since 1.6
406      */
decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)407     protected <V> RunnableScheduledFuture<V> decorateTask(
408         Runnable runnable, RunnableScheduledFuture<V> task) {
409         return task;
410     }
411 
412     /**
413      * Modifies or replaces the task used to execute a callable.
414      * This method can be used to override the concrete
415      * class used for managing internal tasks.
416      * The default implementation simply returns the given task.
417      *
418      * @param callable the submitted Callable
419      * @param task the task created to execute the callable
420      * @param <V> the type of the task's result
421      * @return a task that can execute the callable
422      * @since 1.6
423      */
decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)424     protected <V> RunnableScheduledFuture<V> decorateTask(
425         Callable<V> callable, RunnableScheduledFuture<V> task) {
426         return task;
427     }
428 
429     /**
430      * The default keep-alive time for pool threads.
431      *
432      * Normally, this value is unused because all pool threads will be
433      * core threads, but if a user creates a pool with a corePoolSize
434      * of zero (against our advice), we keep a thread alive as long as
435      * there are queued tasks.  If the keep alive time is zero (the
436      * historic value), we end up hot-spinning in getTask, wasting a
437      * CPU.  But on the other hand, if we set the value too high, and
438      * users create a one-shot pool which they don't cleanly shutdown,
439      * the pool's non-daemon threads will prevent JVM termination.  A
440      * small but non-zero value (relative to a JVM's lifetime) seems
441      * best.
442      */
443     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
444 
445     /**
446      * Creates a new {@code ScheduledThreadPoolExecutor} with the
447      * given core pool size.
448      *
449      * @param corePoolSize the number of threads to keep in the pool, even
450      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
451      * @throws IllegalArgumentException if {@code corePoolSize < 0}
452      */
ScheduledThreadPoolExecutor(int corePoolSize)453     public ScheduledThreadPoolExecutor(int corePoolSize) {
454         super(corePoolSize, Integer.MAX_VALUE,
455               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
456               new DelayedWorkQueue());
457     }
458 
459     /**
460      * Creates a new {@code ScheduledThreadPoolExecutor} with the
461      * given initial parameters.
462      *
463      * @param corePoolSize the number of threads to keep in the pool, even
464      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
465      * @param threadFactory the factory to use when the executor
466      *        creates a new thread
467      * @throws IllegalArgumentException if {@code corePoolSize < 0}
468      * @throws NullPointerException if {@code threadFactory} is null
469      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)470     public ScheduledThreadPoolExecutor(int corePoolSize,
471                                        ThreadFactory threadFactory) {
472         super(corePoolSize, Integer.MAX_VALUE,
473               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
474               new DelayedWorkQueue(), threadFactory);
475     }
476 
477     /**
478      * Creates a new {@code ScheduledThreadPoolExecutor} with the
479      * given initial parameters.
480      *
481      * @param corePoolSize the number of threads to keep in the pool, even
482      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
483      * @param handler the handler to use when execution is blocked
484      *        because the thread bounds and queue capacities are reached
485      * @throws IllegalArgumentException if {@code corePoolSize < 0}
486      * @throws NullPointerException if {@code handler} is null
487      */
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)488     public ScheduledThreadPoolExecutor(int corePoolSize,
489                                        RejectedExecutionHandler handler) {
490         super(corePoolSize, Integer.MAX_VALUE,
491               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
492               new DelayedWorkQueue(), handler);
493     }
494 
495     /**
496      * Creates a new {@code ScheduledThreadPoolExecutor} with the
497      * given initial parameters.
498      *
499      * @param corePoolSize the number of threads to keep in the pool, even
500      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
501      * @param threadFactory the factory to use when the executor
502      *        creates a new thread
503      * @param handler the handler to use when execution is blocked
504      *        because the thread bounds and queue capacities are reached
505      * @throws IllegalArgumentException if {@code corePoolSize < 0}
506      * @throws NullPointerException if {@code threadFactory} or
507      *         {@code handler} is null
508      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)509     public ScheduledThreadPoolExecutor(int corePoolSize,
510                                        ThreadFactory threadFactory,
511                                        RejectedExecutionHandler handler) {
512         super(corePoolSize, Integer.MAX_VALUE,
513               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
514               new DelayedWorkQueue(), threadFactory, handler);
515     }
516 
517     /**
518      * Returns the nanoTime-based trigger time of a delayed action.
519      */
triggerTime(long delay, TimeUnit unit)520     private long triggerTime(long delay, TimeUnit unit) {
521         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
522     }
523 
524     /**
525      * Returns the nanoTime-based trigger time of a delayed action.
526      */
triggerTime(long delay)527     long triggerTime(long delay) {
528         return System.nanoTime() +
529             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
530     }
531 
532     /**
533      * Constrains the values of all delays in the queue to be within
534      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
535      * This may occur if a task is eligible to be dequeued, but has
536      * not yet been, while some other task is added with a delay of
537      * Long.MAX_VALUE.
538      */
overflowFree(long delay)539     private long overflowFree(long delay) {
540         Delayed head = (Delayed) super.getQueue().peek();
541         if (head != null) {
542             long headDelay = head.getDelay(NANOSECONDS);
543             if (headDelay < 0 && (delay - headDelay < 0))
544                 delay = Long.MAX_VALUE + headDelay;
545         }
546         return delay;
547     }
548 
549     /**
550      * @throws RejectedExecutionException {@inheritDoc}
551      * @throws NullPointerException       {@inheritDoc}
552      */
schedule(Runnable command, long delay, TimeUnit unit)553     public ScheduledFuture<?> schedule(Runnable command,
554                                        long delay,
555                                        TimeUnit unit) {
556         if (command == null || unit == null)
557             throw new NullPointerException();
558         RunnableScheduledFuture<Void> t = decorateTask(command,
559             new ScheduledFutureTask<Void>(command, null,
560                                           triggerTime(delay, unit),
561                                           sequencer.getAndIncrement()));
562         delayedExecute(t);
563         return t;
564     }
565 
566     /**
567      * @throws RejectedExecutionException {@inheritDoc}
568      * @throws NullPointerException       {@inheritDoc}
569      */
schedule(Callable<V> callable, long delay, TimeUnit unit)570     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
571                                            long delay,
572                                            TimeUnit unit) {
573         if (callable == null || unit == null)
574             throw new NullPointerException();
575         RunnableScheduledFuture<V> t = decorateTask(callable,
576             new ScheduledFutureTask<V>(callable,
577                                        triggerTime(delay, unit),
578                                        sequencer.getAndIncrement()));
579         delayedExecute(t);
580         return t;
581     }
582 
583     /**
584      * @throws RejectedExecutionException {@inheritDoc}
585      * @throws NullPointerException       {@inheritDoc}
586      * @throws IllegalArgumentException   {@inheritDoc}
587      */
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)588     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
589                                                   long initialDelay,
590                                                   long period,
591                                                   TimeUnit unit) {
592         if (command == null || unit == null)
593             throw new NullPointerException();
594         if (period <= 0L)
595             throw new IllegalArgumentException();
596         ScheduledFutureTask<Void> sft =
597             new ScheduledFutureTask<Void>(command,
598                                           null,
599                                           triggerTime(initialDelay, unit),
600                                           unit.toNanos(period),
601                                           sequencer.getAndIncrement());
602         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
603         sft.outerTask = t;
604         delayedExecute(t);
605         return t;
606     }
607 
608     /**
609      * @throws RejectedExecutionException {@inheritDoc}
610      * @throws NullPointerException       {@inheritDoc}
611      * @throws IllegalArgumentException   {@inheritDoc}
612      */
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)613     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
614                                                      long initialDelay,
615                                                      long delay,
616                                                      TimeUnit unit) {
617         if (command == null || unit == null)
618             throw new NullPointerException();
619         if (delay <= 0L)
620             throw new IllegalArgumentException();
621         ScheduledFutureTask<Void> sft =
622             new ScheduledFutureTask<Void>(command,
623                                           null,
624                                           triggerTime(initialDelay, unit),
625                                           -unit.toNanos(delay),
626                                           sequencer.getAndIncrement());
627         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
628         sft.outerTask = t;
629         delayedExecute(t);
630         return t;
631     }
632 
633     /**
634      * Executes {@code command} with zero required delay.
635      * This has effect equivalent to
636      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
637      * Note that inspections of the queue and of the list returned by
638      * {@code shutdownNow} will access the zero-delayed
639      * {@link ScheduledFuture}, not the {@code command} itself.
640      *
641      * <p>A consequence of the use of {@code ScheduledFuture} objects is
642      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
643      * called with a null second {@code Throwable} argument, even if the
644      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
645      * thrown by such a task can be obtained via {@link Future#get}.
646      *
647      * @throws RejectedExecutionException at discretion of
648      *         {@code RejectedExecutionHandler}, if the task
649      *         cannot be accepted for execution because the
650      *         executor has been shut down
651      * @throws NullPointerException {@inheritDoc}
652      */
execute(Runnable command)653     public void execute(Runnable command) {
654         schedule(command, 0, NANOSECONDS);
655     }
656 
657     // Override AbstractExecutorService methods
658 
659     /**
660      * @throws RejectedExecutionException {@inheritDoc}
661      * @throws NullPointerException       {@inheritDoc}
662      */
submit(Runnable task)663     public Future<?> submit(Runnable task) {
664         return schedule(task, 0, NANOSECONDS);
665     }
666 
667     /**
668      * @throws RejectedExecutionException {@inheritDoc}
669      * @throws NullPointerException       {@inheritDoc}
670      */
submit(Runnable task, T result)671     public <T> Future<T> submit(Runnable task, T result) {
672         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
673     }
674 
675     /**
676      * @throws RejectedExecutionException {@inheritDoc}
677      * @throws NullPointerException       {@inheritDoc}
678      */
submit(Callable<T> task)679     public <T> Future<T> submit(Callable<T> task) {
680         return schedule(task, 0, NANOSECONDS);
681     }
682 
683     /**
684      * Sets the policy on whether to continue executing existing
685      * periodic tasks even when this executor has been {@code shutdown}.
686      * In this case, these tasks will only terminate upon
687      * {@code shutdownNow} or after setting the policy to
688      * {@code false} when already shutdown.
689      * This value is by default {@code false}.
690      *
691      * @param value if {@code true}, continue after shutdown, else don't
692      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
693      */
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)694     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
695         continueExistingPeriodicTasksAfterShutdown = value;
696         if (!value && isShutdown())
697             onShutdown();
698     }
699 
700     /**
701      * Gets the policy on whether to continue executing existing
702      * periodic tasks even when this executor has been {@code shutdown}.
703      * In this case, these tasks will only terminate upon
704      * {@code shutdownNow} or after setting the policy to
705      * {@code false} when already shutdown.
706      * This value is by default {@code false}.
707      *
708      * @return {@code true} if will continue after shutdown
709      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
710      */
getContinueExistingPeriodicTasksAfterShutdownPolicy()711     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
712         return continueExistingPeriodicTasksAfterShutdown;
713     }
714 
715     /**
716      * Sets the policy on whether to execute existing delayed
717      * tasks even when this executor has been {@code shutdown}.
718      * In this case, these tasks will only terminate upon
719      * {@code shutdownNow}, or after setting the policy to
720      * {@code false} when already shutdown.
721      * This value is by default {@code true}.
722      *
723      * @param value if {@code true}, execute after shutdown, else don't
724      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
725      */
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)726     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
727         executeExistingDelayedTasksAfterShutdown = value;
728         if (!value && isShutdown())
729             onShutdown();
730     }
731 
732     /**
733      * Gets the policy on whether to execute existing delayed
734      * tasks even when this executor has been {@code shutdown}.
735      * In this case, these tasks will only terminate upon
736      * {@code shutdownNow}, or after setting the policy to
737      * {@code false} when already shutdown.
738      * This value is by default {@code true}.
739      *
740      * @return {@code true} if will execute after shutdown
741      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
742      */
getExecuteExistingDelayedTasksAfterShutdownPolicy()743     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
744         return executeExistingDelayedTasksAfterShutdown;
745     }
746 
747     /**
748      * Sets the policy on whether cancelled tasks should be immediately
749      * removed from the work queue at time of cancellation.  This value is
750      * by default {@code false}.
751      *
752      * @param value if {@code true}, remove on cancellation, else don't
753      * @see #getRemoveOnCancelPolicy
754      * @since 1.7
755      */
setRemoveOnCancelPolicy(boolean value)756     public void setRemoveOnCancelPolicy(boolean value) {
757         removeOnCancel = value;
758     }
759 
760     /**
761      * Gets the policy on whether cancelled tasks should be immediately
762      * removed from the work queue at time of cancellation.  This value is
763      * by default {@code false}.
764      *
765      * @return {@code true} if cancelled tasks are immediately removed
766      *         from the queue
767      * @see #setRemoveOnCancelPolicy
768      * @since 1.7
769      */
getRemoveOnCancelPolicy()770     public boolean getRemoveOnCancelPolicy() {
771         return removeOnCancel;
772     }
773 
774     /**
775      * Initiates an orderly shutdown in which previously submitted
776      * tasks are executed, but no new tasks will be accepted.
777      * Invocation has no additional effect if already shut down.
778      *
779      * <p>This method does not wait for previously submitted tasks to
780      * complete execution.  Use {@link #awaitTermination awaitTermination}
781      * to do that.
782      *
783      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
784      * has been set {@code false}, existing delayed tasks whose delays
785      * have not yet elapsed are cancelled.  And unless the {@code
786      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
787      * {@code true}, future executions of existing periodic tasks will
788      * be cancelled.
789      */
790     // android-note: Removed "throws SecurityException" doc.
shutdown()791     public void shutdown() {
792         super.shutdown();
793     }
794 
795     /**
796      * Attempts to stop all actively executing tasks, halts the
797      * processing of waiting tasks, and returns a list of the tasks
798      * that were awaiting execution. These tasks are drained (removed)
799      * from the task queue upon return from this method.
800      *
801      * <p>This method does not wait for actively executing tasks to
802      * terminate.  Use {@link #awaitTermination awaitTermination} to
803      * do that.
804      *
805      * <p>There are no guarantees beyond best-effort attempts to stop
806      * processing actively executing tasks.  This implementation
807      * interrupts tasks via {@link Thread#interrupt}; any task that
808      * fails to respond to interrupts may never terminate.
809      *
810      * @return list of tasks that never commenced execution.
811      *         Each element of this list is a {@link ScheduledFuture}.
812      *         For tasks submitted via one of the {@code schedule}
813      *         methods, the element will be identical to the returned
814      *         {@code ScheduledFuture}.  For tasks submitted using
815      *         {@link #execute execute}, the element will be a
816      *         zero-delay {@code ScheduledFuture}.
817      */
818     // android-note: Removed "throws SecurityException" doc.
shutdownNow()819     public List<Runnable> shutdownNow() {
820         return super.shutdownNow();
821     }
822 
823     /**
824      * Returns the task queue used by this executor.  Access to the
825      * task queue is intended primarily for debugging and monitoring.
826      * This queue may be in active use.  Retrieving the task queue
827      * does not prevent queued tasks from executing.
828      *
829      * <p>Each element of this queue is a {@link ScheduledFuture}.
830      * For tasks submitted via one of the {@code schedule} methods, the
831      * element will be identical to the returned {@code ScheduledFuture}.
832      * For tasks submitted using {@link #execute execute}, the element
833      * will be a zero-delay {@code ScheduledFuture}.
834      *
835      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
836      * tasks in the order in which they will execute.
837      *
838      * @return the task queue
839      */
getQueue()840     public BlockingQueue<Runnable> getQueue() {
841         return super.getQueue();
842     }
843 
844     /**
845      * Specialized delay queue. To mesh with TPE declarations, this
846      * class must be declared as a BlockingQueue<Runnable> even though
847      * it can only hold RunnableScheduledFutures.
848      */
849     static class DelayedWorkQueue extends AbstractQueue<Runnable>
850         implements BlockingQueue<Runnable> {
851 
852         /*
853          * A DelayedWorkQueue is based on a heap-based data structure
854          * like those in DelayQueue and PriorityQueue, except that
855          * every ScheduledFutureTask also records its index into the
856          * heap array. This eliminates the need to find a task upon
857          * cancellation, greatly speeding up removal (down from O(n)
858          * to O(log n)), and reducing garbage retention that would
859          * otherwise occur by waiting for the element to rise to top
860          * before clearing. But because the queue may also hold
861          * RunnableScheduledFutures that are not ScheduledFutureTasks,
862          * we are not guaranteed to have such indices available, in
863          * which case we fall back to linear search. (We expect that
864          * most tasks will not be decorated, and that the faster cases
865          * will be much more common.)
866          *
867          * All heap operations must record index changes -- mainly
868          * within siftUp and siftDown. Upon removal, a task's
869          * heapIndex is set to -1. Note that ScheduledFutureTasks can
870          * appear at most once in the queue (this need not be true for
871          * other kinds of tasks or work queues), so are uniquely
872          * identified by heapIndex.
873          */
874 
875         private static final int INITIAL_CAPACITY = 16;
876         private RunnableScheduledFuture<?>[] queue =
877             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
878         private final ReentrantLock lock = new ReentrantLock();
879         private int size;
880 
881         /**
882          * Thread designated to wait for the task at the head of the
883          * queue.  This variant of the Leader-Follower pattern
884          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
885          * minimize unnecessary timed waiting.  When a thread becomes
886          * the leader, it waits only for the next delay to elapse, but
887          * other threads await indefinitely.  The leader thread must
888          * signal some other thread before returning from take() or
889          * poll(...), unless some other thread becomes leader in the
890          * interim.  Whenever the head of the queue is replaced with a
891          * task with an earlier expiration time, the leader field is
892          * invalidated by being reset to null, and some waiting
893          * thread, but not necessarily the current leader, is
894          * signalled.  So waiting threads must be prepared to acquire
895          * and lose leadership while waiting.
896          */
897         private Thread leader;
898 
899         /**
900          * Condition signalled when a newer task becomes available at the
901          * head of the queue or a new thread may need to become leader.
902          */
903         private final Condition available = lock.newCondition();
904 
905         /**
906          * Sets f's heapIndex if it is a ScheduledFutureTask.
907          */
setIndex(RunnableScheduledFuture<?> f, int idx)908         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
909             if (f instanceof ScheduledFutureTask)
910                 ((ScheduledFutureTask)f).heapIndex = idx;
911         }
912 
913         /**
914          * Sifts element added at bottom up to its heap-ordered spot.
915          * Call only when holding lock.
916          */
siftUp(int k, RunnableScheduledFuture<?> key)917         private void siftUp(int k, RunnableScheduledFuture<?> key) {
918             while (k > 0) {
919                 int parent = (k - 1) >>> 1;
920                 RunnableScheduledFuture<?> e = queue[parent];
921                 if (key.compareTo(e) >= 0)
922                     break;
923                 queue[k] = e;
924                 setIndex(e, k);
925                 k = parent;
926             }
927             queue[k] = key;
928             setIndex(key, k);
929         }
930 
931         /**
932          * Sifts element added at top down to its heap-ordered spot.
933          * Call only when holding lock.
934          */
siftDown(int k, RunnableScheduledFuture<?> key)935         private void siftDown(int k, RunnableScheduledFuture<?> key) {
936             int half = size >>> 1;
937             while (k < half) {
938                 int child = (k << 1) + 1;
939                 RunnableScheduledFuture<?> c = queue[child];
940                 int right = child + 1;
941                 if (right < size && c.compareTo(queue[right]) > 0)
942                     c = queue[child = right];
943                 if (key.compareTo(c) <= 0)
944                     break;
945                 queue[k] = c;
946                 setIndex(c, k);
947                 k = child;
948             }
949             queue[k] = key;
950             setIndex(key, k);
951         }
952 
953         /**
954          * Resizes the heap array.  Call only when holding lock.
955          */
grow()956         private void grow() {
957             int oldCapacity = queue.length;
958             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
959             if (newCapacity < 0) // overflow
960                 newCapacity = Integer.MAX_VALUE;
961             queue = Arrays.copyOf(queue, newCapacity);
962         }
963 
964         /**
965          * Finds index of given object, or -1 if absent.
966          */
indexOf(Object x)967         private int indexOf(Object x) {
968             if (x != null) {
969                 if (x instanceof ScheduledFutureTask) {
970                     int i = ((ScheduledFutureTask) x).heapIndex;
971                     // Sanity check; x could conceivably be a
972                     // ScheduledFutureTask from some other pool.
973                     if (i >= 0 && i < size && queue[i] == x)
974                         return i;
975                 } else {
976                     for (int i = 0; i < size; i++)
977                         if (x.equals(queue[i]))
978                             return i;
979                 }
980             }
981             return -1;
982         }
983 
contains(Object x)984         public boolean contains(Object x) {
985             final ReentrantLock lock = this.lock;
986             lock.lock();
987             try {
988                 return indexOf(x) != -1;
989             } finally {
990                 lock.unlock();
991             }
992         }
993 
remove(Object x)994         public boolean remove(Object x) {
995             final ReentrantLock lock = this.lock;
996             lock.lock();
997             try {
998                 int i = indexOf(x);
999                 if (i < 0)
1000                     return false;
1001 
1002                 setIndex(queue[i], -1);
1003                 int s = --size;
1004                 RunnableScheduledFuture<?> replacement = queue[s];
1005                 queue[s] = null;
1006                 if (s != i) {
1007                     siftDown(i, replacement);
1008                     if (queue[i] == replacement)
1009                         siftUp(i, replacement);
1010                 }
1011                 return true;
1012             } finally {
1013                 lock.unlock();
1014             }
1015         }
1016 
size()1017         public int size() {
1018             final ReentrantLock lock = this.lock;
1019             lock.lock();
1020             try {
1021                 return size;
1022             } finally {
1023                 lock.unlock();
1024             }
1025         }
1026 
isEmpty()1027         public boolean isEmpty() {
1028             return size() == 0;
1029         }
1030 
remainingCapacity()1031         public int remainingCapacity() {
1032             return Integer.MAX_VALUE;
1033         }
1034 
peek()1035         public RunnableScheduledFuture<?> peek() {
1036             final ReentrantLock lock = this.lock;
1037             lock.lock();
1038             try {
1039                 return queue[0];
1040             } finally {
1041                 lock.unlock();
1042             }
1043         }
1044 
offer(Runnable x)1045         public boolean offer(Runnable x) {
1046             if (x == null)
1047                 throw new NullPointerException();
1048             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1049             final ReentrantLock lock = this.lock;
1050             lock.lock();
1051             try {
1052                 int i = size;
1053                 if (i >= queue.length)
1054                     grow();
1055                 size = i + 1;
1056                 if (i == 0) {
1057                     queue[0] = e;
1058                     setIndex(e, 0);
1059                 } else {
1060                     siftUp(i, e);
1061                 }
1062                 if (queue[0] == e) {
1063                     leader = null;
1064                     available.signal();
1065                 }
1066             } finally {
1067                 lock.unlock();
1068             }
1069             return true;
1070         }
1071 
put(Runnable e)1072         public void put(Runnable e) {
1073             offer(e);
1074         }
1075 
add(Runnable e)1076         public boolean add(Runnable e) {
1077             return offer(e);
1078         }
1079 
offer(Runnable e, long timeout, TimeUnit unit)1080         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1081             return offer(e);
1082         }
1083 
1084         /**
1085          * Performs common bookkeeping for poll and take: Replaces
1086          * first element with last and sifts it down.  Call only when
1087          * holding lock.
1088          * @param f the task to remove and return
1089          */
finishPoll(RunnableScheduledFuture<?> f)1090         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1091             int s = --size;
1092             RunnableScheduledFuture<?> x = queue[s];
1093             queue[s] = null;
1094             if (s != 0)
1095                 siftDown(0, x);
1096             setIndex(f, -1);
1097             return f;
1098         }
1099 
poll()1100         public RunnableScheduledFuture<?> poll() {
1101             final ReentrantLock lock = this.lock;
1102             lock.lock();
1103             try {
1104                 RunnableScheduledFuture<?> first = queue[0];
1105                 return (first == null || first.getDelay(NANOSECONDS) > 0)
1106                     ? null
1107                     : finishPoll(first);
1108             } finally {
1109                 lock.unlock();
1110             }
1111         }
1112 
take()1113         public RunnableScheduledFuture<?> take() throws InterruptedException {
1114             final ReentrantLock lock = this.lock;
1115             lock.lockInterruptibly();
1116             try {
1117                 for (;;) {
1118                     RunnableScheduledFuture<?> first = queue[0];
1119                     if (first == null)
1120                         available.await();
1121                     else {
1122                         long delay = first.getDelay(NANOSECONDS);
1123                         if (delay <= 0L)
1124                             return finishPoll(first);
1125                         first = null; // don't retain ref while waiting
1126                         if (leader != null)
1127                             available.await();
1128                         else {
1129                             Thread thisThread = Thread.currentThread();
1130                             leader = thisThread;
1131                             try {
1132                                 available.awaitNanos(delay);
1133                             } finally {
1134                                 if (leader == thisThread)
1135                                     leader = null;
1136                             }
1137                         }
1138                     }
1139                 }
1140             } finally {
1141                 if (leader == null && queue[0] != null)
1142                     available.signal();
1143                 lock.unlock();
1144             }
1145         }
1146 
poll(long timeout, TimeUnit unit)1147         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1148             throws InterruptedException {
1149             long nanos = unit.toNanos(timeout);
1150             final ReentrantLock lock = this.lock;
1151             lock.lockInterruptibly();
1152             try {
1153                 for (;;) {
1154                     RunnableScheduledFuture<?> first = queue[0];
1155                     if (first == null) {
1156                         if (nanos <= 0L)
1157                             return null;
1158                         else
1159                             nanos = available.awaitNanos(nanos);
1160                     } else {
1161                         long delay = first.getDelay(NANOSECONDS);
1162                         if (delay <= 0L)
1163                             return finishPoll(first);
1164                         if (nanos <= 0L)
1165                             return null;
1166                         first = null; // don't retain ref while waiting
1167                         if (nanos < delay || leader != null)
1168                             nanos = available.awaitNanos(nanos);
1169                         else {
1170                             Thread thisThread = Thread.currentThread();
1171                             leader = thisThread;
1172                             try {
1173                                 long timeLeft = available.awaitNanos(delay);
1174                                 nanos -= delay - timeLeft;
1175                             } finally {
1176                                 if (leader == thisThread)
1177                                     leader = null;
1178                             }
1179                         }
1180                     }
1181                 }
1182             } finally {
1183                 if (leader == null && queue[0] != null)
1184                     available.signal();
1185                 lock.unlock();
1186             }
1187         }
1188 
clear()1189         public void clear() {
1190             final ReentrantLock lock = this.lock;
1191             lock.lock();
1192             try {
1193                 for (int i = 0; i < size; i++) {
1194                     RunnableScheduledFuture<?> t = queue[i];
1195                     if (t != null) {
1196                         queue[i] = null;
1197                         setIndex(t, -1);
1198                     }
1199                 }
1200                 size = 0;
1201             } finally {
1202                 lock.unlock();
1203             }
1204         }
1205 
1206         /**
1207          * Returns first element only if it is expired.
1208          * Used only by drainTo.  Call only when holding lock.
1209          */
peekExpired()1210         private RunnableScheduledFuture<?> peekExpired() {
1211             // assert lock.isHeldByCurrentThread();
1212             RunnableScheduledFuture<?> first = queue[0];
1213             return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1214                 null : first;
1215         }
1216 
drainTo(Collection<? super Runnable> c)1217         public int drainTo(Collection<? super Runnable> c) {
1218             if (c == null)
1219                 throw new NullPointerException();
1220             if (c == this)
1221                 throw new IllegalArgumentException();
1222             final ReentrantLock lock = this.lock;
1223             lock.lock();
1224             try {
1225                 RunnableScheduledFuture<?> first;
1226                 int n = 0;
1227                 while ((first = peekExpired()) != null) {
1228                     c.add(first);   // In this order, in case add() throws.
1229                     finishPoll(first);
1230                     ++n;
1231                 }
1232                 return n;
1233             } finally {
1234                 lock.unlock();
1235             }
1236         }
1237 
drainTo(Collection<? super Runnable> c, int maxElements)1238         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1239             if (c == null)
1240                 throw new NullPointerException();
1241             if (c == this)
1242                 throw new IllegalArgumentException();
1243             if (maxElements <= 0)
1244                 return 0;
1245             final ReentrantLock lock = this.lock;
1246             lock.lock();
1247             try {
1248                 RunnableScheduledFuture<?> first;
1249                 int n = 0;
1250                 while (n < maxElements && (first = peekExpired()) != null) {
1251                     c.add(first);   // In this order, in case add() throws.
1252                     finishPoll(first);
1253                     ++n;
1254                 }
1255                 return n;
1256             } finally {
1257                 lock.unlock();
1258             }
1259         }
1260 
toArray()1261         public Object[] toArray() {
1262             final ReentrantLock lock = this.lock;
1263             lock.lock();
1264             try {
1265                 return Arrays.copyOf(queue, size, Object[].class);
1266             } finally {
1267                 lock.unlock();
1268             }
1269         }
1270 
1271         @SuppressWarnings("unchecked")
toArray(T[] a)1272         public <T> T[] toArray(T[] a) {
1273             final ReentrantLock lock = this.lock;
1274             lock.lock();
1275             try {
1276                 if (a.length < size)
1277                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1278                 System.arraycopy(queue, 0, a, 0, size);
1279                 if (a.length > size)
1280                     a[size] = null;
1281                 return a;
1282             } finally {
1283                 lock.unlock();
1284             }
1285         }
1286 
iterator()1287         public Iterator<Runnable> iterator() {
1288             return new Itr(Arrays.copyOf(queue, size));
1289         }
1290 
1291         /**
1292          * Snapshot iterator that works off copy of underlying q array.
1293          */
1294         private class Itr implements Iterator<Runnable> {
1295             final RunnableScheduledFuture<?>[] array;
1296             int cursor;        // index of next element to return; initially 0
1297             int lastRet = -1;  // index of last element returned; -1 if no such
1298 
Itr(RunnableScheduledFuture<?>[] array)1299             Itr(RunnableScheduledFuture<?>[] array) {
1300                 this.array = array;
1301             }
1302 
hasNext()1303             public boolean hasNext() {
1304                 return cursor < array.length;
1305             }
1306 
next()1307             public Runnable next() {
1308                 if (cursor >= array.length)
1309                     throw new NoSuchElementException();
1310                 lastRet = cursor;
1311                 return array[cursor++];
1312             }
1313 
remove()1314             public void remove() {
1315                 if (lastRet < 0)
1316                     throw new IllegalStateException();
1317                 DelayedWorkQueue.this.remove(array[lastRet]);
1318                 lastRet = -1;
1319             }
1320         }
1321     }
1322 }
1323