1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import static java.util.concurrent.TimeUnit.MILLISECONDS;
39 import static java.util.concurrent.TimeUnit.NANOSECONDS;
40 
41 import java.util.AbstractQueue;
42 import java.util.Arrays;
43 import java.util.Collection;
44 import java.util.Iterator;
45 import java.util.List;
46 import java.util.NoSuchElementException;
47 import java.util.Objects;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.locks.Condition;
50 import java.util.concurrent.locks.ReentrantLock;
51 
52 // BEGIN android-note
53 // omit class-level docs on setRemoveOnCancelPolicy()
54 // END android-note
55 
56 /**
57  * A {@link ThreadPoolExecutor} that can additionally schedule
58  * commands to run after a given delay, or to execute periodically.
59  * This class is preferable to {@link java.util.Timer} when multiple
60  * worker threads are needed, or when the additional flexibility or
61  * capabilities of {@link ThreadPoolExecutor} (which this class
62  * extends) are required.
63  *
64  * <p>Delayed tasks execute no sooner than they are enabled, but
65  * without any real-time guarantees about when, after they are
66  * enabled, they will commence. Tasks scheduled for exactly the same
67  * execution time are enabled in first-in-first-out (FIFO) order of
68  * submission.
69  *
70  * <p>When a submitted task is cancelled before it is run, execution
71  * is suppressed.  By default, such a cancelled task is not
72  * automatically removed from the work queue until its delay elapses.
73  * While this enables further inspection and monitoring, it may also
74  * cause unbounded retention of cancelled tasks.
75  *
76  * <p>Successive executions of a periodic task scheduled via
77  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
78  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
79  * do not overlap. While different executions may be performed by
80  * different threads, the effects of prior executions
81  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
82  * those of subsequent ones.
83  *
84  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
85  * of the inherited tuning methods are not useful for it. In
86  * particular, because it acts as a fixed-sized pool using
87  * {@code corePoolSize} threads and an unbounded queue, adjustments
88  * to {@code maximumPoolSize} have no useful effect. Additionally, it
89  * is almost never a good idea to set {@code corePoolSize} to zero or
90  * use {@code allowCoreThreadTimeOut} because this may leave the pool
91  * without threads to handle tasks once they become eligible to run.
92  *
93  * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
94  * this class uses {@link Executors#defaultThreadFactory} as the
95  * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
96  * as the default rejected execution handler.
97  *
98  * <p><b>Extension notes:</b> This class overrides the
99  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
100  * {@link AbstractExecutorService#submit(Runnable) submit}
101  * methods to generate internal {@link ScheduledFuture} objects to
102  * control per-task delays and scheduling.  To preserve
103  * functionality, any further overrides of these methods in
104  * subclasses must invoke superclass versions, which effectively
105  * disables additional task customization.  However, this class
106  * provides alternative protected extension method
107  * {@code decorateTask} (one version each for {@code Runnable} and
108  * {@code Callable}) that can be used to customize the concrete task
109  * types used to execute commands entered via {@code execute},
110  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
111  * and {@code scheduleWithFixedDelay}.  By default, a
112  * {@code ScheduledThreadPoolExecutor} uses a task type extending
113  * {@link FutureTask}. However, this may be modified or replaced using
114  * subclasses of the form:
115  *
116  * <pre> {@code
117  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
118  *
119  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
120  *
121  *   protected <V> RunnableScheduledFuture<V> decorateTask(
122  *                Runnable r, RunnableScheduledFuture<V> task) {
123  *       return new CustomTask<V>(r, task);
124  *   }
125  *
126  *   protected <V> RunnableScheduledFuture<V> decorateTask(
127  *                Callable<V> c, RunnableScheduledFuture<V> task) {
128  *       return new CustomTask<V>(c, task);
129  *   }
130  *   // ... add constructors, etc.
131  * }}</pre>
132  *
133  * @since 1.5
134  * @author Doug Lea
135  */
136 public class ScheduledThreadPoolExecutor
137         extends ThreadPoolExecutor
138         implements ScheduledExecutorService {
139 
140     /*
141      * This class specializes ThreadPoolExecutor implementation by
142      *
143      * 1. Using a custom task type ScheduledFutureTask, even for tasks
144      *    that don't require scheduling because they are submitted
145      *    using ExecutorService rather than ScheduledExecutorService
146      *    methods, which are treated as tasks with a delay of zero.
147      *
148      * 2. Using a custom queue (DelayedWorkQueue), a variant of
149      *    unbounded DelayQueue. The lack of capacity constraint and
150      *    the fact that corePoolSize and maximumPoolSize are
151      *    effectively identical simplifies some execution mechanics
152      *    (see delayedExecute) compared to ThreadPoolExecutor.
153      *
154      * 3. Supporting optional run-after-shutdown parameters, which
155      *    leads to overrides of shutdown methods to remove and cancel
156      *    tasks that should NOT be run after shutdown, as well as
157      *    different recheck logic when task (re)submission overlaps
158      *    with a shutdown.
159      *
160      * 4. Task decoration methods to allow interception and
161      *    instrumentation, which are needed because subclasses cannot
162      *    otherwise override submit methods to get this effect. These
163      *    don't have any impact on pool control logic though.
164      */
165 
166     /**
167      * False if should cancel/suppress periodic tasks on shutdown.
168      */
169     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
170 
171     // Android-changed: Preserving behaviour on expired tasks (b/202927404)
172     /**
173      * False if should cancel non-periodic tasks on shutdown.
174      */
175     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
176 
177     /**
178      * True if ScheduledFutureTask.cancel should remove from queue.
179      */
180     volatile boolean removeOnCancel;
181 
182     /**
183      * Sequence number to break scheduling ties, and in turn to
184      * guarantee FIFO order among tied entries.
185      */
186     private static final AtomicLong sequencer = new AtomicLong();
187 
188     private class ScheduledFutureTask<V>
189             extends FutureTask<V> implements RunnableScheduledFuture<V> {
190 
191         /** Sequence number to break ties FIFO */
192         private final long sequenceNumber;
193 
194         /** The nanoTime-based time when the task is enabled to execute. */
195         private volatile long time;
196 
197         /**
198          * Period for repeating tasks, in nanoseconds.
199          * A positive value indicates fixed-rate execution.
200          * A negative value indicates fixed-delay execution.
201          * A value of 0 indicates a non-repeating (one-shot) task.
202          */
203         private final long period;
204 
205         /** The actual task to be re-enqueued by reExecutePeriodic */
206         RunnableScheduledFuture<V> outerTask = this;
207 
208         /**
209          * Index into delay queue, to support faster cancellation.
210          */
211         int heapIndex;
212 
213         /**
214          * Creates a one-shot action with given nanoTime-based trigger time.
215          */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)216         ScheduledFutureTask(Runnable r, V result, long triggerTime,
217                             long sequenceNumber) {
218             super(r, result);
219             this.time = triggerTime;
220             this.period = 0;
221             this.sequenceNumber = sequenceNumber;
222         }
223 
224         /**
225          * Creates a periodic action with given nanoTime-based initial
226          * trigger time and period.
227          */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)228         ScheduledFutureTask(Runnable r, V result, long triggerTime,
229                             long period, long sequenceNumber) {
230             super(r, result);
231             this.time = triggerTime;
232             this.period = period;
233             this.sequenceNumber = sequenceNumber;
234         }
235 
236         /**
237          * Creates a one-shot action with given nanoTime-based trigger time.
238          */
ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)239         ScheduledFutureTask(Callable<V> callable, long triggerTime,
240                             long sequenceNumber) {
241             super(callable);
242             this.time = triggerTime;
243             this.period = 0;
244             this.sequenceNumber = sequenceNumber;
245         }
246 
getDelay(TimeUnit unit)247         public long getDelay(TimeUnit unit) {
248             return unit.convert(time - System.nanoTime(), NANOSECONDS);
249         }
250 
compareTo(Delayed other)251         public int compareTo(Delayed other) {
252             if (other == this) // compare zero if same object
253                 return 0;
254             if (other instanceof ScheduledFutureTask) {
255                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
256                 long diff = time - x.time;
257                 if (diff < 0)
258                     return -1;
259                 else if (diff > 0)
260                     return 1;
261                 else if (sequenceNumber < x.sequenceNumber)
262                     return -1;
263                 else
264                     return 1;
265             }
266             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
267             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
268         }
269 
270         /**
271          * Returns {@code true} if this is a periodic (not a one-shot) action.
272          *
273          * @return {@code true} if periodic
274          */
isPeriodic()275         public boolean isPeriodic() {
276             return period != 0;
277         }
278 
279         /**
280          * Sets the next time to run for a periodic task.
281          */
setNextRunTime()282         private void setNextRunTime() {
283             long p = period;
284             if (p > 0)
285                 time += p;
286             else
287                 time = triggerTime(-p);
288         }
289 
cancel(boolean mayInterruptIfRunning)290         public boolean cancel(boolean mayInterruptIfRunning) {
291             // The racy read of heapIndex below is benign:
292             // if heapIndex < 0, then OOTA guarantees that we have surely
293             // been removed; else we recheck under lock in remove()
294             boolean cancelled = super.cancel(mayInterruptIfRunning);
295             if (cancelled && removeOnCancel && heapIndex >= 0)
296                 remove(this);
297             return cancelled;
298         }
299 
300         /**
301          * Overrides FutureTask version so as to reset/requeue if periodic.
302          */
run()303         public void run() {
304             if (!canRunInCurrentRunState(this))
305                 cancel(false);
306             else if (!isPeriodic())
307                 super.run();
308             else if (super.runAndReset()) {
309                 setNextRunTime();
310                 reExecutePeriodic(outerTask);
311             }
312         }
313     }
314 
315     /**
316      * Returns true if can run a task given current run state and
317      * run-after-shutdown parameters.
318      */
canRunInCurrentRunState(RunnableScheduledFuture<?> task)319     boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
320         if (!isShutdown())
321             return true;
322         if (isStopped())
323             return false;
324         return task.isPeriodic()
325             ? continueExistingPeriodicTasksAfterShutdown
326             : (executeExistingDelayedTasksAfterShutdown
327             // Android-changed: Preserving behaviour on expired tasks (b/202927404)
328             //   || task.getDelay(NANOSECONDS) <= 0);
329               );
330     }
331 
332     /**
333      * Main execution method for delayed or periodic tasks.  If pool
334      * is shut down, rejects the task. Otherwise adds task to queue
335      * and starts a thread, if necessary, to run it.  (We cannot
336      * prestart the thread to run the task because the task (probably)
337      * shouldn't be run yet.)  If the pool is shut down while the task
338      * is being added, cancel and remove it if required by state and
339      * run-after-shutdown parameters.
340      *
341      * @param task the task
342      */
delayedExecute(RunnableScheduledFuture<?> task)343     private void delayedExecute(RunnableScheduledFuture<?> task) {
344         if (isShutdown())
345             reject(task);
346         else {
347             super.getQueue().add(task);
348             if (!canRunInCurrentRunState(task) && remove(task))
349                 task.cancel(false);
350             else
351                 ensurePrestart();
352         }
353     }
354 
355     /**
356      * Requeues a periodic task unless current run state precludes it.
357      * Same idea as delayedExecute except drops task rather than rejecting.
358      *
359      * @param task the task
360      */
reExecutePeriodic(RunnableScheduledFuture<?> task)361     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
362         if (canRunInCurrentRunState(task)) {
363             super.getQueue().add(task);
364             if (canRunInCurrentRunState(task) || !remove(task)) {
365                 ensurePrestart();
366                 return;
367             }
368         }
369         task.cancel(false);
370     }
371 
372     /**
373      * Cancels and clears the queue of all tasks that should not be run
374      * due to shutdown policy.  Invoked within super.shutdown.
375      */
onShutdown()376     @Override void onShutdown() {
377         BlockingQueue<Runnable> q = super.getQueue();
378         boolean keepDelayed =
379             getExecuteExistingDelayedTasksAfterShutdownPolicy();
380         boolean keepPeriodic =
381             getContinueExistingPeriodicTasksAfterShutdownPolicy();
382         // Traverse snapshot to avoid iterator exceptions
383         // TODO: implement and use efficient removeIf
384         // super.getQueue().removeIf(...);
385         for (Object e : q.toArray()) {
386             if (e instanceof RunnableScheduledFuture) {
387                 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
388                 if ((t.isPeriodic()
389                      ? !keepPeriodic
390                      // Android-changed: Preserving behaviour on expired tasks (b/202927404)
391                      // : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
392                      : !keepDelayed)
393                     || t.isCancelled()) { // also remove if already cancelled
394                     if (q.remove(t))
395                         t.cancel(false);
396                 }
397             }
398         }
399         tryTerminate();
400     }
401 
402     /**
403      * Modifies or replaces the task used to execute a runnable.
404      * This method can be used to override the concrete
405      * class used for managing internal tasks.
406      * The default implementation simply returns the given task.
407      *
408      * @param runnable the submitted Runnable
409      * @param task the task created to execute the runnable
410      * @param <V> the type of the task's result
411      * @return a task that can execute the runnable
412      * @since 1.6
413      */
decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)414     protected <V> RunnableScheduledFuture<V> decorateTask(
415         Runnable runnable, RunnableScheduledFuture<V> task) {
416         return task;
417     }
418 
419     /**
420      * Modifies or replaces the task used to execute a callable.
421      * This method can be used to override the concrete
422      * class used for managing internal tasks.
423      * The default implementation simply returns the given task.
424      *
425      * @param callable the submitted Callable
426      * @param task the task created to execute the callable
427      * @param <V> the type of the task's result
428      * @return a task that can execute the callable
429      * @since 1.6
430      */
decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)431     protected <V> RunnableScheduledFuture<V> decorateTask(
432         Callable<V> callable, RunnableScheduledFuture<V> task) {
433         return task;
434     }
435 
436     /**
437      * The default keep-alive time for pool threads.
438      *
439      * Normally, this value is unused because all pool threads will be
440      * core threads, but if a user creates a pool with a corePoolSize
441      * of zero (against our advice), we keep a thread alive as long as
442      * there are queued tasks.  If the keep alive time is zero (the
443      * historic value), we end up hot-spinning in getTask, wasting a
444      * CPU.  But on the other hand, if we set the value too high, and
445      * users create a one-shot pool which they don't cleanly shutdown,
446      * the pool's non-daemon threads will prevent JVM termination.  A
447      * small but non-zero value (relative to a JVM's lifetime) seems
448      * best.
449      */
450     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
451 
452     /**
453      * Creates a new {@code ScheduledThreadPoolExecutor} with the
454      * given core pool size.
455      *
456      * @param corePoolSize the number of threads to keep in the pool, even
457      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
458      * @throws IllegalArgumentException if {@code corePoolSize < 0}
459      */
ScheduledThreadPoolExecutor(int corePoolSize)460     public ScheduledThreadPoolExecutor(int corePoolSize) {
461         super(corePoolSize, Integer.MAX_VALUE,
462               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
463               new DelayedWorkQueue());
464     }
465 
466     /**
467      * Creates a new {@code ScheduledThreadPoolExecutor} with the
468      * given initial parameters.
469      *
470      * @param corePoolSize the number of threads to keep in the pool, even
471      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
472      * @param threadFactory the factory to use when the executor
473      *        creates a new thread
474      * @throws IllegalArgumentException if {@code corePoolSize < 0}
475      * @throws NullPointerException if {@code threadFactory} is null
476      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)477     public ScheduledThreadPoolExecutor(int corePoolSize,
478                                        ThreadFactory threadFactory) {
479         super(corePoolSize, Integer.MAX_VALUE,
480               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
481               new DelayedWorkQueue(), threadFactory);
482     }
483 
484     /**
485      * Creates a new {@code ScheduledThreadPoolExecutor} with the
486      * given initial parameters.
487      *
488      * @param corePoolSize the number of threads to keep in the pool, even
489      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
490      * @param handler the handler to use when execution is blocked
491      *        because the thread bounds and queue capacities are reached
492      * @throws IllegalArgumentException if {@code corePoolSize < 0}
493      * @throws NullPointerException if {@code handler} is null
494      */
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)495     public ScheduledThreadPoolExecutor(int corePoolSize,
496                                        RejectedExecutionHandler handler) {
497         super(corePoolSize, Integer.MAX_VALUE,
498               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
499               new DelayedWorkQueue(), handler);
500     }
501 
502     /**
503      * Creates a new {@code ScheduledThreadPoolExecutor} with the
504      * given initial parameters.
505      *
506      * @param corePoolSize the number of threads to keep in the pool, even
507      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
508      * @param threadFactory the factory to use when the executor
509      *        creates a new thread
510      * @param handler the handler to use when execution is blocked
511      *        because the thread bounds and queue capacities are reached
512      * @throws IllegalArgumentException if {@code corePoolSize < 0}
513      * @throws NullPointerException if {@code threadFactory} or
514      *         {@code handler} is null
515      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)516     public ScheduledThreadPoolExecutor(int corePoolSize,
517                                        ThreadFactory threadFactory,
518                                        RejectedExecutionHandler handler) {
519         super(corePoolSize, Integer.MAX_VALUE,
520               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
521               new DelayedWorkQueue(), threadFactory, handler);
522     }
523 
524     /**
525      * Returns the nanoTime-based trigger time of a delayed action.
526      */
triggerTime(long delay, TimeUnit unit)527     private long triggerTime(long delay, TimeUnit unit) {
528         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
529     }
530 
531     /**
532      * Returns the nanoTime-based trigger time of a delayed action.
533      */
triggerTime(long delay)534     long triggerTime(long delay) {
535         return System.nanoTime() +
536             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
537     }
538 
539     /**
540      * Constrains the values of all delays in the queue to be within
541      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
542      * This may occur if a task is eligible to be dequeued, but has
543      * not yet been, while some other task is added with a delay of
544      * Long.MAX_VALUE.
545      */
overflowFree(long delay)546     private long overflowFree(long delay) {
547         Delayed head = (Delayed) super.getQueue().peek();
548         if (head != null) {
549             long headDelay = head.getDelay(NANOSECONDS);
550             if (headDelay < 0 && (delay - headDelay < 0))
551                 delay = Long.MAX_VALUE + headDelay;
552         }
553         return delay;
554     }
555 
556     /**
557      * @throws RejectedExecutionException {@inheritDoc}
558      * @throws NullPointerException       {@inheritDoc}
559      */
schedule(Runnable command, long delay, TimeUnit unit)560     public ScheduledFuture<?> schedule(Runnable command,
561                                        long delay,
562                                        TimeUnit unit) {
563         if (command == null || unit == null)
564             throw new NullPointerException();
565         RunnableScheduledFuture<Void> t = decorateTask(command,
566             new ScheduledFutureTask<Void>(command, null,
567                                           triggerTime(delay, unit),
568                                           sequencer.getAndIncrement()));
569         delayedExecute(t);
570         return t;
571     }
572 
573     /**
574      * @throws RejectedExecutionException {@inheritDoc}
575      * @throws NullPointerException       {@inheritDoc}
576      */
schedule(Callable<V> callable, long delay, TimeUnit unit)577     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
578                                            long delay,
579                                            TimeUnit unit) {
580         if (callable == null || unit == null)
581             throw new NullPointerException();
582         RunnableScheduledFuture<V> t = decorateTask(callable,
583             new ScheduledFutureTask<V>(callable,
584                                        triggerTime(delay, unit),
585                                        sequencer.getAndIncrement()));
586         delayedExecute(t);
587         return t;
588     }
589 
590     /**
591      * Submits a periodic action that becomes enabled first after the
592      * given initial delay, and subsequently with the given period;
593      * that is, executions will commence after
594      * {@code initialDelay}, then {@code initialDelay + period}, then
595      * {@code initialDelay + 2 * period}, and so on.
596      *
597      * <p>The sequence of task executions continues indefinitely until
598      * one of the following exceptional completions occur:
599      * <ul>
600      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
601      * via the returned future.
602      * <li>Method {@link #shutdown} is called and the {@linkplain
603      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
604      * whether to continue after shutdown} is not set true, or method
605      * {@link #shutdownNow} is called; also resulting in task
606      * cancellation.
607      * <li>An execution of the task throws an exception.  In this case
608      * calling {@link Future#get() get} on the returned future will throw
609      * {@link ExecutionException}, holding the exception as its cause.
610      * </ul>
611      * Subsequent executions are suppressed.  Subsequent calls to
612      * {@link Future#isDone isDone()} on the returned future will
613      * return {@code true}.
614      *
615      * <p>If any execution of this task takes longer than its period, then
616      * subsequent executions may start late, but will not concurrently
617      * execute.
618      *
619      * @throws RejectedExecutionException {@inheritDoc}
620      * @throws NullPointerException       {@inheritDoc}
621      * @throws IllegalArgumentException   {@inheritDoc}
622      */
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)623     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
624                                                   long initialDelay,
625                                                   long period,
626                                                   TimeUnit unit) {
627         if (command == null || unit == null)
628             throw new NullPointerException();
629         if (period <= 0L)
630             throw new IllegalArgumentException();
631         ScheduledFutureTask<Void> sft =
632             new ScheduledFutureTask<Void>(command,
633                                           null,
634                                           triggerTime(initialDelay, unit),
635                                           unit.toNanos(period),
636                                           sequencer.getAndIncrement());
637         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
638         sft.outerTask = t;
639         delayedExecute(t);
640         return t;
641     }
642 
643     /**
644      * Submits a periodic action that becomes enabled first after the
645      * given initial delay, and subsequently with the given delay
646      * between the termination of one execution and the commencement of
647      * the next.
648      *
649      * <p>The sequence of task executions continues indefinitely until
650      * one of the following exceptional completions occur:
651      * <ul>
652      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
653      * via the returned future.
654      * <li>Method {@link #shutdown} is called and the {@linkplain
655      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
656      * whether to continue after shutdown} is not set true, or method
657      * {@link #shutdownNow} is called; also resulting in task
658      * cancellation.
659      * <li>An execution of the task throws an exception.  In this case
660      * calling {@link Future#get() get} on the returned future will throw
661      * {@link ExecutionException}, holding the exception as its cause.
662      * </ul>
663      * Subsequent executions are suppressed.  Subsequent calls to
664      * {@link Future#isDone isDone()} on the returned future will
665      * return {@code true}.
666      *
667      * @throws RejectedExecutionException {@inheritDoc}
668      * @throws NullPointerException       {@inheritDoc}
669      * @throws IllegalArgumentException   {@inheritDoc}
670      */
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)671     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
672                                                      long initialDelay,
673                                                      long delay,
674                                                      TimeUnit unit) {
675         if (command == null || unit == null)
676             throw new NullPointerException();
677         if (delay <= 0L)
678             throw new IllegalArgumentException();
679         ScheduledFutureTask<Void> sft =
680             new ScheduledFutureTask<Void>(command,
681                                           null,
682                                           triggerTime(initialDelay, unit),
683                                           -unit.toNanos(delay),
684                                           sequencer.getAndIncrement());
685         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
686         sft.outerTask = t;
687         delayedExecute(t);
688         return t;
689     }
690 
691     /**
692      * Executes {@code command} with zero required delay.
693      * This has effect equivalent to
694      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
695      * Note that inspections of the queue and of the list returned by
696      * {@code shutdownNow} will access the zero-delayed
697      * {@link ScheduledFuture}, not the {@code command} itself.
698      *
699      * <p>A consequence of the use of {@code ScheduledFuture} objects is
700      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
701      * called with a null second {@code Throwable} argument, even if the
702      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
703      * thrown by such a task can be obtained via {@link Future#get}.
704      *
705      * @throws RejectedExecutionException at discretion of
706      *         {@code RejectedExecutionHandler}, if the task
707      *         cannot be accepted for execution because the
708      *         executor has been shut down
709      * @throws NullPointerException {@inheritDoc}
710      */
execute(Runnable command)711     public void execute(Runnable command) {
712         schedule(command, 0, NANOSECONDS);
713     }
714 
715     // Override AbstractExecutorService methods
716 
717     /**
718      * @throws RejectedExecutionException {@inheritDoc}
719      * @throws NullPointerException       {@inheritDoc}
720      */
submit(Runnable task)721     public Future<?> submit(Runnable task) {
722         return schedule(task, 0, NANOSECONDS);
723     }
724 
725     /**
726      * @throws RejectedExecutionException {@inheritDoc}
727      * @throws NullPointerException       {@inheritDoc}
728      */
submit(Runnable task, T result)729     public <T> Future<T> submit(Runnable task, T result) {
730         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
731     }
732 
733     /**
734      * @throws RejectedExecutionException {@inheritDoc}
735      * @throws NullPointerException       {@inheritDoc}
736      */
submit(Callable<T> task)737     public <T> Future<T> submit(Callable<T> task) {
738         return schedule(task, 0, NANOSECONDS);
739     }
740 
741     /**
742      * Sets the policy on whether to continue executing existing
743      * periodic tasks even when this executor has been {@code shutdown}.
744      * In this case, executions will continue until {@code shutdownNow}
745      * or the policy is set to {@code false} when already shutdown.
746      * This value is by default {@code false}.
747      *
748      * @param value if {@code true}, continue after shutdown, else don't
749      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
750      */
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)751     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
752         continueExistingPeriodicTasksAfterShutdown = value;
753         if (!value && isShutdown())
754             onShutdown();
755     }
756 
757     /**
758      * Gets the policy on whether to continue executing existing
759      * periodic tasks even when this executor has been {@code shutdown}.
760      * In this case, executions will continue until {@code shutdownNow}
761      * or the policy is set to {@code false} when already shutdown.
762      * This value is by default {@code false}.
763      *
764      * @return {@code true} if will continue after shutdown
765      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
766      */
getContinueExistingPeriodicTasksAfterShutdownPolicy()767     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
768         return continueExistingPeriodicTasksAfterShutdown;
769     }
770 
771     /**
772      * Sets the policy on whether to execute existing delayed
773      * tasks even when this executor has been {@code shutdown}.
774      * In this case, these tasks will only terminate upon
775      * {@code shutdownNow}, or after setting the policy to
776      * {@code false} when already shutdown.
777      * This value is by default {@code true}.
778      *
779      * @param value if {@code true}, execute after shutdown, else don't
780      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
781      */
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)782     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
783         executeExistingDelayedTasksAfterShutdown = value;
784         if (!value && isShutdown())
785             onShutdown();
786     }
787 
788     /**
789      * Gets the policy on whether to execute existing delayed
790      * tasks even when this executor has been {@code shutdown}.
791      * In this case, these tasks will only terminate upon
792      * {@code shutdownNow}, or after setting the policy to
793      * {@code false} when already shutdown.
794      * This value is by default {@code true}.
795      *
796      * @return {@code true} if will execute after shutdown
797      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
798      */
getExecuteExistingDelayedTasksAfterShutdownPolicy()799     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
800         return executeExistingDelayedTasksAfterShutdown;
801     }
802 
803     /**
804      * Sets the policy on whether cancelled tasks should be immediately
805      * removed from the work queue at time of cancellation.  This value is
806      * by default {@code false}.
807      *
808      * @param value if {@code true}, remove on cancellation, else don't
809      * @see #getRemoveOnCancelPolicy
810      * @since 1.7
811      */
setRemoveOnCancelPolicy(boolean value)812     public void setRemoveOnCancelPolicy(boolean value) {
813         removeOnCancel = value;
814     }
815 
816     /**
817      * Gets the policy on whether cancelled tasks should be immediately
818      * removed from the work queue at time of cancellation.  This value is
819      * by default {@code false}.
820      *
821      * @return {@code true} if cancelled tasks are immediately removed
822      *         from the queue
823      * @see #setRemoveOnCancelPolicy
824      * @since 1.7
825      */
getRemoveOnCancelPolicy()826     public boolean getRemoveOnCancelPolicy() {
827         return removeOnCancel;
828     }
829 
830     /**
831      * Initiates an orderly shutdown in which previously submitted
832      * tasks are executed, but no new tasks will be accepted.
833      * Invocation has no additional effect if already shut down.
834      *
835      * <p>This method does not wait for previously submitted tasks to
836      * complete execution.  Use {@link #awaitTermination awaitTermination}
837      * to do that.
838      *
839      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
840      * has been set {@code false}, existing delayed tasks whose delays
841      * have not yet elapsed are cancelled.  And unless the {@code
842      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
843      * {@code true}, future executions of existing periodic tasks will
844      * be cancelled.
845      */
846     // android-note: Removed "throws SecurityException" doc.
shutdown()847     public void shutdown() {
848         super.shutdown();
849     }
850 
851     /**
852      * Attempts to stop all actively executing tasks, halts the
853      * processing of waiting tasks, and returns a list of the tasks
854      * that were awaiting execution. These tasks are drained (removed)
855      * from the task queue upon return from this method.
856      *
857      * <p>This method does not wait for actively executing tasks to
858      * terminate.  Use {@link #awaitTermination awaitTermination} to
859      * do that.
860      *
861      * <p>There are no guarantees beyond best-effort attempts to stop
862      * processing actively executing tasks.  This implementation
863      * interrupts tasks via {@link Thread#interrupt}; any task that
864      * fails to respond to interrupts may never terminate.
865      *
866      * @return list of tasks that never commenced execution.
867      *         Each element of this list is a {@link ScheduledFuture}.
868      *         For tasks submitted via one of the {@code schedule}
869      *         methods, the element will be identical to the returned
870      *         {@code ScheduledFuture}.  For tasks submitted using
871      *         {@link #execute execute}, the element will be a
872      *         zero-delay {@code ScheduledFuture}.
873      */
874     // android-note: Removed "throws SecurityException" doc.
shutdownNow()875     public List<Runnable> shutdownNow() {
876         return super.shutdownNow();
877     }
878 
879     /**
880      * Returns the task queue used by this executor.  Access to the
881      * task queue is intended primarily for debugging and monitoring.
882      * This queue may be in active use.  Retrieving the task queue
883      * does not prevent queued tasks from executing.
884      *
885      * <p>Each element of this queue is a {@link ScheduledFuture}.
886      * For tasks submitted via one of the {@code schedule} methods, the
887      * element will be identical to the returned {@code ScheduledFuture}.
888      * For tasks submitted using {@link #execute execute}, the element
889      * will be a zero-delay {@code ScheduledFuture}.
890      *
891      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
892      * tasks in the order in which they will execute.
893      *
894      * @return the task queue
895      */
getQueue()896     public BlockingQueue<Runnable> getQueue() {
897         return super.getQueue();
898     }
899 
900     /**
901      * Specialized delay queue. To mesh with TPE declarations, this
902      * class must be declared as a BlockingQueue<Runnable> even though
903      * it can only hold RunnableScheduledFutures.
904      */
905     static class DelayedWorkQueue extends AbstractQueue<Runnable>
906         implements BlockingQueue<Runnable> {
907 
908         /*
909          * A DelayedWorkQueue is based on a heap-based data structure
910          * like those in DelayQueue and PriorityQueue, except that
911          * every ScheduledFutureTask also records its index into the
912          * heap array. This eliminates the need to find a task upon
913          * cancellation, greatly speeding up removal (down from O(n)
914          * to O(log n)), and reducing garbage retention that would
915          * otherwise occur by waiting for the element to rise to top
916          * before clearing. But because the queue may also hold
917          * RunnableScheduledFutures that are not ScheduledFutureTasks,
918          * we are not guaranteed to have such indices available, in
919          * which case we fall back to linear search. (We expect that
920          * most tasks will not be decorated, and that the faster cases
921          * will be much more common.)
922          *
923          * All heap operations must record index changes -- mainly
924          * within siftUp and siftDown. Upon removal, a task's
925          * heapIndex is set to -1. Note that ScheduledFutureTasks can
926          * appear at most once in the queue (this need not be true for
927          * other kinds of tasks or work queues), so are uniquely
928          * identified by heapIndex.
929          */
930 
931         private static final int INITIAL_CAPACITY = 16;
932         private RunnableScheduledFuture<?>[] queue =
933             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
934         private final ReentrantLock lock = new ReentrantLock();
935         private int size;
936 
937         /**
938          * Thread designated to wait for the task at the head of the
939          * queue.  This variant of the Leader-Follower pattern
940          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
941          * minimize unnecessary timed waiting.  When a thread becomes
942          * the leader, it waits only for the next delay to elapse, but
943          * other threads await indefinitely.  The leader thread must
944          * signal some other thread before returning from take() or
945          * poll(...), unless some other thread becomes leader in the
946          * interim.  Whenever the head of the queue is replaced with a
947          * task with an earlier expiration time, the leader field is
948          * invalidated by being reset to null, and some waiting
949          * thread, but not necessarily the current leader, is
950          * signalled.  So waiting threads must be prepared to acquire
951          * and lose leadership while waiting.
952          */
953         private Thread leader;
954 
955         /**
956          * Condition signalled when a newer task becomes available at the
957          * head of the queue or a new thread may need to become leader.
958          */
959         private final Condition available = lock.newCondition();
960 
961         /**
962          * Sets f's heapIndex if it is a ScheduledFutureTask.
963          */
setIndex(RunnableScheduledFuture<?> f, int idx)964         private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
965             if (f instanceof ScheduledFutureTask)
966                 ((ScheduledFutureTask)f).heapIndex = idx;
967         }
968 
969         /**
970          * Sifts element added at bottom up to its heap-ordered spot.
971          * Call only when holding lock.
972          */
siftUp(int k, RunnableScheduledFuture<?> key)973         private void siftUp(int k, RunnableScheduledFuture<?> key) {
974             while (k > 0) {
975                 int parent = (k - 1) >>> 1;
976                 RunnableScheduledFuture<?> e = queue[parent];
977                 if (key.compareTo(e) >= 0)
978                     break;
979                 queue[k] = e;
980                 setIndex(e, k);
981                 k = parent;
982             }
983             queue[k] = key;
984             setIndex(key, k);
985         }
986 
987         /**
988          * Sifts element added at top down to its heap-ordered spot.
989          * Call only when holding lock.
990          */
siftDown(int k, RunnableScheduledFuture<?> key)991         private void siftDown(int k, RunnableScheduledFuture<?> key) {
992             int half = size >>> 1;
993             while (k < half) {
994                 int child = (k << 1) + 1;
995                 RunnableScheduledFuture<?> c = queue[child];
996                 int right = child + 1;
997                 if (right < size && c.compareTo(queue[right]) > 0)
998                     c = queue[child = right];
999                 if (key.compareTo(c) <= 0)
1000                     break;
1001                 queue[k] = c;
1002                 setIndex(c, k);
1003                 k = child;
1004             }
1005             queue[k] = key;
1006             setIndex(key, k);
1007         }
1008 
1009         /**
1010          * Resizes the heap array.  Call only when holding lock.
1011          */
grow()1012         private void grow() {
1013             int oldCapacity = queue.length;
1014             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
1015             if (newCapacity < 0) // overflow
1016                 newCapacity = Integer.MAX_VALUE;
1017             queue = Arrays.copyOf(queue, newCapacity);
1018         }
1019 
1020         /**
1021          * Finds index of given object, or -1 if absent.
1022          */
indexOf(Object x)1023         private int indexOf(Object x) {
1024             if (x != null) {
1025                 if (x instanceof ScheduledFutureTask) {
1026                     int i = ((ScheduledFutureTask) x).heapIndex;
1027                     // Sanity check; x could conceivably be a
1028                     // ScheduledFutureTask from some other pool.
1029                     if (i >= 0 && i < size && queue[i] == x)
1030                         return i;
1031                 } else {
1032                     for (int i = 0; i < size; i++)
1033                         if (x.equals(queue[i]))
1034                             return i;
1035                 }
1036             }
1037             return -1;
1038         }
1039 
contains(Object x)1040         public boolean contains(Object x) {
1041             final ReentrantLock lock = this.lock;
1042             lock.lock();
1043             try {
1044                 return indexOf(x) != -1;
1045             } finally {
1046                 lock.unlock();
1047             }
1048         }
1049 
remove(Object x)1050         public boolean remove(Object x) {
1051             final ReentrantLock lock = this.lock;
1052             lock.lock();
1053             try {
1054                 int i = indexOf(x);
1055                 if (i < 0)
1056                     return false;
1057 
1058                 setIndex(queue[i], -1);
1059                 int s = --size;
1060                 RunnableScheduledFuture<?> replacement = queue[s];
1061                 queue[s] = null;
1062                 if (s != i) {
1063                     siftDown(i, replacement);
1064                     if (queue[i] == replacement)
1065                         siftUp(i, replacement);
1066                 }
1067                 return true;
1068             } finally {
1069                 lock.unlock();
1070             }
1071         }
1072 
size()1073         public int size() {
1074             final ReentrantLock lock = this.lock;
1075             lock.lock();
1076             try {
1077                 return size;
1078             } finally {
1079                 lock.unlock();
1080             }
1081         }
1082 
isEmpty()1083         public boolean isEmpty() {
1084             return size() == 0;
1085         }
1086 
remainingCapacity()1087         public int remainingCapacity() {
1088             return Integer.MAX_VALUE;
1089         }
1090 
peek()1091         public RunnableScheduledFuture<?> peek() {
1092             final ReentrantLock lock = this.lock;
1093             lock.lock();
1094             try {
1095                 return queue[0];
1096             } finally {
1097                 lock.unlock();
1098             }
1099         }
1100 
offer(Runnable x)1101         public boolean offer(Runnable x) {
1102             if (x == null)
1103                 throw new NullPointerException();
1104             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1105             final ReentrantLock lock = this.lock;
1106             lock.lock();
1107             try {
1108                 int i = size;
1109                 if (i >= queue.length)
1110                     grow();
1111                 size = i + 1;
1112                 if (i == 0) {
1113                     queue[0] = e;
1114                     setIndex(e, 0);
1115                 } else {
1116                     siftUp(i, e);
1117                 }
1118                 if (queue[0] == e) {
1119                     leader = null;
1120                     available.signal();
1121                 }
1122             } finally {
1123                 lock.unlock();
1124             }
1125             return true;
1126         }
1127 
put(Runnable e)1128         public void put(Runnable e) {
1129             offer(e);
1130         }
1131 
add(Runnable e)1132         public boolean add(Runnable e) {
1133             return offer(e);
1134         }
1135 
offer(Runnable e, long timeout, TimeUnit unit)1136         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1137             return offer(e);
1138         }
1139 
1140         /**
1141          * Performs common bookkeeping for poll and take: Replaces
1142          * first element with last and sifts it down.  Call only when
1143          * holding lock.
1144          * @param f the task to remove and return
1145          */
finishPoll(RunnableScheduledFuture<?> f)1146         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1147             int s = --size;
1148             RunnableScheduledFuture<?> x = queue[s];
1149             queue[s] = null;
1150             if (s != 0)
1151                 siftDown(0, x);
1152             setIndex(f, -1);
1153             return f;
1154         }
1155 
poll()1156         public RunnableScheduledFuture<?> poll() {
1157             final ReentrantLock lock = this.lock;
1158             lock.lock();
1159             try {
1160                 RunnableScheduledFuture<?> first = queue[0];
1161                 return (first == null || first.getDelay(NANOSECONDS) > 0)
1162                     ? null
1163                     : finishPoll(first);
1164             } finally {
1165                 lock.unlock();
1166             }
1167         }
1168 
take()1169         public RunnableScheduledFuture<?> take() throws InterruptedException {
1170             final ReentrantLock lock = this.lock;
1171             lock.lockInterruptibly();
1172             try {
1173                 for (;;) {
1174                     RunnableScheduledFuture<?> first = queue[0];
1175                     if (first == null)
1176                         available.await();
1177                     else {
1178                         long delay = first.getDelay(NANOSECONDS);
1179                         if (delay <= 0L)
1180                             return finishPoll(first);
1181                         first = null; // don't retain ref while waiting
1182                         if (leader != null)
1183                             available.await();
1184                         else {
1185                             Thread thisThread = Thread.currentThread();
1186                             leader = thisThread;
1187                             try {
1188                                 available.awaitNanos(delay);
1189                             } finally {
1190                                 if (leader == thisThread)
1191                                     leader = null;
1192                             }
1193                         }
1194                     }
1195                 }
1196             } finally {
1197                 if (leader == null && queue[0] != null)
1198                     available.signal();
1199                 lock.unlock();
1200             }
1201         }
1202 
poll(long timeout, TimeUnit unit)1203         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1204             throws InterruptedException {
1205             long nanos = unit.toNanos(timeout);
1206             final ReentrantLock lock = this.lock;
1207             lock.lockInterruptibly();
1208             try {
1209                 for (;;) {
1210                     RunnableScheduledFuture<?> first = queue[0];
1211                     if (first == null) {
1212                         if (nanos <= 0L)
1213                             return null;
1214                         else
1215                             nanos = available.awaitNanos(nanos);
1216                     } else {
1217                         long delay = first.getDelay(NANOSECONDS);
1218                         if (delay <= 0L)
1219                             return finishPoll(first);
1220                         if (nanos <= 0L)
1221                             return null;
1222                         first = null; // don't retain ref while waiting
1223                         if (nanos < delay || leader != null)
1224                             nanos = available.awaitNanos(nanos);
1225                         else {
1226                             Thread thisThread = Thread.currentThread();
1227                             leader = thisThread;
1228                             try {
1229                                 long timeLeft = available.awaitNanos(delay);
1230                                 nanos -= delay - timeLeft;
1231                             } finally {
1232                                 if (leader == thisThread)
1233                                     leader = null;
1234                             }
1235                         }
1236                     }
1237                 }
1238             } finally {
1239                 if (leader == null && queue[0] != null)
1240                     available.signal();
1241                 lock.unlock();
1242             }
1243         }
1244 
clear()1245         public void clear() {
1246             final ReentrantLock lock = this.lock;
1247             lock.lock();
1248             try {
1249                 for (int i = 0; i < size; i++) {
1250                     RunnableScheduledFuture<?> t = queue[i];
1251                     if (t != null) {
1252                         queue[i] = null;
1253                         setIndex(t, -1);
1254                     }
1255                 }
1256                 size = 0;
1257             } finally {
1258                 lock.unlock();
1259             }
1260         }
1261 
drainTo(Collection<? super Runnable> c)1262         public int drainTo(Collection<? super Runnable> c) {
1263             return drainTo(c, Integer.MAX_VALUE);
1264         }
1265 
drainTo(Collection<? super Runnable> c, int maxElements)1266         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1267             Objects.requireNonNull(c);
1268             if (c == this)
1269                 throw new IllegalArgumentException();
1270             if (maxElements <= 0)
1271                 return 0;
1272             final ReentrantLock lock = this.lock;
1273             lock.lock();
1274             try {
1275                 int n = 0;
1276                 for (RunnableScheduledFuture<?> first;
1277                      n < maxElements
1278                          && (first = queue[0]) != null
1279                          && first.getDelay(NANOSECONDS) <= 0;) {
1280                     c.add(first);   // In this order, in case add() throws.
1281                     finishPoll(first);
1282                     ++n;
1283                 }
1284                 return n;
1285             } finally {
1286                 lock.unlock();
1287             }
1288         }
1289 
toArray()1290         public Object[] toArray() {
1291             final ReentrantLock lock = this.lock;
1292             lock.lock();
1293             try {
1294                 return Arrays.copyOf(queue, size, Object[].class);
1295             } finally {
1296                 lock.unlock();
1297             }
1298         }
1299 
1300         @SuppressWarnings("unchecked")
toArray(T[] a)1301         public <T> T[] toArray(T[] a) {
1302             final ReentrantLock lock = this.lock;
1303             lock.lock();
1304             try {
1305                 if (a.length < size)
1306                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1307                 System.arraycopy(queue, 0, a, 0, size);
1308                 if (a.length > size)
1309                     a[size] = null;
1310                 return a;
1311             } finally {
1312                 lock.unlock();
1313             }
1314         }
1315 
iterator()1316         public Iterator<Runnable> iterator() {
1317             final ReentrantLock lock = this.lock;
1318             lock.lock();
1319             try {
1320                 return new Itr(Arrays.copyOf(queue, size));
1321             } finally {
1322                 lock.unlock();
1323             }
1324         }
1325 
1326         /**
1327          * Snapshot iterator that works off copy of underlying q array.
1328          */
1329         private class Itr implements Iterator<Runnable> {
1330             final RunnableScheduledFuture<?>[] array;
1331             int cursor;        // index of next element to return; initially 0
1332             int lastRet = -1;  // index of last element returned; -1 if no such
1333 
Itr(RunnableScheduledFuture<?>[] array)1334             Itr(RunnableScheduledFuture<?>[] array) {
1335                 this.array = array;
1336             }
1337 
hasNext()1338             public boolean hasNext() {
1339                 return cursor < array.length;
1340             }
1341 
next()1342             public Runnable next() {
1343                 if (cursor >= array.length)
1344                     throw new NoSuchElementException();
1345                 return array[lastRet = cursor++];
1346             }
1347 
remove()1348             public void remove() {
1349                 if (lastRet < 0)
1350                     throw new IllegalStateException();
1351                 DelayedWorkQueue.this.remove(array[lastRet]);
1352                 lastRet = -1;
1353             }
1354         }
1355     }
1356 }
1357