1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21 
22 // BEGIN android-note
23 // omit class-level docs on setRemoveOnCancelPolicy()
24 // END android-note
25 
26 /**
27  * A {@link ThreadPoolExecutor} that can additionally schedule
28  * commands to run after a given delay, or to execute periodically.
29  * This class is preferable to {@link java.util.Timer} when multiple
30  * worker threads are needed, or when the additional flexibility or
31  * capabilities of {@link ThreadPoolExecutor} (which this class
32  * extends) are required.
33  *
34  * <p>Delayed tasks execute no sooner than they are enabled, but
35  * without any real-time guarantees about when, after they are
36  * enabled, they will commence. Tasks scheduled for exactly the same
37  * execution time are enabled in first-in-first-out (FIFO) order of
38  * submission.
39  *
40  * <p>When a submitted task is cancelled before it is run, execution
41  * is suppressed.  By default, such a cancelled task is not
42  * automatically removed from the work queue until its delay elapses.
43  * While this enables further inspection and monitoring, it may also
44  * cause unbounded retention of cancelled tasks.
45  *
46  * <p>Successive executions of a periodic task scheduled via
47  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
48  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
49  * do not overlap. While different executions may be performed by
50  * different threads, the effects of prior executions
51  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
52  * those of subsequent ones.
53  *
54  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
55  * of the inherited tuning methods are not useful for it. In
56  * particular, because it acts as a fixed-sized pool using
57  * {@code corePoolSize} threads and an unbounded queue, adjustments
58  * to {@code maximumPoolSize} have no useful effect. Additionally, it
59  * is almost never a good idea to set {@code corePoolSize} to zero or
60  * use {@code allowCoreThreadTimeOut} because this may leave the pool
61  * without threads to handle tasks once they become eligible to run.
62  *
63  * <p><b>Extension notes:</b> This class overrides the
64  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
65  * {@link AbstractExecutorService#submit(Runnable) submit}
66  * methods to generate internal {@link ScheduledFuture} objects to
67  * control per-task delays and scheduling.  To preserve
68  * functionality, any further overrides of these methods in
69  * subclasses must invoke superclass versions, which effectively
70  * disables additional task customization.  However, this class
71  * provides alternative protected extension method
72  * {@code decorateTask} (one version each for {@code Runnable} and
73  * {@code Callable}) that can be used to customize the concrete task
74  * types used to execute commands entered via {@code execute},
75  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
76  * and {@code scheduleWithFixedDelay}.  By default, a
77  * {@code ScheduledThreadPoolExecutor} uses a task type extending
78  * {@link FutureTask}. However, this may be modified or replaced using
79  * subclasses of the form:
80  *
81  * <pre> {@code
82  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
83  *
84  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
85  *
86  *   protected <V> RunnableScheduledFuture<V> decorateTask(
87  *                Runnable r, RunnableScheduledFuture<V> task) {
88  *       return new CustomTask<V>(r, task);
89  *   }
90  *
91  *   protected <V> RunnableScheduledFuture<V> decorateTask(
92  *                Callable<V> c, RunnableScheduledFuture<V> task) {
93  *       return new CustomTask<V>(c, task);
94  *   }
95  *   // ... add constructors, etc.
96  * }}</pre>
97  *
98  * @since 1.5
99  * @author Doug Lea
100  */
101 public class ScheduledThreadPoolExecutor
102         extends ThreadPoolExecutor
103         implements ScheduledExecutorService {
104 
105     /*
106      * This class specializes ThreadPoolExecutor implementation by
107      *
108      * 1. Using a custom task type ScheduledFutureTask, even for tasks
109      *    that don't require scheduling because they are submitted
110      *    using ExecutorService rather than ScheduledExecutorService
111      *    methods, which are treated as tasks with a delay of zero.
112      *
113      * 2. Using a custom queue (DelayedWorkQueue), a variant of
114      *    unbounded DelayQueue. The lack of capacity constraint and
115      *    the fact that corePoolSize and maximumPoolSize are
116      *    effectively identical simplifies some execution mechanics
117      *    (see delayedExecute) compared to ThreadPoolExecutor.
118      *
119      * 3. Supporting optional run-after-shutdown parameters, which
120      *    leads to overrides of shutdown methods to remove and cancel
121      *    tasks that should NOT be run after shutdown, as well as
122      *    different recheck logic when task (re)submission overlaps
123      *    with a shutdown.
124      *
125      * 4. Task decoration methods to allow interception and
126      *    instrumentation, which are needed because subclasses cannot
127      *    otherwise override submit methods to get this effect. These
128      *    don't have any impact on pool control logic though.
129      */
130 
131     /**
132      * False if should cancel/suppress periodic tasks on shutdown.
133      */
134     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
135 
136     /**
137      * False if should cancel non-periodic tasks on shutdown.
138      */
139     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
140 
141     /**
142      * True if ScheduledFutureTask.cancel should remove from queue.
143      */
144     volatile boolean removeOnCancel;
145 
146     /**
147      * Sequence number to break scheduling ties, and in turn to
148      * guarantee FIFO order among tied entries.
149      */
150     private static final AtomicLong sequencer = new AtomicLong();
151 
152     private class ScheduledFutureTask<V>
153             extends FutureTask<V> implements RunnableScheduledFuture<V> {
154 
155         /** Sequence number to break ties FIFO */
156         private final long sequenceNumber;
157 
158         /** The nanoTime-based time when the task is enabled to execute. */
159         private volatile long time;
160 
161         /**
162          * Period for repeating tasks, in nanoseconds.
163          * A positive value indicates fixed-rate execution.
164          * A negative value indicates fixed-delay execution.
165          * A value of 0 indicates a non-repeating (one-shot) task.
166          */
167         private final long period;
168 
169         /** The actual task to be re-enqueued by reExecutePeriodic */
170         RunnableScheduledFuture<V> outerTask = this;
171 
172         /**
173          * Index into delay queue, to support faster cancellation.
174          */
175         int heapIndex;
176 
177         /**
178          * Creates a one-shot action with given nanoTime-based trigger time.
179          */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber)180         ScheduledFutureTask(Runnable r, V result, long triggerTime,
181                             long sequenceNumber) {
182             super(r, result);
183             this.time = triggerTime;
184             this.period = 0;
185             this.sequenceNumber = sequenceNumber;
186         }
187 
188         /**
189          * Creates a periodic action with given nanoTime-based initial
190          * trigger time and period.
191          */
ScheduledFutureTask(Runnable r, V result, long triggerTime, long period, long sequenceNumber)192         ScheduledFutureTask(Runnable r, V result, long triggerTime,
193                             long period, long sequenceNumber) {
194             super(r, result);
195             this.time = triggerTime;
196             this.period = period;
197             this.sequenceNumber = sequenceNumber;
198         }
199 
200         /**
201          * Creates a one-shot action with given nanoTime-based trigger time.
202          */
ScheduledFutureTask(Callable<V> callable, long triggerTime, long sequenceNumber)203         ScheduledFutureTask(Callable<V> callable, long triggerTime,
204                             long sequenceNumber) {
205             super(callable);
206             this.time = triggerTime;
207             this.period = 0;
208             this.sequenceNumber = sequenceNumber;
209         }
210 
getDelay(TimeUnit unit)211         public long getDelay(TimeUnit unit) {
212             return unit.convert(time - System.nanoTime(), NANOSECONDS);
213         }
214 
compareTo(Delayed other)215         public int compareTo(Delayed other) {
216             if (other == this) // compare zero if same object
217                 return 0;
218             if (other instanceof ScheduledFutureTask) {
219                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
220                 long diff = time - x.time;
221                 if (diff < 0)
222                     return -1;
223                 else if (diff > 0)
224                     return 1;
225                 else if (sequenceNumber < x.sequenceNumber)
226                     return -1;
227                 else
228                     return 1;
229             }
230             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
231             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
232         }
233 
234         /**
235          * Returns {@code true} if this is a periodic (not a one-shot) action.
236          *
237          * @return {@code true} if periodic
238          */
isPeriodic()239         public boolean isPeriodic() {
240             return period != 0;
241         }
242 
243         /**
244          * Sets the next time to run for a periodic task.
245          */
setNextRunTime()246         private void setNextRunTime() {
247             long p = period;
248             if (p > 0)
249                 time += p;
250             else
251                 time = triggerTime(-p);
252         }
253 
cancel(boolean mayInterruptIfRunning)254         public boolean cancel(boolean mayInterruptIfRunning) {
255             // The racy read of heapIndex below is benign:
256             // if heapIndex < 0, then OOTA guarantees that we have surely
257             // been removed; else we recheck under lock in remove()
258             boolean cancelled = super.cancel(mayInterruptIfRunning);
259             if (cancelled && removeOnCancel && heapIndex >= 0)
260                 remove(this);
261             return cancelled;
262         }
263 
264         /**
265          * Overrides FutureTask version so as to reset/requeue if periodic.
266          */
run()267         public void run() {
268             boolean periodic = isPeriodic();
269             if (!canRunInCurrentRunState(periodic))
270                 cancel(false);
271             else if (!periodic)
272                 super.run();
273             else if (super.runAndReset()) {
274                 setNextRunTime();
275                 reExecutePeriodic(outerTask);
276             }
277         }
278     }
279 
280     /**
281      * Returns true if can run a task given current run state
282      * and run-after-shutdown parameters.
283      *
284      * @param periodic true if this task periodic, false if delayed
285      */
canRunInCurrentRunState(boolean periodic)286     boolean canRunInCurrentRunState(boolean periodic) {
287         return isRunningOrShutdown(periodic ?
288                                    continueExistingPeriodicTasksAfterShutdown :
289                                    executeExistingDelayedTasksAfterShutdown);
290     }
291 
292     /**
293      * Main execution method for delayed or periodic tasks.  If pool
294      * is shut down, rejects the task. Otherwise adds task to queue
295      * and starts a thread, if necessary, to run it.  (We cannot
296      * prestart the thread to run the task because the task (probably)
297      * shouldn't be run yet.)  If the pool is shut down while the task
298      * is being added, cancel and remove it if required by state and
299      * run-after-shutdown parameters.
300      *
301      * @param task the task
302      */
delayedExecute(RunnableScheduledFuture<?> task)303     private void delayedExecute(RunnableScheduledFuture<?> task) {
304         if (isShutdown())
305             reject(task);
306         else {
307             super.getQueue().add(task);
308             if (isShutdown() &&
309                 !canRunInCurrentRunState(task.isPeriodic()) &&
310                 remove(task))
311                 task.cancel(false);
312             else
313                 ensurePrestart();
314         }
315     }
316 
317     /**
318      * Requeues a periodic task unless current run state precludes it.
319      * Same idea as delayedExecute except drops task rather than rejecting.
320      *
321      * @param task the task
322      */
reExecutePeriodic(RunnableScheduledFuture<?> task)323     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
324         if (canRunInCurrentRunState(true)) {
325             super.getQueue().add(task);
326             if (!canRunInCurrentRunState(true) && remove(task))
327                 task.cancel(false);
328             else
329                 ensurePrestart();
330         }
331     }
332 
333     /**
334      * Cancels and clears the queue of all tasks that should not be run
335      * due to shutdown policy.  Invoked within super.shutdown.
336      */
onShutdown()337     @Override void onShutdown() {
338         BlockingQueue<Runnable> q = super.getQueue();
339         boolean keepDelayed =
340             getExecuteExistingDelayedTasksAfterShutdownPolicy();
341         boolean keepPeriodic =
342             getContinueExistingPeriodicTasksAfterShutdownPolicy();
343         if (!keepDelayed && !keepPeriodic) {
344             for (Object e : q.toArray())
345                 if (e instanceof RunnableScheduledFuture<?>)
346                     ((RunnableScheduledFuture<?>) e).cancel(false);
347             q.clear();
348         }
349         else {
350             // Traverse snapshot to avoid iterator exceptions
351             for (Object e : q.toArray()) {
352                 if (e instanceof RunnableScheduledFuture) {
353                     RunnableScheduledFuture<?> t =
354                         (RunnableScheduledFuture<?>)e;
355                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
356                         t.isCancelled()) { // also remove if already cancelled
357                         if (q.remove(t))
358                             t.cancel(false);
359                     }
360                 }
361             }
362         }
363         tryTerminate();
364     }
365 
366     /**
367      * Modifies or replaces the task used to execute a runnable.
368      * This method can be used to override the concrete
369      * class used for managing internal tasks.
370      * The default implementation simply returns the given task.
371      *
372      * @param runnable the submitted Runnable
373      * @param task the task created to execute the runnable
374      * @param <V> the type of the task's result
375      * @return a task that can execute the runnable
376      * @since 1.6
377      */
decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)378     protected <V> RunnableScheduledFuture<V> decorateTask(
379         Runnable runnable, RunnableScheduledFuture<V> task) {
380         return task;
381     }
382 
383     /**
384      * Modifies or replaces the task used to execute a callable.
385      * This method can be used to override the concrete
386      * class used for managing internal tasks.
387      * The default implementation simply returns the given task.
388      *
389      * @param callable the submitted Callable
390      * @param task the task created to execute the callable
391      * @param <V> the type of the task's result
392      * @return a task that can execute the callable
393      * @since 1.6
394      */
decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)395     protected <V> RunnableScheduledFuture<V> decorateTask(
396         Callable<V> callable, RunnableScheduledFuture<V> task) {
397         return task;
398     }
399 
400     /**
401      * The default keep-alive time for pool threads.
402      *
403      * Normally, this value is unused because all pool threads will be
404      * core threads, but if a user creates a pool with a corePoolSize
405      * of zero (against our advice), we keep a thread alive as long as
406      * there are queued tasks.  If the keep alive time is zero (the
407      * historic value), we end up hot-spinning in getTask, wasting a
408      * CPU.  But on the other hand, if we set the value too high, and
409      * users create a one-shot pool which they don't cleanly shutdown,
410      * the pool's non-daemon threads will prevent JVM termination.  A
411      * small but non-zero value (relative to a JVM's lifetime) seems
412      * best.
413      */
414     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
415 
416     /**
417      * Creates a new {@code ScheduledThreadPoolExecutor} with the
418      * given core pool size.
419      *
420      * @param corePoolSize the number of threads to keep in the pool, even
421      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
422      * @throws IllegalArgumentException if {@code corePoolSize < 0}
423      */
ScheduledThreadPoolExecutor(int corePoolSize)424     public ScheduledThreadPoolExecutor(int corePoolSize) {
425         super(corePoolSize, Integer.MAX_VALUE,
426               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
427               new DelayedWorkQueue());
428     }
429 
430     /**
431      * Creates a new {@code ScheduledThreadPoolExecutor} with the
432      * given initial parameters.
433      *
434      * @param corePoolSize the number of threads to keep in the pool, even
435      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
436      * @param threadFactory the factory to use when the executor
437      *        creates a new thread
438      * @throws IllegalArgumentException if {@code corePoolSize < 0}
439      * @throws NullPointerException if {@code threadFactory} is null
440      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)441     public ScheduledThreadPoolExecutor(int corePoolSize,
442                                        ThreadFactory threadFactory) {
443         super(corePoolSize, Integer.MAX_VALUE,
444               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
445               new DelayedWorkQueue(), threadFactory);
446     }
447 
448     /**
449      * Creates a new {@code ScheduledThreadPoolExecutor} with the
450      * given initial parameters.
451      *
452      * @param corePoolSize the number of threads to keep in the pool, even
453      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
454      * @param handler the handler to use when execution is blocked
455      *        because the thread bounds and queue capacities are reached
456      * @throws IllegalArgumentException if {@code corePoolSize < 0}
457      * @throws NullPointerException if {@code handler} is null
458      */
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)459     public ScheduledThreadPoolExecutor(int corePoolSize,
460                                        RejectedExecutionHandler handler) {
461         super(corePoolSize, Integer.MAX_VALUE,
462               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
463               new DelayedWorkQueue(), handler);
464     }
465 
466     /**
467      * Creates a new {@code ScheduledThreadPoolExecutor} with the
468      * given initial parameters.
469      *
470      * @param corePoolSize the number of threads to keep in the pool, even
471      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
472      * @param threadFactory the factory to use when the executor
473      *        creates a new thread
474      * @param handler the handler to use when execution is blocked
475      *        because the thread bounds and queue capacities are reached
476      * @throws IllegalArgumentException if {@code corePoolSize < 0}
477      * @throws NullPointerException if {@code threadFactory} or
478      *         {@code handler} is null
479      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)480     public ScheduledThreadPoolExecutor(int corePoolSize,
481                                        ThreadFactory threadFactory,
482                                        RejectedExecutionHandler handler) {
483         super(corePoolSize, Integer.MAX_VALUE,
484               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
485               new DelayedWorkQueue(), threadFactory, handler);
486     }
487 
488     /**
489      * Returns the nanoTime-based trigger time of a delayed action.
490      */
triggerTime(long delay, TimeUnit unit)491     private long triggerTime(long delay, TimeUnit unit) {
492         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493     }
494 
495     /**
496      * Returns the nanoTime-based trigger time of a delayed action.
497      */
triggerTime(long delay)498     long triggerTime(long delay) {
499         return System.nanoTime() +
500             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501     }
502 
503     /**
504      * Constrains the values of all delays in the queue to be within
505      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506      * This may occur if a task is eligible to be dequeued, but has
507      * not yet been, while some other task is added with a delay of
508      * Long.MAX_VALUE.
509      */
overflowFree(long delay)510     private long overflowFree(long delay) {
511         Delayed head = (Delayed) super.getQueue().peek();
512         if (head != null) {
513             long headDelay = head.getDelay(NANOSECONDS);
514             if (headDelay < 0 && (delay - headDelay < 0))
515                 delay = Long.MAX_VALUE + headDelay;
516         }
517         return delay;
518     }
519 
520     /**
521      * @throws RejectedExecutionException {@inheritDoc}
522      * @throws NullPointerException       {@inheritDoc}
523      */
schedule(Runnable command, long delay, TimeUnit unit)524     public ScheduledFuture<?> schedule(Runnable command,
525                                        long delay,
526                                        TimeUnit unit) {
527         if (command == null || unit == null)
528             throw new NullPointerException();
529         RunnableScheduledFuture<Void> t = decorateTask(command,
530             new ScheduledFutureTask<Void>(command, null,
531                                           triggerTime(delay, unit),
532                                           sequencer.getAndIncrement()));
533         delayedExecute(t);
534         return t;
535     }
536 
537     /**
538      * @throws RejectedExecutionException {@inheritDoc}
539      * @throws NullPointerException       {@inheritDoc}
540      */
schedule(Callable<V> callable, long delay, TimeUnit unit)541     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542                                            long delay,
543                                            TimeUnit unit) {
544         if (callable == null || unit == null)
545             throw new NullPointerException();
546         RunnableScheduledFuture<V> t = decorateTask(callable,
547             new ScheduledFutureTask<V>(callable,
548                                        triggerTime(delay, unit),
549                                        sequencer.getAndIncrement()));
550         delayedExecute(t);
551         return t;
552     }
553 
554     /**
555      * @throws RejectedExecutionException {@inheritDoc}
556      * @throws NullPointerException       {@inheritDoc}
557      * @throws IllegalArgumentException   {@inheritDoc}
558      */
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)559     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
560                                                   long initialDelay,
561                                                   long period,
562                                                   TimeUnit unit) {
563         if (command == null || unit == null)
564             throw new NullPointerException();
565         if (period <= 0L)
566             throw new IllegalArgumentException();
567         ScheduledFutureTask<Void> sft =
568             new ScheduledFutureTask<Void>(command,
569                                           null,
570                                           triggerTime(initialDelay, unit),
571                                           unit.toNanos(period),
572                                           sequencer.getAndIncrement());
573         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
574         sft.outerTask = t;
575         delayedExecute(t);
576         return t;
577     }
578 
579     /**
580      * @throws RejectedExecutionException {@inheritDoc}
581      * @throws NullPointerException       {@inheritDoc}
582      * @throws IllegalArgumentException   {@inheritDoc}
583      */
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)584     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
585                                                      long initialDelay,
586                                                      long delay,
587                                                      TimeUnit unit) {
588         if (command == null || unit == null)
589             throw new NullPointerException();
590         if (delay <= 0L)
591             throw new IllegalArgumentException();
592         ScheduledFutureTask<Void> sft =
593             new ScheduledFutureTask<Void>(command,
594                                           null,
595                                           triggerTime(initialDelay, unit),
596                                           -unit.toNanos(delay),
597                                           sequencer.getAndIncrement());
598         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
599         sft.outerTask = t;
600         delayedExecute(t);
601         return t;
602     }
603 
604     /**
605      * Executes {@code command} with zero required delay.
606      * This has effect equivalent to
607      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
608      * Note that inspections of the queue and of the list returned by
609      * {@code shutdownNow} will access the zero-delayed
610      * {@link ScheduledFuture}, not the {@code command} itself.
611      *
612      * <p>A consequence of the use of {@code ScheduledFuture} objects is
613      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
614      * called with a null second {@code Throwable} argument, even if the
615      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
616      * thrown by such a task can be obtained via {@link Future#get}.
617      *
618      * @throws RejectedExecutionException at discretion of
619      *         {@code RejectedExecutionHandler}, if the task
620      *         cannot be accepted for execution because the
621      *         executor has been shut down
622      * @throws NullPointerException {@inheritDoc}
623      */
execute(Runnable command)624     public void execute(Runnable command) {
625         schedule(command, 0, NANOSECONDS);
626     }
627 
628     // Override AbstractExecutorService methods
629 
630     /**
631      * @throws RejectedExecutionException {@inheritDoc}
632      * @throws NullPointerException       {@inheritDoc}
633      */
submit(Runnable task)634     public Future<?> submit(Runnable task) {
635         return schedule(task, 0, NANOSECONDS);
636     }
637 
638     /**
639      * @throws RejectedExecutionException {@inheritDoc}
640      * @throws NullPointerException       {@inheritDoc}
641      */
submit(Runnable task, T result)642     public <T> Future<T> submit(Runnable task, T result) {
643         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
644     }
645 
646     /**
647      * @throws RejectedExecutionException {@inheritDoc}
648      * @throws NullPointerException       {@inheritDoc}
649      */
submit(Callable<T> task)650     public <T> Future<T> submit(Callable<T> task) {
651         return schedule(task, 0, NANOSECONDS);
652     }
653 
654     /**
655      * Sets the policy on whether to continue executing existing
656      * periodic tasks even when this executor has been {@code shutdown}.
657      * In this case, these tasks will only terminate upon
658      * {@code shutdownNow} or after setting the policy to
659      * {@code false} when already shutdown.
660      * This value is by default {@code false}.
661      *
662      * @param value if {@code true}, continue after shutdown, else don't
663      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
664      */
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)665     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
666         continueExistingPeriodicTasksAfterShutdown = value;
667         if (!value && isShutdown())
668             onShutdown();
669     }
670 
671     /**
672      * Gets the policy on whether to continue executing existing
673      * periodic tasks even when this executor has been {@code shutdown}.
674      * In this case, these tasks will only terminate upon
675      * {@code shutdownNow} or after setting the policy to
676      * {@code false} when already shutdown.
677      * This value is by default {@code false}.
678      *
679      * @return {@code true} if will continue after shutdown
680      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
681      */
getContinueExistingPeriodicTasksAfterShutdownPolicy()682     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
683         return continueExistingPeriodicTasksAfterShutdown;
684     }
685 
686     /**
687      * Sets the policy on whether to execute existing delayed
688      * tasks even when this executor has been {@code shutdown}.
689      * In this case, these tasks will only terminate upon
690      * {@code shutdownNow}, or after setting the policy to
691      * {@code false} when already shutdown.
692      * This value is by default {@code true}.
693      *
694      * @param value if {@code true}, execute after shutdown, else don't
695      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
696      */
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)697     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
698         executeExistingDelayedTasksAfterShutdown = value;
699         if (!value && isShutdown())
700             onShutdown();
701     }
702 
703     /**
704      * Gets the policy on whether to execute existing delayed
705      * tasks even when this executor has been {@code shutdown}.
706      * In this case, these tasks will only terminate upon
707      * {@code shutdownNow}, or after setting the policy to
708      * {@code false} when already shutdown.
709      * This value is by default {@code true}.
710      *
711      * @return {@code true} if will execute after shutdown
712      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
713      */
getExecuteExistingDelayedTasksAfterShutdownPolicy()714     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
715         return executeExistingDelayedTasksAfterShutdown;
716     }
717 
718     /**
719      * Sets the policy on whether cancelled tasks should be immediately
720      * removed from the work queue at time of cancellation.  This value is
721      * by default {@code false}.
722      *
723      * @param value if {@code true}, remove on cancellation, else don't
724      * @see #getRemoveOnCancelPolicy
725      * @since 1.7
726      */
setRemoveOnCancelPolicy(boolean value)727     public void setRemoveOnCancelPolicy(boolean value) {
728         removeOnCancel = value;
729     }
730 
731     /**
732      * Gets the policy on whether cancelled tasks should be immediately
733      * removed from the work queue at time of cancellation.  This value is
734      * by default {@code false}.
735      *
736      * @return {@code true} if cancelled tasks are immediately removed
737      *         from the queue
738      * @see #setRemoveOnCancelPolicy
739      * @since 1.7
740      */
getRemoveOnCancelPolicy()741     public boolean getRemoveOnCancelPolicy() {
742         return removeOnCancel;
743     }
744 
745     /**
746      * Initiates an orderly shutdown in which previously submitted
747      * tasks are executed, but no new tasks will be accepted.
748      * Invocation has no additional effect if already shut down.
749      *
750      * <p>This method does not wait for previously submitted tasks to
751      * complete execution.  Use {@link #awaitTermination awaitTermination}
752      * to do that.
753      *
754      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
755      * has been set {@code false}, existing delayed tasks whose delays
756      * have not yet elapsed are cancelled.  And unless the {@code
757      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
758      * {@code true}, future executions of existing periodic tasks will
759      * be cancelled.
760      */
761     // android-note: Removed "throws SecurityException" doc.
shutdown()762     public void shutdown() {
763         super.shutdown();
764     }
765 
766     /**
767      * Attempts to stop all actively executing tasks, halts the
768      * processing of waiting tasks, and returns a list of the tasks
769      * that were awaiting execution. These tasks are drained (removed)
770      * from the task queue upon return from this method.
771      *
772      * <p>This method does not wait for actively executing tasks to
773      * terminate.  Use {@link #awaitTermination awaitTermination} to
774      * do that.
775      *
776      * <p>There are no guarantees beyond best-effort attempts to stop
777      * processing actively executing tasks.  This implementation
778      * interrupts tasks via {@link Thread#interrupt}; any task that
779      * fails to respond to interrupts may never terminate.
780      *
781      * @return list of tasks that never commenced execution.
782      *         Each element of this list is a {@link ScheduledFuture}.
783      *         For tasks submitted via one of the {@code schedule}
784      *         methods, the element will be identical to the returned
785      *         {@code ScheduledFuture}.  For tasks submitted using
786      *         {@link #execute execute}, the element will be a
787      *         zero-delay {@code ScheduledFuture}.
788      */
789     // android-note: Removed "throws SecurityException" doc.
shutdownNow()790     public List<Runnable> shutdownNow() {
791         return super.shutdownNow();
792     }
793 
794     /**
795      * Returns the task queue used by this executor.  Access to the
796      * task queue is intended primarily for debugging and monitoring.
797      * This queue may be in active use.  Retrieving the task queue
798      * does not prevent queued tasks from executing.
799      *
800      * <p>Each element of this queue is a {@link ScheduledFuture}.
801      * For tasks submitted via one of the {@code schedule} methods, the
802      * element will be identical to the returned {@code ScheduledFuture}.
803      * For tasks submitted using {@link #execute execute}, the element
804      * will be a zero-delay {@code ScheduledFuture}.
805      *
806      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
807      * tasks in the order in which they will execute.
808      *
809      * @return the task queue
810      */
getQueue()811     public BlockingQueue<Runnable> getQueue() {
812         return super.getQueue();
813     }
814 
815     /**
816      * Specialized delay queue. To mesh with TPE declarations, this
817      * class must be declared as a BlockingQueue<Runnable> even though
818      * it can only hold RunnableScheduledFutures.
819      */
820     static class DelayedWorkQueue extends AbstractQueue<Runnable>
821         implements BlockingQueue<Runnable> {
822 
823         /*
824          * A DelayedWorkQueue is based on a heap-based data structure
825          * like those in DelayQueue and PriorityQueue, except that
826          * every ScheduledFutureTask also records its index into the
827          * heap array. This eliminates the need to find a task upon
828          * cancellation, greatly speeding up removal (down from O(n)
829          * to O(log n)), and reducing garbage retention that would
830          * otherwise occur by waiting for the element to rise to top
831          * before clearing. But because the queue may also hold
832          * RunnableScheduledFutures that are not ScheduledFutureTasks,
833          * we are not guaranteed to have such indices available, in
834          * which case we fall back to linear search. (We expect that
835          * most tasks will not be decorated, and that the faster cases
836          * will be much more common.)
837          *
838          * All heap operations must record index changes -- mainly
839          * within siftUp and siftDown. Upon removal, a task's
840          * heapIndex is set to -1. Note that ScheduledFutureTasks can
841          * appear at most once in the queue (this need not be true for
842          * other kinds of tasks or work queues), so are uniquely
843          * identified by heapIndex.
844          */
845 
846         private static final int INITIAL_CAPACITY = 16;
847         private RunnableScheduledFuture<?>[] queue =
848             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
849         private final ReentrantLock lock = new ReentrantLock();
850         private int size;
851 
852         /**
853          * Thread designated to wait for the task at the head of the
854          * queue.  This variant of the Leader-Follower pattern
855          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
856          * minimize unnecessary timed waiting.  When a thread becomes
857          * the leader, it waits only for the next delay to elapse, but
858          * other threads await indefinitely.  The leader thread must
859          * signal some other thread before returning from take() or
860          * poll(...), unless some other thread becomes leader in the
861          * interim.  Whenever the head of the queue is replaced with a
862          * task with an earlier expiration time, the leader field is
863          * invalidated by being reset to null, and some waiting
864          * thread, but not necessarily the current leader, is
865          * signalled.  So waiting threads must be prepared to acquire
866          * and lose leadership while waiting.
867          */
868         private Thread leader;
869 
870         /**
871          * Condition signalled when a newer task becomes available at the
872          * head of the queue or a new thread may need to become leader.
873          */
874         private final Condition available = lock.newCondition();
875 
876         /**
877          * Sets f's heapIndex if it is a ScheduledFutureTask.
878          */
setIndex(RunnableScheduledFuture<?> f, int idx)879         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
880             if (f instanceof ScheduledFutureTask)
881                 ((ScheduledFutureTask)f).heapIndex = idx;
882         }
883 
884         /**
885          * Sifts element added at bottom up to its heap-ordered spot.
886          * Call only when holding lock.
887          */
siftUp(int k, RunnableScheduledFuture<?> key)888         private void siftUp(int k, RunnableScheduledFuture<?> key) {
889             while (k > 0) {
890                 int parent = (k - 1) >>> 1;
891                 RunnableScheduledFuture<?> e = queue[parent];
892                 if (key.compareTo(e) >= 0)
893                     break;
894                 queue[k] = e;
895                 setIndex(e, k);
896                 k = parent;
897             }
898             queue[k] = key;
899             setIndex(key, k);
900         }
901 
902         /**
903          * Sifts element added at top down to its heap-ordered spot.
904          * Call only when holding lock.
905          */
siftDown(int k, RunnableScheduledFuture<?> key)906         private void siftDown(int k, RunnableScheduledFuture<?> key) {
907             int half = size >>> 1;
908             while (k < half) {
909                 int child = (k << 1) + 1;
910                 RunnableScheduledFuture<?> c = queue[child];
911                 int right = child + 1;
912                 if (right < size && c.compareTo(queue[right]) > 0)
913                     c = queue[child = right];
914                 if (key.compareTo(c) <= 0)
915                     break;
916                 queue[k] = c;
917                 setIndex(c, k);
918                 k = child;
919             }
920             queue[k] = key;
921             setIndex(key, k);
922         }
923 
924         /**
925          * Resizes the heap array.  Call only when holding lock.
926          */
grow()927         private void grow() {
928             int oldCapacity = queue.length;
929             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
930             if (newCapacity < 0) // overflow
931                 newCapacity = Integer.MAX_VALUE;
932             queue = Arrays.copyOf(queue, newCapacity);
933         }
934 
935         /**
936          * Finds index of given object, or -1 if absent.
937          */
indexOf(Object x)938         private int indexOf(Object x) {
939             if (x != null) {
940                 if (x instanceof ScheduledFutureTask) {
941                     int i = ((ScheduledFutureTask) x).heapIndex;
942                     // Sanity check; x could conceivably be a
943                     // ScheduledFutureTask from some other pool.
944                     if (i >= 0 && i < size && queue[i] == x)
945                         return i;
946                 } else {
947                     for (int i = 0; i < size; i++)
948                         if (x.equals(queue[i]))
949                             return i;
950                 }
951             }
952             return -1;
953         }
954 
contains(Object x)955         public boolean contains(Object x) {
956             final ReentrantLock lock = this.lock;
957             lock.lock();
958             try {
959                 return indexOf(x) != -1;
960             } finally {
961                 lock.unlock();
962             }
963         }
964 
remove(Object x)965         public boolean remove(Object x) {
966             final ReentrantLock lock = this.lock;
967             lock.lock();
968             try {
969                 int i = indexOf(x);
970                 if (i < 0)
971                     return false;
972 
973                 setIndex(queue[i], -1);
974                 int s = --size;
975                 RunnableScheduledFuture<?> replacement = queue[s];
976                 queue[s] = null;
977                 if (s != i) {
978                     siftDown(i, replacement);
979                     if (queue[i] == replacement)
980                         siftUp(i, replacement);
981                 }
982                 return true;
983             } finally {
984                 lock.unlock();
985             }
986         }
987 
size()988         public int size() {
989             final ReentrantLock lock = this.lock;
990             lock.lock();
991             try {
992                 return size;
993             } finally {
994                 lock.unlock();
995             }
996         }
997 
isEmpty()998         public boolean isEmpty() {
999             return size() == 0;
1000         }
1001 
remainingCapacity()1002         public int remainingCapacity() {
1003             return Integer.MAX_VALUE;
1004         }
1005 
peek()1006         public RunnableScheduledFuture<?> peek() {
1007             final ReentrantLock lock = this.lock;
1008             lock.lock();
1009             try {
1010                 return queue[0];
1011             } finally {
1012                 lock.unlock();
1013             }
1014         }
1015 
offer(Runnable x)1016         public boolean offer(Runnable x) {
1017             if (x == null)
1018                 throw new NullPointerException();
1019             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1020             final ReentrantLock lock = this.lock;
1021             lock.lock();
1022             try {
1023                 int i = size;
1024                 if (i >= queue.length)
1025                     grow();
1026                 size = i + 1;
1027                 if (i == 0) {
1028                     queue[0] = e;
1029                     setIndex(e, 0);
1030                 } else {
1031                     siftUp(i, e);
1032                 }
1033                 if (queue[0] == e) {
1034                     leader = null;
1035                     available.signal();
1036                 }
1037             } finally {
1038                 lock.unlock();
1039             }
1040             return true;
1041         }
1042 
put(Runnable e)1043         public void put(Runnable e) {
1044             offer(e);
1045         }
1046 
add(Runnable e)1047         public boolean add(Runnable e) {
1048             return offer(e);
1049         }
1050 
offer(Runnable e, long timeout, TimeUnit unit)1051         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1052             return offer(e);
1053         }
1054 
1055         /**
1056          * Performs common bookkeeping for poll and take: Replaces
1057          * first element with last and sifts it down.  Call only when
1058          * holding lock.
1059          * @param f the task to remove and return
1060          */
finishPoll(RunnableScheduledFuture<?> f)1061         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1062             int s = --size;
1063             RunnableScheduledFuture<?> x = queue[s];
1064             queue[s] = null;
1065             if (s != 0)
1066                 siftDown(0, x);
1067             setIndex(f, -1);
1068             return f;
1069         }
1070 
poll()1071         public RunnableScheduledFuture<?> poll() {
1072             final ReentrantLock lock = this.lock;
1073             lock.lock();
1074             try {
1075                 RunnableScheduledFuture<?> first = queue[0];
1076                 return (first == null || first.getDelay(NANOSECONDS) > 0)
1077                     ? null
1078                     : finishPoll(first);
1079             } finally {
1080                 lock.unlock();
1081             }
1082         }
1083 
take()1084         public RunnableScheduledFuture<?> take() throws InterruptedException {
1085             final ReentrantLock lock = this.lock;
1086             lock.lockInterruptibly();
1087             try {
1088                 for (;;) {
1089                     RunnableScheduledFuture<?> first = queue[0];
1090                     if (first == null)
1091                         available.await();
1092                     else {
1093                         long delay = first.getDelay(NANOSECONDS);
1094                         if (delay <= 0L)
1095                             return finishPoll(first);
1096                         first = null; // don't retain ref while waiting
1097                         if (leader != null)
1098                             available.await();
1099                         else {
1100                             Thread thisThread = Thread.currentThread();
1101                             leader = thisThread;
1102                             try {
1103                                 available.awaitNanos(delay);
1104                             } finally {
1105                                 if (leader == thisThread)
1106                                     leader = null;
1107                             }
1108                         }
1109                     }
1110                 }
1111             } finally {
1112                 if (leader == null && queue[0] != null)
1113                     available.signal();
1114                 lock.unlock();
1115             }
1116         }
1117 
poll(long timeout, TimeUnit unit)1118         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1119             throws InterruptedException {
1120             long nanos = unit.toNanos(timeout);
1121             final ReentrantLock lock = this.lock;
1122             lock.lockInterruptibly();
1123             try {
1124                 for (;;) {
1125                     RunnableScheduledFuture<?> first = queue[0];
1126                     if (first == null) {
1127                         if (nanos <= 0L)
1128                             return null;
1129                         else
1130                             nanos = available.awaitNanos(nanos);
1131                     } else {
1132                         long delay = first.getDelay(NANOSECONDS);
1133                         if (delay <= 0L)
1134                             return finishPoll(first);
1135                         if (nanos <= 0L)
1136                             return null;
1137                         first = null; // don't retain ref while waiting
1138                         if (nanos < delay || leader != null)
1139                             nanos = available.awaitNanos(nanos);
1140                         else {
1141                             Thread thisThread = Thread.currentThread();
1142                             leader = thisThread;
1143                             try {
1144                                 long timeLeft = available.awaitNanos(delay);
1145                                 nanos -= delay - timeLeft;
1146                             } finally {
1147                                 if (leader == thisThread)
1148                                     leader = null;
1149                             }
1150                         }
1151                     }
1152                 }
1153             } finally {
1154                 if (leader == null && queue[0] != null)
1155                     available.signal();
1156                 lock.unlock();
1157             }
1158         }
1159 
clear()1160         public void clear() {
1161             final ReentrantLock lock = this.lock;
1162             lock.lock();
1163             try {
1164                 for (int i = 0; i < size; i++) {
1165                     RunnableScheduledFuture<?> t = queue[i];
1166                     if (t != null) {
1167                         queue[i] = null;
1168                         setIndex(t, -1);
1169                     }
1170                 }
1171                 size = 0;
1172             } finally {
1173                 lock.unlock();
1174             }
1175         }
1176 
1177         /**
1178          * Returns first element only if it is expired.
1179          * Used only by drainTo.  Call only when holding lock.
1180          */
peekExpired()1181         private RunnableScheduledFuture<?> peekExpired() {
1182             // assert lock.isHeldByCurrentThread();
1183             RunnableScheduledFuture<?> first = queue[0];
1184             return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1185                 null : first;
1186         }
1187 
drainTo(Collection<? super Runnable> c)1188         public int drainTo(Collection<? super Runnable> c) {
1189             if (c == null)
1190                 throw new NullPointerException();
1191             if (c == this)
1192                 throw new IllegalArgumentException();
1193             final ReentrantLock lock = this.lock;
1194             lock.lock();
1195             try {
1196                 RunnableScheduledFuture<?> first;
1197                 int n = 0;
1198                 while ((first = peekExpired()) != null) {
1199                     c.add(first);   // In this order, in case add() throws.
1200                     finishPoll(first);
1201                     ++n;
1202                 }
1203                 return n;
1204             } finally {
1205                 lock.unlock();
1206             }
1207         }
1208 
drainTo(Collection<? super Runnable> c, int maxElements)1209         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1210             if (c == null)
1211                 throw new NullPointerException();
1212             if (c == this)
1213                 throw new IllegalArgumentException();
1214             if (maxElements <= 0)
1215                 return 0;
1216             final ReentrantLock lock = this.lock;
1217             lock.lock();
1218             try {
1219                 RunnableScheduledFuture<?> first;
1220                 int n = 0;
1221                 while (n < maxElements && (first = peekExpired()) != null) {
1222                     c.add(first);   // In this order, in case add() throws.
1223                     finishPoll(first);
1224                     ++n;
1225                 }
1226                 return n;
1227             } finally {
1228                 lock.unlock();
1229             }
1230         }
1231 
toArray()1232         public Object[] toArray() {
1233             final ReentrantLock lock = this.lock;
1234             lock.lock();
1235             try {
1236                 return Arrays.copyOf(queue, size, Object[].class);
1237             } finally {
1238                 lock.unlock();
1239             }
1240         }
1241 
1242         @SuppressWarnings("unchecked")
toArray(T[] a)1243         public <T> T[] toArray(T[] a) {
1244             final ReentrantLock lock = this.lock;
1245             lock.lock();
1246             try {
1247                 if (a.length < size)
1248                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1249                 System.arraycopy(queue, 0, a, 0, size);
1250                 if (a.length > size)
1251                     a[size] = null;
1252                 return a;
1253             } finally {
1254                 lock.unlock();
1255             }
1256         }
1257 
iterator()1258         public Iterator<Runnable> iterator() {
1259             return new Itr(Arrays.copyOf(queue, size));
1260         }
1261 
1262         /**
1263          * Snapshot iterator that works off copy of underlying q array.
1264          */
1265         private class Itr implements Iterator<Runnable> {
1266             final RunnableScheduledFuture<?>[] array;
1267             int cursor;        // index of next element to return; initially 0
1268             int lastRet = -1;  // index of last element returned; -1 if no such
1269 
Itr(RunnableScheduledFuture<?>[] array)1270             Itr(RunnableScheduledFuture<?>[] array) {
1271                 this.array = array;
1272             }
1273 
hasNext()1274             public boolean hasNext() {
1275                 return cursor < array.length;
1276             }
1277 
next()1278             public Runnable next() {
1279                 if (cursor >= array.length)
1280                     throw new NoSuchElementException();
1281                 lastRet = cursor;
1282                 return array[cursor++];
1283             }
1284 
remove()1285             public void remove() {
1286                 if (lastRet < 0)
1287                     throw new IllegalStateException();
1288                 DelayedWorkQueue.this.remove(array[lastRet]);
1289                 lastRet = -1;
1290             }
1291         }
1292     }
1293 }
1294