1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.atomic.AtomicLong;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13 
14 // BEGIN android-note
15 // omit class-level docs on setRemoveOnCancelPolicy()
16 // END android-note
17 
18 /**
19  * A {@link ThreadPoolExecutor} that can additionally schedule
20  * commands to run after a given delay, or to execute
21  * periodically. This class is preferable to {@link java.util.Timer}
22  * when multiple worker threads are needed, or when the additional
23  * flexibility or capabilities of {@link ThreadPoolExecutor} (which
24  * this class extends) are required.
25  *
26  * <p>Delayed tasks execute no sooner than they are enabled, but
27  * without any real-time guarantees about when, after they are
28  * enabled, they will commence. Tasks scheduled for exactly the same
29  * execution time are enabled in first-in-first-out (FIFO) order of
30  * submission.
31  *
32  * <p>When a submitted task is cancelled before it is run, execution
33  * is suppressed. By default, such a cancelled task is not
34  * automatically removed from the work queue until its delay
35  * elapses. While this enables further inspection and monitoring, it
36  * may also cause unbounded retention of cancelled tasks.
37  *
38  * <p>Successive executions of a task scheduled via
39  * {@code scheduleAtFixedRate} or
40  * {@code scheduleWithFixedDelay} do not overlap. While different
41  * executions may be performed by different threads, the effects of
42  * prior executions <a
43  * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
44  * those of subsequent ones.
45  *
46  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
47  * of the inherited tuning methods are not useful for it. In
48  * particular, because it acts as a fixed-sized pool using
49  * {@code corePoolSize} threads and an unbounded queue, adjustments
50  * to {@code maximumPoolSize} have no useful effect. Additionally, it
51  * is almost never a good idea to set {@code corePoolSize} to zero or
52  * use {@code allowCoreThreadTimeOut} because this may leave the pool
53  * without threads to handle tasks once they become eligible to run.
54  *
55  * <p><b>Extension notes:</b> This class overrides the
56  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
57  * {@link AbstractExecutorService#submit(Runnable) submit}
58  * methods to generate internal {@link ScheduledFuture} objects to
59  * control per-task delays and scheduling.  To preserve
60  * functionality, any further overrides of these methods in
61  * subclasses must invoke superclass versions, which effectively
62  * disables additional task customization.  However, this class
63  * provides alternative protected extension method
64  * {@code decorateTask} (one version each for {@code Runnable} and
65  * {@code Callable}) that can be used to customize the concrete task
66  * types used to execute commands entered via {@code execute},
67  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
68  * and {@code scheduleWithFixedDelay}.  By default, a
69  * {@code ScheduledThreadPoolExecutor} uses a task type extending
70  * {@link FutureTask}. However, this may be modified or replaced using
71  * subclasses of the form:
72  *
73  *  <pre> {@code
74  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
75  *
76  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
77  *
78  *   protected <V> RunnableScheduledFuture<V> decorateTask(
79  *                Runnable r, RunnableScheduledFuture<V> task) {
80  *       return new CustomTask<V>(r, task);
81  *   }
82  *
83  *   protected <V> RunnableScheduledFuture<V> decorateTask(
84  *                Callable<V> c, RunnableScheduledFuture<V> task) {
85  *       return new CustomTask<V>(c, task);
86  *   }
87  *   // ... add constructors, etc.
88  * }}</pre>
89  *
90  * @since 1.5
91  * @author Doug Lea
92  */
93 public class ScheduledThreadPoolExecutor
94         extends ThreadPoolExecutor
95         implements ScheduledExecutorService {
96 
97     /*
98      * This class specializes ThreadPoolExecutor implementation by
99      *
100      * 1. Using a custom task type, ScheduledFutureTask for
101      *    tasks, even those that don't require scheduling (i.e.,
102      *    those submitted using ExecutorService execute, not
103      *    ScheduledExecutorService methods) which are treated as
104      *    delayed tasks with a delay of zero.
105      *
106      * 2. Using a custom queue (DelayedWorkQueue), a variant of
107      *    unbounded DelayQueue. The lack of capacity constraint and
108      *    the fact that corePoolSize and maximumPoolSize are
109      *    effectively identical simplifies some execution mechanics
110      *    (see delayedExecute) compared to ThreadPoolExecutor.
111      *
112      * 3. Supporting optional run-after-shutdown parameters, which
113      *    leads to overrides of shutdown methods to remove and cancel
114      *    tasks that should NOT be run after shutdown, as well as
115      *    different recheck logic when task (re)submission overlaps
116      *    with a shutdown.
117      *
118      * 4. Task decoration methods to allow interception and
119      *    instrumentation, which are needed because subclasses cannot
120      *    otherwise override submit methods to get this effect. These
121      *    don't have any impact on pool control logic though.
122      */
123 
124     /**
125      * False if should cancel/suppress periodic tasks on shutdown.
126      */
127     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
128 
129     /**
130      * False if should cancel non-periodic tasks on shutdown.
131      */
132     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
133 
134     /**
135      * True if ScheduledFutureTask.cancel should remove from queue
136      */
137     private volatile boolean removeOnCancel = false;
138 
139     /**
140      * Sequence number to break scheduling ties, and in turn to
141      * guarantee FIFO order among tied entries.
142      */
143     private static final AtomicLong sequencer = new AtomicLong();
144 
145     /**
146      * Returns current nanosecond time.
147      */
now()148     final long now() {
149         return System.nanoTime();
150     }
151 
152     private class ScheduledFutureTask<V>
153             extends FutureTask<V> implements RunnableScheduledFuture<V> {
154 
155         /** Sequence number to break ties FIFO */
156         private final long sequenceNumber;
157 
158         /** The time the task is enabled to execute in nanoTime units */
159         private long time;
160 
161         /**
162          * Period in nanoseconds for repeating tasks.  A positive
163          * value indicates fixed-rate execution.  A negative value
164          * indicates fixed-delay execution.  A value of 0 indicates a
165          * non-repeating task.
166          */
167         private final long period;
168 
169         /** The actual task to be re-enqueued by reExecutePeriodic */
170         RunnableScheduledFuture<V> outerTask = this;
171 
172         /**
173          * Index into delay queue, to support faster cancellation.
174          */
175         int heapIndex;
176 
177         /**
178          * Creates a one-shot action with given nanoTime-based trigger time.
179          */
ScheduledFutureTask(Runnable r, V result, long ns)180         ScheduledFutureTask(Runnable r, V result, long ns) {
181             super(r, result);
182             this.time = ns;
183             this.period = 0;
184             this.sequenceNumber = sequencer.getAndIncrement();
185         }
186 
187         /**
188          * Creates a periodic action with given nano time and period.
189          */
ScheduledFutureTask(Runnable r, V result, long ns, long period)190         ScheduledFutureTask(Runnable r, V result, long ns, long period) {
191             super(r, result);
192             this.time = ns;
193             this.period = period;
194             this.sequenceNumber = sequencer.getAndIncrement();
195         }
196 
197         /**
198          * Creates a one-shot action with given nanoTime-based trigger time.
199          */
ScheduledFutureTask(Callable<V> callable, long ns)200         ScheduledFutureTask(Callable<V> callable, long ns) {
201             super(callable);
202             this.time = ns;
203             this.period = 0;
204             this.sequenceNumber = sequencer.getAndIncrement();
205         }
206 
getDelay(TimeUnit unit)207         public long getDelay(TimeUnit unit) {
208             return unit.convert(time - now(), NANOSECONDS);
209         }
210 
compareTo(Delayed other)211         public int compareTo(Delayed other) {
212             if (other == this) // compare zero if same object
213                 return 0;
214             if (other instanceof ScheduledFutureTask) {
215                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
216                 long diff = time - x.time;
217                 if (diff < 0)
218                     return -1;
219                 else if (diff > 0)
220                     return 1;
221                 else if (sequenceNumber < x.sequenceNumber)
222                     return -1;
223                 else
224                     return 1;
225             }
226             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
227             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
228         }
229 
230         /**
231          * Returns {@code true} if this is a periodic (not a one-shot) action.
232          *
233          * @return {@code true} if periodic
234          */
isPeriodic()235         public boolean isPeriodic() {
236             return period != 0;
237         }
238 
239         /**
240          * Sets the next time to run for a periodic task.
241          */
setNextRunTime()242         private void setNextRunTime() {
243             long p = period;
244             if (p > 0)
245                 time += p;
246             else
247                 time = triggerTime(-p);
248         }
249 
cancel(boolean mayInterruptIfRunning)250         public boolean cancel(boolean mayInterruptIfRunning) {
251             boolean cancelled = super.cancel(mayInterruptIfRunning);
252             if (cancelled && removeOnCancel && heapIndex >= 0)
253                 remove(this);
254             return cancelled;
255         }
256 
257         /**
258          * Overrides FutureTask version so as to reset/requeue if periodic.
259          */
run()260         public void run() {
261             boolean periodic = isPeriodic();
262             if (!canRunInCurrentRunState(periodic))
263                 cancel(false);
264             else if (!periodic)
265                 ScheduledFutureTask.super.run();
266             else if (ScheduledFutureTask.super.runAndReset()) {
267                 setNextRunTime();
268                 reExecutePeriodic(outerTask);
269             }
270         }
271     }
272 
273     /**
274      * Returns true if can run a task given current run state
275      * and run-after-shutdown parameters.
276      *
277      * @param periodic true if this task periodic, false if delayed
278      */
canRunInCurrentRunState(boolean periodic)279     boolean canRunInCurrentRunState(boolean periodic) {
280         return isRunningOrShutdown(periodic ?
281                                    continueExistingPeriodicTasksAfterShutdown :
282                                    executeExistingDelayedTasksAfterShutdown);
283     }
284 
285     /**
286      * Main execution method for delayed or periodic tasks.  If pool
287      * is shut down, rejects the task. Otherwise adds task to queue
288      * and starts a thread, if necessary, to run it.  (We cannot
289      * prestart the thread to run the task because the task (probably)
290      * shouldn't be run yet.)  If the pool is shut down while the task
291      * is being added, cancel and remove it if required by state and
292      * run-after-shutdown parameters.
293      *
294      * @param task the task
295      */
delayedExecute(RunnableScheduledFuture<?> task)296     private void delayedExecute(RunnableScheduledFuture<?> task) {
297         if (isShutdown())
298             reject(task);
299         else {
300             super.getQueue().add(task);
301             if (isShutdown() &&
302                 !canRunInCurrentRunState(task.isPeriodic()) &&
303                 remove(task))
304                 task.cancel(false);
305             else
306                 ensurePrestart();
307         }
308     }
309 
310     /**
311      * Requeues a periodic task unless current run state precludes it.
312      * Same idea as delayedExecute except drops task rather than rejecting.
313      *
314      * @param task the task
315      */
reExecutePeriodic(RunnableScheduledFuture<?> task)316     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
317         if (canRunInCurrentRunState(true)) {
318             super.getQueue().add(task);
319             if (!canRunInCurrentRunState(true) && remove(task))
320                 task.cancel(false);
321             else
322                 ensurePrestart();
323         }
324     }
325 
326     /**
327      * Cancels and clears the queue of all tasks that should not be run
328      * due to shutdown policy.  Invoked within super.shutdown.
329      */
onShutdown()330     @Override void onShutdown() {
331         BlockingQueue<Runnable> q = super.getQueue();
332         boolean keepDelayed =
333             getExecuteExistingDelayedTasksAfterShutdownPolicy();
334         boolean keepPeriodic =
335             getContinueExistingPeriodicTasksAfterShutdownPolicy();
336         if (!keepDelayed && !keepPeriodic) {
337             for (Object e : q.toArray())
338                 if (e instanceof RunnableScheduledFuture<?>)
339                     ((RunnableScheduledFuture<?>) e).cancel(false);
340             q.clear();
341         }
342         else {
343             // Traverse snapshot to avoid iterator exceptions
344             for (Object e : q.toArray()) {
345                 if (e instanceof RunnableScheduledFuture) {
346                     RunnableScheduledFuture<?> t =
347                         (RunnableScheduledFuture<?>)e;
348                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
349                         t.isCancelled()) { // also remove if already cancelled
350                         if (q.remove(t))
351                             t.cancel(false);
352                     }
353                 }
354             }
355         }
356         tryTerminate();
357     }
358 
359     /**
360      * Modifies or replaces the task used to execute a runnable.
361      * This method can be used to override the concrete
362      * class used for managing internal tasks.
363      * The default implementation simply returns the given task.
364      *
365      * @param runnable the submitted Runnable
366      * @param task the task created to execute the runnable
367      * @return a task that can execute the runnable
368      * @since 1.6
369      */
decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)370     protected <V> RunnableScheduledFuture<V> decorateTask(
371         Runnable runnable, RunnableScheduledFuture<V> task) {
372         return task;
373     }
374 
375     /**
376      * Modifies or replaces the task used to execute a callable.
377      * This method can be used to override the concrete
378      * class used for managing internal tasks.
379      * The default implementation simply returns the given task.
380      *
381      * @param callable the submitted Callable
382      * @param task the task created to execute the callable
383      * @return a task that can execute the callable
384      * @since 1.6
385      */
decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task)386     protected <V> RunnableScheduledFuture<V> decorateTask(
387         Callable<V> callable, RunnableScheduledFuture<V> task) {
388         return task;
389     }
390 
391     /**
392      * Creates a new {@code ScheduledThreadPoolExecutor} with the
393      * given core pool size.
394      *
395      * @param corePoolSize the number of threads to keep in the pool, even
396      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
397      * @throws IllegalArgumentException if {@code corePoolSize < 0}
398      */
ScheduledThreadPoolExecutor(int corePoolSize)399     public ScheduledThreadPoolExecutor(int corePoolSize) {
400         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
401               new DelayedWorkQueue());
402     }
403 
404     /**
405      * Creates a new {@code ScheduledThreadPoolExecutor} with the
406      * given initial parameters.
407      *
408      * @param corePoolSize the number of threads to keep in the pool, even
409      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
410      * @param threadFactory the factory to use when the executor
411      *        creates a new thread
412      * @throws IllegalArgumentException if {@code corePoolSize < 0}
413      * @throws NullPointerException if {@code threadFactory} is null
414      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)415     public ScheduledThreadPoolExecutor(int corePoolSize,
416                                        ThreadFactory threadFactory) {
417         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
418               new DelayedWorkQueue(), threadFactory);
419     }
420 
421     /**
422      * Creates a new ScheduledThreadPoolExecutor with the given
423      * initial parameters.
424      *
425      * @param corePoolSize the number of threads to keep in the pool, even
426      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
427      * @param handler the handler to use when execution is blocked
428      *        because the thread bounds and queue capacities are reached
429      * @throws IllegalArgumentException if {@code corePoolSize < 0}
430      * @throws NullPointerException if {@code handler} is null
431      */
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)432     public ScheduledThreadPoolExecutor(int corePoolSize,
433                                        RejectedExecutionHandler handler) {
434         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
435               new DelayedWorkQueue(), handler);
436     }
437 
438     /**
439      * Creates a new ScheduledThreadPoolExecutor with the given
440      * initial parameters.
441      *
442      * @param corePoolSize the number of threads to keep in the pool, even
443      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
444      * @param threadFactory the factory to use when the executor
445      *        creates a new thread
446      * @param handler the handler to use when execution is blocked
447      *        because the thread bounds and queue capacities are reached
448      * @throws IllegalArgumentException if {@code corePoolSize < 0}
449      * @throws NullPointerException if {@code threadFactory} or
450      *         {@code handler} is null
451      */
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)452     public ScheduledThreadPoolExecutor(int corePoolSize,
453                                        ThreadFactory threadFactory,
454                                        RejectedExecutionHandler handler) {
455         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
456               new DelayedWorkQueue(), threadFactory, handler);
457     }
458 
459     /**
460      * Returns the trigger time of a delayed action.
461      */
triggerTime(long delay, TimeUnit unit)462     private long triggerTime(long delay, TimeUnit unit) {
463         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
464     }
465 
466     /**
467      * Returns the trigger time of a delayed action.
468      */
triggerTime(long delay)469     long triggerTime(long delay) {
470         return now() +
471             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
472     }
473 
474     /**
475      * Constrains the values of all delays in the queue to be within
476      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
477      * This may occur if a task is eligible to be dequeued, but has
478      * not yet been, while some other task is added with a delay of
479      * Long.MAX_VALUE.
480      */
overflowFree(long delay)481     private long overflowFree(long delay) {
482         Delayed head = (Delayed) super.getQueue().peek();
483         if (head != null) {
484             long headDelay = head.getDelay(NANOSECONDS);
485             if (headDelay < 0 && (delay - headDelay < 0))
486                 delay = Long.MAX_VALUE + headDelay;
487         }
488         return delay;
489     }
490 
491     /**
492      * @throws RejectedExecutionException {@inheritDoc}
493      * @throws NullPointerException       {@inheritDoc}
494      */
schedule(Runnable command, long delay, TimeUnit unit)495     public ScheduledFuture<?> schedule(Runnable command,
496                                        long delay,
497                                        TimeUnit unit) {
498         if (command == null || unit == null)
499             throw new NullPointerException();
500         RunnableScheduledFuture<?> t = decorateTask(command,
501             new ScheduledFutureTask<Void>(command, null,
502                                           triggerTime(delay, unit)));
503         delayedExecute(t);
504         return t;
505     }
506 
507     /**
508      * @throws RejectedExecutionException {@inheritDoc}
509      * @throws NullPointerException       {@inheritDoc}
510      */
schedule(Callable<V> callable, long delay, TimeUnit unit)511     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
512                                            long delay,
513                                            TimeUnit unit) {
514         if (callable == null || unit == null)
515             throw new NullPointerException();
516         RunnableScheduledFuture<V> t = decorateTask(callable,
517             new ScheduledFutureTask<V>(callable,
518                                        triggerTime(delay, unit)));
519         delayedExecute(t);
520         return t;
521     }
522 
523     /**
524      * @throws RejectedExecutionException {@inheritDoc}
525      * @throws NullPointerException       {@inheritDoc}
526      * @throws IllegalArgumentException   {@inheritDoc}
527      */
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)528     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
529                                                   long initialDelay,
530                                                   long period,
531                                                   TimeUnit unit) {
532         if (command == null || unit == null)
533             throw new NullPointerException();
534         if (period <= 0)
535             throw new IllegalArgumentException();
536         ScheduledFutureTask<Void> sft =
537             new ScheduledFutureTask<Void>(command,
538                                           null,
539                                           triggerTime(initialDelay, unit),
540                                           unit.toNanos(period));
541         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
542         sft.outerTask = t;
543         delayedExecute(t);
544         return t;
545     }
546 
547     /**
548      * @throws RejectedExecutionException {@inheritDoc}
549      * @throws NullPointerException       {@inheritDoc}
550      * @throws IllegalArgumentException   {@inheritDoc}
551      */
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)552     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
553                                                      long initialDelay,
554                                                      long delay,
555                                                      TimeUnit unit) {
556         if (command == null || unit == null)
557             throw new NullPointerException();
558         if (delay <= 0)
559             throw new IllegalArgumentException();
560         ScheduledFutureTask<Void> sft =
561             new ScheduledFutureTask<Void>(command,
562                                           null,
563                                           triggerTime(initialDelay, unit),
564                                           unit.toNanos(-delay));
565         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
566         sft.outerTask = t;
567         delayedExecute(t);
568         return t;
569     }
570 
571     /**
572      * Executes {@code command} with zero required delay.
573      * This has effect equivalent to
574      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
575      * Note that inspections of the queue and of the list returned by
576      * {@code shutdownNow} will access the zero-delayed
577      * {@link ScheduledFuture}, not the {@code command} itself.
578      *
579      * <p>A consequence of the use of {@code ScheduledFuture} objects is
580      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
581      * called with a null second {@code Throwable} argument, even if the
582      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
583      * thrown by such a task can be obtained via {@link Future#get}.
584      *
585      * @throws RejectedExecutionException at discretion of
586      *         {@code RejectedExecutionHandler}, if the task
587      *         cannot be accepted for execution because the
588      *         executor has been shut down
589      * @throws NullPointerException {@inheritDoc}
590      */
execute(Runnable command)591     public void execute(Runnable command) {
592         schedule(command, 0, NANOSECONDS);
593     }
594 
595     // Override AbstractExecutorService methods
596 
597     /**
598      * @throws RejectedExecutionException {@inheritDoc}
599      * @throws NullPointerException       {@inheritDoc}
600      */
submit(Runnable task)601     public Future<?> submit(Runnable task) {
602         return schedule(task, 0, NANOSECONDS);
603     }
604 
605     /**
606      * @throws RejectedExecutionException {@inheritDoc}
607      * @throws NullPointerException       {@inheritDoc}
608      */
submit(Runnable task, T result)609     public <T> Future<T> submit(Runnable task, T result) {
610         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
611     }
612 
613     /**
614      * @throws RejectedExecutionException {@inheritDoc}
615      * @throws NullPointerException       {@inheritDoc}
616      */
submit(Callable<T> task)617     public <T> Future<T> submit(Callable<T> task) {
618         return schedule(task, 0, NANOSECONDS);
619     }
620 
621     /**
622      * Sets the policy on whether to continue executing existing
623      * periodic tasks even when this executor has been {@code shutdown}.
624      * In this case, these tasks will only terminate upon
625      * {@code shutdownNow} or after setting the policy to
626      * {@code false} when already shutdown.
627      * This value is by default {@code false}.
628      *
629      * @param value if {@code true}, continue after shutdown, else don't
630      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
631      */
setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)632     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
633         continueExistingPeriodicTasksAfterShutdown = value;
634         if (!value && isShutdown())
635             onShutdown();
636     }
637 
638     /**
639      * Gets the policy on whether to continue executing existing
640      * periodic tasks even when this executor has been {@code shutdown}.
641      * In this case, these tasks will only terminate upon
642      * {@code shutdownNow} or after setting the policy to
643      * {@code false} when already shutdown.
644      * This value is by default {@code false}.
645      *
646      * @return {@code true} if will continue after shutdown
647      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
648      */
getContinueExistingPeriodicTasksAfterShutdownPolicy()649     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
650         return continueExistingPeriodicTasksAfterShutdown;
651     }
652 
653     /**
654      * Sets the policy on whether to execute existing delayed
655      * tasks even when this executor has been {@code shutdown}.
656      * In this case, these tasks will only terminate upon
657      * {@code shutdownNow}, or after setting the policy to
658      * {@code false} when already shutdown.
659      * This value is by default {@code true}.
660      *
661      * @param value if {@code true}, execute after shutdown, else don't
662      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
663      */
setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)664     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
665         executeExistingDelayedTasksAfterShutdown = value;
666         if (!value && isShutdown())
667             onShutdown();
668     }
669 
670     /**
671      * Gets the policy on whether to execute existing delayed
672      * tasks even when this executor has been {@code shutdown}.
673      * In this case, these tasks will only terminate upon
674      * {@code shutdownNow}, or after setting the policy to
675      * {@code false} when already shutdown.
676      * This value is by default {@code true}.
677      *
678      * @return {@code true} if will execute after shutdown
679      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
680      */
getExecuteExistingDelayedTasksAfterShutdownPolicy()681     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
682         return executeExistingDelayedTasksAfterShutdown;
683     }
684 
685     /**
686      * Sets the policy on whether cancelled tasks should be immediately
687      * removed from the work queue at time of cancellation.  This value is
688      * by default {@code false}.
689      *
690      * @param value if {@code true}, remove on cancellation, else don't
691      * @see #getRemoveOnCancelPolicy
692      * @since 1.7
693      */
setRemoveOnCancelPolicy(boolean value)694     public void setRemoveOnCancelPolicy(boolean value) {
695         removeOnCancel = value;
696     }
697 
698     /**
699      * Gets the policy on whether cancelled tasks should be immediately
700      * removed from the work queue at time of cancellation.  This value is
701      * by default {@code false}.
702      *
703      * @return {@code true} if cancelled tasks are immediately removed
704      *         from the queue
705      * @see #setRemoveOnCancelPolicy
706      * @since 1.7
707      */
getRemoveOnCancelPolicy()708     public boolean getRemoveOnCancelPolicy() {
709         return removeOnCancel;
710     }
711 
712     /**
713      * Initiates an orderly shutdown in which previously submitted
714      * tasks are executed, but no new tasks will be accepted.
715      * Invocation has no additional effect if already shut down.
716      *
717      * <p>This method does not wait for previously submitted tasks to
718      * complete execution.  Use {@link #awaitTermination awaitTermination}
719      * to do that.
720      *
721      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
722      * has been set {@code false}, existing delayed tasks whose delays
723      * have not yet elapsed are cancelled.  And unless the {@code
724      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
725      * {@code true}, future executions of existing periodic tasks will
726      * be cancelled.
727      */
shutdown()728     public void shutdown() {
729         super.shutdown();
730     }
731 
732     /**
733      * Attempts to stop all actively executing tasks, halts the
734      * processing of waiting tasks, and returns a list of the tasks
735      * that were awaiting execution.
736      *
737      * <p>This method does not wait for actively executing tasks to
738      * terminate.  Use {@link #awaitTermination awaitTermination} to
739      * do that.
740      *
741      * <p>There are no guarantees beyond best-effort attempts to stop
742      * processing actively executing tasks.  This implementation
743      * cancels tasks via {@link Thread#interrupt}, so any task that
744      * fails to respond to interrupts may never terminate.
745      *
746      * @return list of tasks that never commenced execution.
747      *         Each element of this list is a {@link ScheduledFuture},
748      *         including those tasks submitted using {@code execute},
749      *         which are for scheduling purposes used as the basis of a
750      *         zero-delay {@code ScheduledFuture}.
751      */
shutdownNow()752     public List<Runnable> shutdownNow() {
753         return super.shutdownNow();
754     }
755 
756     /**
757      * Returns the task queue used by this executor.  Each element of
758      * this queue is a {@link ScheduledFuture}, including those
759      * tasks submitted using {@code execute} which are for scheduling
760      * purposes used as the basis of a zero-delay
761      * {@code ScheduledFuture}.  Iteration over this queue is
762      * <em>not</em> guaranteed to traverse tasks in the order in
763      * which they will execute.
764      *
765      * @return the task queue
766      */
getQueue()767     public BlockingQueue<Runnable> getQueue() {
768         return super.getQueue();
769     }
770 
771     /**
772      * Specialized delay queue. To mesh with TPE declarations, this
773      * class must be declared as a BlockingQueue<Runnable> even though
774      * it can only hold RunnableScheduledFutures.
775      */
776     static class DelayedWorkQueue extends AbstractQueue<Runnable>
777         implements BlockingQueue<Runnable> {
778 
779         /*
780          * A DelayedWorkQueue is based on a heap-based data structure
781          * like those in DelayQueue and PriorityQueue, except that
782          * every ScheduledFutureTask also records its index into the
783          * heap array. This eliminates the need to find a task upon
784          * cancellation, greatly speeding up removal (down from O(n)
785          * to O(log n)), and reducing garbage retention that would
786          * otherwise occur by waiting for the element to rise to top
787          * before clearing. But because the queue may also hold
788          * RunnableScheduledFutures that are not ScheduledFutureTasks,
789          * we are not guaranteed to have such indices available, in
790          * which case we fall back to linear search. (We expect that
791          * most tasks will not be decorated, and that the faster cases
792          * will be much more common.)
793          *
794          * All heap operations must record index changes -- mainly
795          * within siftUp and siftDown. Upon removal, a task's
796          * heapIndex is set to -1. Note that ScheduledFutureTasks can
797          * appear at most once in the queue (this need not be true for
798          * other kinds of tasks or work queues), so are uniquely
799          * identified by heapIndex.
800          */
801 
802         private static final int INITIAL_CAPACITY = 16;
803         private RunnableScheduledFuture<?>[] queue =
804             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
805         private final ReentrantLock lock = new ReentrantLock();
806         private int size = 0;
807 
808         /**
809          * Thread designated to wait for the task at the head of the
810          * queue.  This variant of the Leader-Follower pattern
811          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
812          * minimize unnecessary timed waiting.  When a thread becomes
813          * the leader, it waits only for the next delay to elapse, but
814          * other threads await indefinitely.  The leader thread must
815          * signal some other thread before returning from take() or
816          * poll(...), unless some other thread becomes leader in the
817          * interim.  Whenever the head of the queue is replaced with a
818          * task with an earlier expiration time, the leader field is
819          * invalidated by being reset to null, and some waiting
820          * thread, but not necessarily the current leader, is
821          * signalled.  So waiting threads must be prepared to acquire
822          * and lose leadership while waiting.
823          */
824         private Thread leader = null;
825 
826         /**
827          * Condition signalled when a newer task becomes available at the
828          * head of the queue or a new thread may need to become leader.
829          */
830         private final Condition available = lock.newCondition();
831 
832         /**
833          * Sets f's heapIndex if it is a ScheduledFutureTask.
834          */
setIndex(RunnableScheduledFuture<?> f, int idx)835         private void setIndex(RunnableScheduledFuture<?> f, int idx) {
836             if (f instanceof ScheduledFutureTask)
837                 ((ScheduledFutureTask)f).heapIndex = idx;
838         }
839 
840         /**
841          * Sifts element added at bottom up to its heap-ordered spot.
842          * Call only when holding lock.
843          */
siftUp(int k, RunnableScheduledFuture<?> key)844         private void siftUp(int k, RunnableScheduledFuture<?> key) {
845             while (k > 0) {
846                 int parent = (k - 1) >>> 1;
847                 RunnableScheduledFuture<?> e = queue[parent];
848                 if (key.compareTo(e) >= 0)
849                     break;
850                 queue[k] = e;
851                 setIndex(e, k);
852                 k = parent;
853             }
854             queue[k] = key;
855             setIndex(key, k);
856         }
857 
858         /**
859          * Sifts element added at top down to its heap-ordered spot.
860          * Call only when holding lock.
861          */
siftDown(int k, RunnableScheduledFuture<?> key)862         private void siftDown(int k, RunnableScheduledFuture<?> key) {
863             int half = size >>> 1;
864             while (k < half) {
865                 int child = (k << 1) + 1;
866                 RunnableScheduledFuture<?> c = queue[child];
867                 int right = child + 1;
868                 if (right < size && c.compareTo(queue[right]) > 0)
869                     c = queue[child = right];
870                 if (key.compareTo(c) <= 0)
871                     break;
872                 queue[k] = c;
873                 setIndex(c, k);
874                 k = child;
875             }
876             queue[k] = key;
877             setIndex(key, k);
878         }
879 
880         /**
881          * Resizes the heap array.  Call only when holding lock.
882          */
grow()883         private void grow() {
884             int oldCapacity = queue.length;
885             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
886             if (newCapacity < 0) // overflow
887                 newCapacity = Integer.MAX_VALUE;
888             queue = Arrays.copyOf(queue, newCapacity);
889         }
890 
891         /**
892          * Finds index of given object, or -1 if absent.
893          */
indexOf(Object x)894         private int indexOf(Object x) {
895             if (x != null) {
896                 if (x instanceof ScheduledFutureTask) {
897                     int i = ((ScheduledFutureTask) x).heapIndex;
898                     // Sanity check; x could conceivably be a
899                     // ScheduledFutureTask from some other pool.
900                     if (i >= 0 && i < size && queue[i] == x)
901                         return i;
902                 } else {
903                     for (int i = 0; i < size; i++)
904                         if (x.equals(queue[i]))
905                             return i;
906                 }
907             }
908             return -1;
909         }
910 
contains(Object x)911         public boolean contains(Object x) {
912             final ReentrantLock lock = this.lock;
913             lock.lock();
914             try {
915                 return indexOf(x) != -1;
916             } finally {
917                 lock.unlock();
918             }
919         }
920 
remove(Object x)921         public boolean remove(Object x) {
922             final ReentrantLock lock = this.lock;
923             lock.lock();
924             try {
925                 int i = indexOf(x);
926                 if (i < 0)
927                     return false;
928 
929                 setIndex(queue[i], -1);
930                 int s = --size;
931                 RunnableScheduledFuture<?> replacement = queue[s];
932                 queue[s] = null;
933                 if (s != i) {
934                     siftDown(i, replacement);
935                     if (queue[i] == replacement)
936                         siftUp(i, replacement);
937                 }
938                 return true;
939             } finally {
940                 lock.unlock();
941             }
942         }
943 
size()944         public int size() {
945             final ReentrantLock lock = this.lock;
946             lock.lock();
947             try {
948                 return size;
949             } finally {
950                 lock.unlock();
951             }
952         }
953 
isEmpty()954         public boolean isEmpty() {
955             return size() == 0;
956         }
957 
remainingCapacity()958         public int remainingCapacity() {
959             return Integer.MAX_VALUE;
960         }
961 
peek()962         public RunnableScheduledFuture<?> peek() {
963             final ReentrantLock lock = this.lock;
964             lock.lock();
965             try {
966                 return queue[0];
967             } finally {
968                 lock.unlock();
969             }
970         }
971 
offer(Runnable x)972         public boolean offer(Runnable x) {
973             if (x == null)
974                 throw new NullPointerException();
975             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
976             final ReentrantLock lock = this.lock;
977             lock.lock();
978             try {
979                 int i = size;
980                 if (i >= queue.length)
981                     grow();
982                 size = i + 1;
983                 if (i == 0) {
984                     queue[0] = e;
985                     setIndex(e, 0);
986                 } else {
987                     siftUp(i, e);
988                 }
989                 if (queue[0] == e) {
990                     leader = null;
991                     available.signal();
992                 }
993             } finally {
994                 lock.unlock();
995             }
996             return true;
997         }
998 
put(Runnable e)999         public void put(Runnable e) {
1000             offer(e);
1001         }
1002 
add(Runnable e)1003         public boolean add(Runnable e) {
1004             return offer(e);
1005         }
1006 
offer(Runnable e, long timeout, TimeUnit unit)1007         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1008             return offer(e);
1009         }
1010 
1011         /**
1012          * Performs common bookkeeping for poll and take: Replaces
1013          * first element with last and sifts it down.  Call only when
1014          * holding lock.
1015          * @param f the task to remove and return
1016          */
finishPoll(RunnableScheduledFuture<?> f)1017         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1018             int s = --size;
1019             RunnableScheduledFuture<?> x = queue[s];
1020             queue[s] = null;
1021             if (s != 0)
1022                 siftDown(0, x);
1023             setIndex(f, -1);
1024             return f;
1025         }
1026 
poll()1027         public RunnableScheduledFuture<?> poll() {
1028             final ReentrantLock lock = this.lock;
1029             lock.lock();
1030             try {
1031                 RunnableScheduledFuture<?> first = queue[0];
1032                 if (first == null || first.getDelay(NANOSECONDS) > 0)
1033                     return null;
1034                 else
1035                     return finishPoll(first);
1036             } finally {
1037                 lock.unlock();
1038             }
1039         }
1040 
take()1041         public RunnableScheduledFuture<?> take() throws InterruptedException {
1042             final ReentrantLock lock = this.lock;
1043             lock.lockInterruptibly();
1044             try {
1045                 for (;;) {
1046                     RunnableScheduledFuture<?> first = queue[0];
1047                     if (first == null)
1048                         available.await();
1049                     else {
1050                         long delay = first.getDelay(NANOSECONDS);
1051                         if (delay <= 0)
1052                             return finishPoll(first);
1053                         first = null; // don't retain ref while waiting
1054                         if (leader != null)
1055                             available.await();
1056                         else {
1057                             Thread thisThread = Thread.currentThread();
1058                             leader = thisThread;
1059                             try {
1060                                 available.awaitNanos(delay);
1061                             } finally {
1062                                 if (leader == thisThread)
1063                                     leader = null;
1064                             }
1065                         }
1066                     }
1067                 }
1068             } finally {
1069                 if (leader == null && queue[0] != null)
1070                     available.signal();
1071                 lock.unlock();
1072             }
1073         }
1074 
poll(long timeout, TimeUnit unit)1075         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1076             throws InterruptedException {
1077             long nanos = unit.toNanos(timeout);
1078             final ReentrantLock lock = this.lock;
1079             lock.lockInterruptibly();
1080             try {
1081                 for (;;) {
1082                     RunnableScheduledFuture<?> first = queue[0];
1083                     if (first == null) {
1084                         if (nanos <= 0)
1085                             return null;
1086                         else
1087                             nanos = available.awaitNanos(nanos);
1088                     } else {
1089                         long delay = first.getDelay(NANOSECONDS);
1090                         if (delay <= 0)
1091                             return finishPoll(first);
1092                         if (nanos <= 0)
1093                             return null;
1094                         first = null; // don't retain ref while waiting
1095                         if (nanos < delay || leader != null)
1096                             nanos = available.awaitNanos(nanos);
1097                         else {
1098                             Thread thisThread = Thread.currentThread();
1099                             leader = thisThread;
1100                             try {
1101                                 long timeLeft = available.awaitNanos(delay);
1102                                 nanos -= delay - timeLeft;
1103                             } finally {
1104                                 if (leader == thisThread)
1105                                     leader = null;
1106                             }
1107                         }
1108                     }
1109                 }
1110             } finally {
1111                 if (leader == null && queue[0] != null)
1112                     available.signal();
1113                 lock.unlock();
1114             }
1115         }
1116 
clear()1117         public void clear() {
1118             final ReentrantLock lock = this.lock;
1119             lock.lock();
1120             try {
1121                 for (int i = 0; i < size; i++) {
1122                     RunnableScheduledFuture<?> t = queue[i];
1123                     if (t != null) {
1124                         queue[i] = null;
1125                         setIndex(t, -1);
1126                     }
1127                 }
1128                 size = 0;
1129             } finally {
1130                 lock.unlock();
1131             }
1132         }
1133 
1134         /**
1135          * Returns first element only if it is expired.
1136          * Used only by drainTo.  Call only when holding lock.
1137          */
peekExpired()1138         private RunnableScheduledFuture<?> peekExpired() {
1139             // assert lock.isHeldByCurrentThread();
1140             RunnableScheduledFuture<?> first = queue[0];
1141             return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1142                 null : first;
1143         }
1144 
drainTo(Collection<? super Runnable> c)1145         public int drainTo(Collection<? super Runnable> c) {
1146             if (c == null)
1147                 throw new NullPointerException();
1148             if (c == this)
1149                 throw new IllegalArgumentException();
1150             final ReentrantLock lock = this.lock;
1151             lock.lock();
1152             try {
1153                 RunnableScheduledFuture<?> first;
1154                 int n = 0;
1155                 while ((first = peekExpired()) != null) {
1156                     c.add(first);   // In this order, in case add() throws.
1157                     finishPoll(first);
1158                     ++n;
1159                 }
1160                 return n;
1161             } finally {
1162                 lock.unlock();
1163             }
1164         }
1165 
drainTo(Collection<? super Runnable> c, int maxElements)1166         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1167             if (c == null)
1168                 throw new NullPointerException();
1169             if (c == this)
1170                 throw new IllegalArgumentException();
1171             if (maxElements <= 0)
1172                 return 0;
1173             final ReentrantLock lock = this.lock;
1174             lock.lock();
1175             try {
1176                 RunnableScheduledFuture<?> first;
1177                 int n = 0;
1178                 while (n < maxElements && (first = peekExpired()) != null) {
1179                     c.add(first);   // In this order, in case add() throws.
1180                     finishPoll(first);
1181                     ++n;
1182                 }
1183                 return n;
1184             } finally {
1185                 lock.unlock();
1186             }
1187         }
1188 
toArray()1189         public Object[] toArray() {
1190             final ReentrantLock lock = this.lock;
1191             lock.lock();
1192             try {
1193                 return Arrays.copyOf(queue, size, Object[].class);
1194             } finally {
1195                 lock.unlock();
1196             }
1197         }
1198 
1199         @SuppressWarnings("unchecked")
toArray(T[] a)1200         public <T> T[] toArray(T[] a) {
1201             final ReentrantLock lock = this.lock;
1202             lock.lock();
1203             try {
1204                 if (a.length < size)
1205                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1206                 System.arraycopy(queue, 0, a, 0, size);
1207                 if (a.length > size)
1208                     a[size] = null;
1209                 return a;
1210             } finally {
1211                 lock.unlock();
1212             }
1213         }
1214 
iterator()1215         public Iterator<Runnable> iterator() {
1216             return new Itr(Arrays.copyOf(queue, size));
1217         }
1218 
1219         /**
1220          * Snapshot iterator that works off copy of underlying q array.
1221          */
1222         private class Itr implements Iterator<Runnable> {
1223             final RunnableScheduledFuture[] array;
1224             int cursor = 0;     // index of next element to return
1225             int lastRet = -1;   // index of last element, or -1 if no such
1226 
Itr(RunnableScheduledFuture[] array)1227             Itr(RunnableScheduledFuture[] array) {
1228                 this.array = array;
1229             }
1230 
hasNext()1231             public boolean hasNext() {
1232                 return cursor < array.length;
1233             }
1234 
next()1235             public Runnable next() {
1236                 if (cursor >= array.length)
1237                     throw new NoSuchElementException();
1238                 lastRet = cursor;
1239                 return array[cursor++];
1240             }
1241 
remove()1242             public void remove() {
1243                 if (lastRet < 0)
1244                     throw new IllegalStateException();
1245                 DelayedWorkQueue.this.remove(array[lastRet]);
1246                 lastRet = -1;
1247             }
1248         }
1249     }
1250 }
1251