/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
24 
/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
35 
package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;
39 
/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    // The numeric order of these constants is significant: the methods
    // below rely on range comparisons such as "s <= COMPLETING",
    // "s >= CANCELLED" and "s >= INTERRUPTING" rather than testing each
    // value individually.
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome, when {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED,
     *         INTERRUPTING or INTERRUPTED
     * @throws ExecutionException wrapping the stored outcome for any
     *         other state value (EXCEPTIONAL in practice)
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Returns {@code true} if the state is CANCELLED, INTERRUPTING or
     * INTERRUPTED, i.e. a call to {@link #cancel} won the race to
     * leave the NEW state.
     */
    @Override
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns {@code true} once the state has left NEW.  This includes
     * the transient COMPLETING and INTERRUPTING states, which is why
     * {@code awaitDone} never returns empty-handed while COMPLETING.
     */
    @Override
    public boolean isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task.  The attempt succeeds only if the
     * state can be CASed from NEW to INTERRUPTING (when
     * {@code mayInterruptIfRunning} is true) or to CANCELLED
     * (otherwise).  When interrupting, the current runner thread, if
     * any, is interrupted and the state is then set to INTERRUPTED.
     * Waiters are released and {@link #done} is invoked in all
     * successful cases, even if the interrupt call throws.
     *
     * @param mayInterruptIfRunning whether the thread currently
     *        recorded as runner should be interrupted
     * @return {@code false} if the state was not NEW or the CAS was
     *         lost; {@code true} otherwise
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)        // NEW or COMPLETING: not yet settled
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // awaitDone returns a state <= COMPLETING only when it gave up
        // because the timeout elapsed
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // Only the caller that wins the NEW -> COMPLETING CAS may write
        // outcome; the write is published by the following state write.
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        // Same protocol as set(): win the CAS, store outcome, publish.
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Runs the underlying callable on the calling thread and records
     * its result via {@link #set}, or its thrown {@code Throwable} via
     * {@link #setException}.  Returns immediately without running if
     * the state is not NEW or if another thread is already recorded
     * as the runner.
     */
    @Override
    public void run() {
        // Claim the runner slot; losing the CAS means another thread is
        // (or was) running this task.
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // The state is still NEW only if neither setException nor a
        // concurrent cancel changed it.
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller (INTERRUPTING or
     *        INTERRUPTED)
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        /** The waiting thread; set to null once signalled or abandoned. */
        volatile Thread thread;
        /** The next node down the stack, or null at the bottom. */
        volatile WaitNode next;
        /** Creates a node recording the calling thread as the waiter. */
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the whole stack in one CAS; retry if a waiter was
            // pushed or removed concurrently.
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found
     *         interrupted while the state is still NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // push our node onto the Treiber stack; retried on the
                // next iteration if the CAS fails
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to abandon; ignored if null
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // A null thread marks the node as removable; the traversal
            // below unlinks every such node it encounters.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    /** Field offset of {@code state}, for CAS and ordered writes. */
    private static final long STATE;
    /** Field offset of {@code runner}, for CAS. */
    private static final long RUNNER;
    /** Field offset of {@code waiters}, for CAS. */
    private static final long WAITERS;
    static {
        try {
            STATE = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        // (the local is intentionally unused; the class literal is the point)
        Class<?> ensureLoaded = LockSupport.class;
    }

}
509