1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.concurrent.locks.LockSupport;
10 
11 /**
12  * A cancellable asynchronous computation.  This class provides a base
13  * implementation of {@link Future}, with methods to start and cancel
14  * a computation, query to see if the computation is complete, and
15  * retrieve the result of the computation.  The result can only be
16  * retrieved when the computation has completed; the {@code get}
17  * methods will block if the computation has not yet completed.  Once
18  * the computation has completed, the computation cannot be restarted
19  * or cancelled (unless the computation is invoked using
20  * {@link #runAndReset}).
21  *
22  * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
23  * {@link Runnable} object.  Because {@code FutureTask} implements
24  * {@code Runnable}, a {@code FutureTask} can be submitted to an
25  * {@link Executor} for execution.
26  *
27  * <p>In addition to serving as a standalone class, this class provides
28  * {@code protected} functionality that may be useful when creating
29  * customized task classes.
30  *
31  * @since 1.5
32  * @author Doug Lea
33  * @param <V> The result type returned by this FutureTask's {@code get} methods
34  */
35 public class FutureTask<V> implements RunnableFuture<V> {
36     /*
37      * Revision notes: This differs from previous versions of this
38      * class that relied on AbstractQueuedSynchronizer, mainly to
39      * avoid surprising users about retaining interrupt status during
40      * cancellation races. Sync control in the current design relies
41      * on a "state" field updated via CAS to track completion, along
42      * with a simple Treiber stack to hold waiting threads.
43      *
44      * Style note: As usual, we bypass overhead of using
45      * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
46      */
47 
48     /**
49      * The run state of this task, initially NEW.  The run state
50      * transitions to a terminal state only in methods set,
51      * setException, and cancel.  During completion, state may take on
52      * transient values of COMPLETING (while outcome is being set) or
53      * INTERRUPTING (only while interrupting the runner to satisfy a
54      * cancel(true)). Transitions from these intermediate to final
55      * states use cheaper ordered/lazy writes because values are unique
56      * and cannot be further modified.
57      *
58      * Possible state transitions:
59      * NEW -> COMPLETING -> NORMAL
60      * NEW -> COMPLETING -> EXCEPTIONAL
61      * NEW -> CANCELLED
62      * NEW -> INTERRUPTING -> INTERRUPTED
63      */
64     private volatile int state;
65     private static final int NEW          = 0;
66     private static final int COMPLETING   = 1;
67     private static final int NORMAL       = 2;
68     private static final int EXCEPTIONAL  = 3;
69     private static final int CANCELLED    = 4;
70     private static final int INTERRUPTING = 5;
71     private static final int INTERRUPTED  = 6;
72 
73     /** The underlying callable; nulled out after running */
74     private Callable<V> callable;
75     /** The result to return or exception to throw from get() */
76     private Object outcome; // non-volatile, protected by state reads/writes
77     /** The thread running the callable; CASed during run() */
78     private volatile Thread runner;
79     /** Treiber stack of waiting threads */
80     private volatile WaitNode waiters;
81 
82     /**
83      * Returns result or throws exception for completed task.
84      *
85      * @param s completed state value
86      */
87     @SuppressWarnings("unchecked")
report(int s)88     private V report(int s) throws ExecutionException {
89         Object x = outcome;
90         if (s == NORMAL)
91             return (V)x;
92         if (s >= CANCELLED)
93             throw new CancellationException();
94         throw new ExecutionException((Throwable)x);
95     }
96 
97     /**
98      * Creates a {@code FutureTask} that will, upon running, execute the
99      * given {@code Callable}.
100      *
101      * @param  callable the callable task
102      * @throws NullPointerException if the callable is null
103      */
FutureTask(Callable<V> callable)104     public FutureTask(Callable<V> callable) {
105         if (callable == null)
106             throw new NullPointerException();
107         this.callable = callable;
108         this.state = NEW;       // ensure visibility of callable
109     }
110 
111     /**
112      * Creates a {@code FutureTask} that will, upon running, execute the
113      * given {@code Runnable}, and arrange that {@code get} will return the
114      * given result on successful completion.
115      *
116      * @param runnable the runnable task
117      * @param result the result to return on successful completion. If
118      * you don't need a particular result, consider using
119      * constructions of the form:
120      * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
121      * @throws NullPointerException if the runnable is null
122      */
FutureTask(Runnable runnable, V result)123     public FutureTask(Runnable runnable, V result) {
124         this.callable = Executors.callable(runnable, result);
125         this.state = NEW;       // ensure visibility of callable
126     }
127 
isCancelled()128     public boolean isCancelled() {
129         return state >= CANCELLED;
130     }
131 
isDone()132     public boolean isDone() {
133         return state != NEW;
134     }
135 
cancel(boolean mayInterruptIfRunning)136     public boolean cancel(boolean mayInterruptIfRunning) {
137         if (!(state == NEW &&
138               U.compareAndSwapInt(this, STATE, NEW,
139                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
140             return false;
141         try {    // in case call to interrupt throws exception
142             if (mayInterruptIfRunning) {
143                 try {
144                     Thread t = runner;
145                     if (t != null)
146                         t.interrupt();
147                 } finally { // final state
148                     U.putOrderedInt(this, STATE, INTERRUPTED);
149                 }
150             }
151         } finally {
152             finishCompletion();
153         }
154         return true;
155     }
156 
157     /**
158      * @throws CancellationException {@inheritDoc}
159      */
get()160     public V get() throws InterruptedException, ExecutionException {
161         int s = state;
162         if (s <= COMPLETING)
163             s = awaitDone(false, 0L);
164         return report(s);
165     }
166 
167     /**
168      * @throws CancellationException {@inheritDoc}
169      */
get(long timeout, TimeUnit unit)170     public V get(long timeout, TimeUnit unit)
171         throws InterruptedException, ExecutionException, TimeoutException {
172         if (unit == null)
173             throw new NullPointerException();
174         int s = state;
175         if (s <= COMPLETING &&
176             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
177             throw new TimeoutException();
178         return report(s);
179     }
180 
181     /**
182      * Protected method invoked when this task transitions to state
183      * {@code isDone} (whether normally or via cancellation). The
184      * default implementation does nothing.  Subclasses may override
185      * this method to invoke completion callbacks or perform
186      * bookkeeping. Note that you can query status inside the
187      * implementation of this method to determine whether this task
188      * has been cancelled.
189      */
done()190     protected void done() { }
191 
192     /**
193      * Sets the result of this future to the given value unless
194      * this future has already been set or has been cancelled.
195      *
196      * <p>This method is invoked internally by the {@link #run} method
197      * upon successful completion of the computation.
198      *
199      * @param v the value
200      */
set(V v)201     protected void set(V v) {
202         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
203             outcome = v;
204             U.putOrderedInt(this, STATE, NORMAL); // final state
205             finishCompletion();
206         }
207     }
208 
209     /**
210      * Causes this future to report an {@link ExecutionException}
211      * with the given throwable as its cause, unless this future has
212      * already been set or has been cancelled.
213      *
214      * <p>This method is invoked internally by the {@link #run} method
215      * upon failure of the computation.
216      *
217      * @param t the cause of failure
218      */
setException(Throwable t)219     protected void setException(Throwable t) {
220         if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
221             outcome = t;
222             U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
223             finishCompletion();
224         }
225     }
226 
run()227     public void run() {
228         if (state != NEW ||
229             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
230             return;
231         try {
232             Callable<V> c = callable;
233             if (c != null && state == NEW) {
234                 V result;
235                 boolean ran;
236                 try {
237                     result = c.call();
238                     ran = true;
239                 } catch (Throwable ex) {
240                     result = null;
241                     ran = false;
242                     setException(ex);
243                 }
244                 if (ran)
245                     set(result);
246             }
247         } finally {
248             // runner must be non-null until state is settled to
249             // prevent concurrent calls to run()
250             runner = null;
251             // state must be re-read after nulling runner to prevent
252             // leaked interrupts
253             int s = state;
254             if (s >= INTERRUPTING)
255                 handlePossibleCancellationInterrupt(s);
256         }
257     }
258 
259     /**
260      * Executes the computation without setting its result, and then
261      * resets this future to initial state, failing to do so if the
262      * computation encounters an exception or is cancelled.  This is
263      * designed for use with tasks that intrinsically execute more
264      * than once.
265      *
266      * @return {@code true} if successfully run and reset
267      */
runAndReset()268     protected boolean runAndReset() {
269         if (state != NEW ||
270             !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
271             return false;
272         boolean ran = false;
273         int s = state;
274         try {
275             Callable<V> c = callable;
276             if (c != null && s == NEW) {
277                 try {
278                     c.call(); // don't set result
279                     ran = true;
280                 } catch (Throwable ex) {
281                     setException(ex);
282                 }
283             }
284         } finally {
285             // runner must be non-null until state is settled to
286             // prevent concurrent calls to run()
287             runner = null;
288             // state must be re-read after nulling runner to prevent
289             // leaked interrupts
290             s = state;
291             if (s >= INTERRUPTING)
292                 handlePossibleCancellationInterrupt(s);
293         }
294         return ran && s == NEW;
295     }
296 
297     /**
298      * Ensures that any interrupt from a possible cancel(true) is only
299      * delivered to a task while in run or runAndReset.
300      */
handlePossibleCancellationInterrupt(int s)301     private void handlePossibleCancellationInterrupt(int s) {
302         // It is possible for our interrupter to stall before getting a
303         // chance to interrupt us.  Let's spin-wait patiently.
304         if (s == INTERRUPTING)
305             while (state == INTERRUPTING)
306                 Thread.yield(); // wait out pending interrupt
307 
308         // assert state == INTERRUPTED;
309 
310         // We want to clear any interrupt we may have received from
311         // cancel(true).  However, it is permissible to use interrupts
312         // as an independent mechanism for a task to communicate with
313         // its caller, and there is no way to clear only the
314         // cancellation interrupt.
315         //
316         // Thread.interrupted();
317     }
318 
319     /**
320      * Simple linked list nodes to record waiting threads in a Treiber
321      * stack.  See other classes such as Phaser and SynchronousQueue
322      * for more detailed explanation.
323      */
324     static final class WaitNode {
325         volatile Thread thread;
326         volatile WaitNode next;
WaitNode()327         WaitNode() { thread = Thread.currentThread(); }
328     }
329 
330     /**
331      * Removes and signals all waiting threads, invokes done(), and
332      * nulls out callable.
333      */
finishCompletion()334     private void finishCompletion() {
335         // assert state > COMPLETING;
336         for (WaitNode q; (q = waiters) != null;) {
337             if (U.compareAndSwapObject(this, WAITERS, q, null)) {
338                 for (;;) {
339                     Thread t = q.thread;
340                     if (t != null) {
341                         q.thread = null;
342                         LockSupport.unpark(t);
343                     }
344                     WaitNode next = q.next;
345                     if (next == null)
346                         break;
347                     q.next = null; // unlink to help gc
348                     q = next;
349                 }
350                 break;
351             }
352         }
353 
354         done();
355 
356         callable = null;        // to reduce footprint
357     }
358 
359     /**
360      * Awaits completion or aborts on interrupt or timeout.
361      *
362      * @param timed true if use timed waits
363      * @param nanos time to wait, if timed
364      * @return state upon completion or at timeout
365      */
awaitDone(boolean timed, long nanos)366     private int awaitDone(boolean timed, long nanos)
367         throws InterruptedException {
368         // The code below is very delicate, to achieve these goals:
369         // - call nanoTime exactly once for each call to park
370         // - if nanos <= 0L, return promptly without allocation or nanoTime
371         // - if nanos == Long.MIN_VALUE, don't underflow
372         // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
373         //   and we suffer a spurious wakeup, we will do no worse than
374         //   to park-spin for a while
375         long startTime = 0L;    // Special value 0L means not yet parked
376         WaitNode q = null;
377         boolean queued = false;
378         for (;;) {
379             int s = state;
380             if (s > COMPLETING) {
381                 if (q != null)
382                     q.thread = null;
383                 return s;
384             }
385             else if (s == COMPLETING)
386                 // We may have already promised (via isDone) that we are done
387                 // so never return empty-handed or throw InterruptedException
388                 Thread.yield();
389             else if (Thread.interrupted()) {
390                 removeWaiter(q);
391                 throw new InterruptedException();
392             }
393             else if (q == null) {
394                 if (timed && nanos <= 0L)
395                     return s;
396                 q = new WaitNode();
397             }
398             else if (!queued)
399                 queued = U.compareAndSwapObject(this, WAITERS,
400                                                 q.next = waiters, q);
401             else if (timed) {
402                 final long parkNanos;
403                 if (startTime == 0L) { // first time
404                     startTime = System.nanoTime();
405                     if (startTime == 0L)
406                         startTime = 1L;
407                     parkNanos = nanos;
408                 } else {
409                     long elapsed = System.nanoTime() - startTime;
410                     if (elapsed >= nanos) {
411                         removeWaiter(q);
412                         return state;
413                     }
414                     parkNanos = nanos - elapsed;
415                 }
416                 // nanoTime may be slow; recheck before parking
417                 if (state < COMPLETING)
418                     LockSupport.parkNanos(this, parkNanos);
419             }
420             else
421                 LockSupport.park(this);
422         }
423     }
424 
425     /**
426      * Tries to unlink a timed-out or interrupted wait node to avoid
427      * accumulating garbage.  Internal nodes are simply unspliced
428      * without CAS since it is harmless if they are traversed anyway
429      * by releasers.  To avoid effects of unsplicing from already
430      * removed nodes, the list is retraversed in case of an apparent
431      * race.  This is slow when there are a lot of nodes, but we don't
432      * expect lists to be long enough to outweigh higher-overhead
433      * schemes.
434      */
removeWaiter(WaitNode node)435     private void removeWaiter(WaitNode node) {
436         if (node != null) {
437             node.thread = null;
438             retry:
439             for (;;) {          // restart on removeWaiter race
440                 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
441                     s = q.next;
442                     if (q.thread != null)
443                         pred = q;
444                     else if (pred != null) {
445                         pred.next = s;
446                         if (pred.thread == null) // check for race
447                             continue retry;
448                     }
449                     else if (!U.compareAndSwapObject(this, WAITERS, q, s))
450                         continue retry;
451                 }
452                 break;
453             }
454         }
455     }
456 
457     // Unsafe mechanics
458     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
459     private static final long STATE;
460     private static final long RUNNER;
461     private static final long WAITERS;
462     static {
463         try {
464             STATE = U.objectFieldOffset
465                 (FutureTask.class.getDeclaredField("state"));
466             RUNNER = U.objectFieldOffset
467                 (FutureTask.class.getDeclaredField("runner"));
468             WAITERS = U.objectFieldOffset
469                 (FutureTask.class.getDeclaredField("waiters"));
470         } catch (ReflectiveOperationException e) {
471             throw new Error(e);
472         }
473 
474         // Reduce the risk of rare disastrous classloading in first call to
475         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
476         Class<?> ensureLoaded = LockSupport.class;
477     }
478 
479 }
480