1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.io.Serializable;
39 import java.lang.invoke.MethodHandles;
40 import java.lang.invoke.VarHandle;
41 import java.lang.reflect.Constructor;
42 import java.util.Collection;
43 import java.util.List;
44 import java.util.RandomAccess;
45 import java.util.concurrent.locks.LockSupport;
46 
47 // BEGIN android-note
48 // removed java 9 code
49 // END android-note
50 
51 /**
52  * Abstract base class for tasks that run within a {@link ForkJoinPool}.
53  * A {@code ForkJoinTask} is a thread-like entity that is much
54  * lighter weight than a normal thread.  Huge numbers of tasks and
55  * subtasks may be hosted by a small number of actual threads in a
56  * ForkJoinPool, at the price of some usage limitations.
57  *
58  * <p>A "main" {@code ForkJoinTask} begins execution when it is
59  * explicitly submitted to a {@link ForkJoinPool}, or, if not already
60  * engaged in a ForkJoin computation, commenced in the {@link
61  * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
62  * related methods.  Once started, it will usually in turn start other
63  * subtasks.  As indicated by the name of this class, many programs
64  * using {@code ForkJoinTask} employ only methods {@link #fork} and
65  * {@link #join}, or derivatives such as {@link
66  * #invokeAll(ForkJoinTask...) invokeAll}.  However, this class also
67  * provides a number of other methods that can come into play in
68  * advanced usages, as well as extension mechanics that allow support
69  * of new forms of fork/join processing.
70  *
71  * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
72  * The efficiency of {@code ForkJoinTask}s stems from a set of
73  * restrictions (that are only partially statically enforceable)
74  * reflecting their main use as computational tasks calculating pure
75  * functions or operating on purely isolated objects.  The primary
76  * coordination mechanisms are {@link #fork}, that arranges
77  * asynchronous execution, and {@link #join}, that doesn't proceed
78  * until the task's result has been computed.  Computations should
79  * ideally avoid {@code synchronized} methods or blocks, and should
80  * minimize other blocking synchronization apart from joining other
81  * tasks or using synchronizers such as Phasers that are advertised to
82  * cooperate with fork/join scheduling. Subdividable tasks should also
83  * not perform blocking I/O, and should ideally access variables that
84  * are completely independent of those accessed by other running
85  * tasks. These guidelines are loosely enforced by not permitting
86  * checked exceptions such as {@code IOExceptions} to be
87  * thrown. However, computations may still encounter unchecked
88  * exceptions, that are rethrown to callers attempting to join
89  * them. These exceptions may additionally include {@link
90  * RejectedExecutionException} stemming from internal resource
91  * exhaustion, such as failure to allocate internal task
92  * queues. Rethrown exceptions behave in the same way as regular
93  * exceptions, but, when possible, contain stack traces (as displayed
94  * for example using {@code ex.printStackTrace()}) of both the thread
95  * that initiated the computation as well as the thread actually
96  * encountering the exception; minimally only the latter.
97  *
98  * <p>It is possible to define and use ForkJoinTasks that may block,
99  * but doing so requires three further considerations: (1) Completion
100  * of few if any <em>other</em> tasks should be dependent on a task
101  * that blocks on external synchronization or I/O. Event-style async
102  * tasks that are never joined (for example, those subclassing {@link
103  * CountedCompleter}) often fall into this category.  (2) To minimize
104  * resource impact, tasks should be small; ideally performing only the
105  * (possibly) blocking action. (3) Unless the {@link
106  * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
107  * blocked tasks is known to be less than the pool's {@link
108  * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
109  * enough threads will be available to ensure progress or good
110  * performance.
111  *
112  * <p>The primary method for awaiting completion and extracting
113  * results of a task is {@link #join}, but there are several variants:
114  * The {@link Future#get} methods support interruptible and/or timed
115  * waits for completion and report results using {@code Future}
116  * conventions. Method {@link #invoke} is semantically
117  * equivalent to {@code fork(); join()} but always attempts to begin
118  * execution in the current thread. The "<em>quiet</em>" forms of
119  * these methods do not extract results or report exceptions. These
120  * may be useful when a set of tasks are being executed, and you need
121  * to delay processing of results or exceptions until all complete.
122  * Method {@code invokeAll} (available in multiple versions)
123  * performs the most common form of parallel invocation: forking a set
124  * of tasks and joining them all.
125  *
126  * <p>In the most typical usages, a fork-join pair act like a call
127  * (fork) and return (join) from a parallel recursive function. As is
128  * the case with other forms of recursive calls, returns (joins)
129  * should be performed innermost-first. For example, {@code a.fork();
130  * b.fork(); b.join(); a.join();} is likely to be substantially more
131  * efficient than joining {@code a} before {@code b}.
132  *
133  * <p>The execution status of tasks may be queried at several levels
134  * of detail: {@link #isDone} is true if a task completed in any way
135  * (including the case where a task was cancelled without executing);
136  * {@link #isCompletedNormally} is true if a task completed without
137  * cancellation or encountering an exception; {@link #isCancelled} is
138  * true if the task was cancelled (in which case {@link #getException}
139  * returns a {@link CancellationException}); and
140  * {@link #isCompletedAbnormally} is true if a task was either
141  * cancelled or encountered an exception, in which case {@link
142  * #getException} will return either the encountered exception or
143  * {@link CancellationException}.
144  *
145  * <p>The ForkJoinTask class is not usually directly subclassed.
146  * Instead, you subclass one of the abstract classes that support a
147  * particular style of fork/join processing, typically {@link
148  * RecursiveAction} for most computations that do not return results,
149  * {@link RecursiveTask} for those that do, and {@link
150  * CountedCompleter} for those in which completed actions trigger
151  * other actions.  Normally, a concrete ForkJoinTask subclass declares
152  * fields comprising its parameters, established in a constructor, and
153  * then defines a {@code compute} method that somehow uses the control
154  * methods supplied by this base class.
155  *
156  * <p>Method {@link #join} and its variants are appropriate for use
157  * only when completion dependencies are acyclic; that is, the
158  * parallel computation can be described as a directed acyclic graph
159  * (DAG). Otherwise, executions may encounter a form of deadlock as
160  * tasks cyclically wait for each other.  However, this framework
161  * supports other methods and techniques (for example the use of
162  * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
163  * may be of use in constructing custom subclasses for problems that
164  * are not statically structured as DAGs. To support such usages, a
165  * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
166  * value using {@link #setForkJoinTaskTag} or {@link
167  * #compareAndSetForkJoinTaskTag} and checked using {@link
168  * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
169  * these {@code protected} methods or tags for any purpose, but they
170  * may be of use in the construction of specialized subclasses.  For
171  * example, parallel graph traversals can use the supplied methods to
172  * avoid revisiting nodes/tasks that have already been processed.
173  * (Method names for tagging are bulky in part to encourage definition
174  * of methods that reflect their usage patterns.)
175  *
176  * <p>Most base support methods are {@code final}, to prevent
177  * overriding of implementations that are intrinsically tied to the
178  * underlying lightweight task scheduling framework.  Developers
179  * creating new basic styles of fork/join processing should minimally
180  * implement {@code protected} methods {@link #exec}, {@link
181  * #setRawResult}, and {@link #getRawResult}, while also introducing
182  * an abstract computational method that can be implemented in its
183  * subclasses, possibly relying on other {@code protected} methods
184  * provided by this class.
185  *
186  * <p>ForkJoinTasks should perform relatively small amounts of
187  * computation. Large tasks should be split into smaller subtasks,
188  * usually via recursive decomposition. As a very rough rule of thumb,
189  * a task should perform more than 100 and less than 10000 basic
190  * computational steps, and should avoid indefinite looping. If tasks
191  * are too big, then parallelism cannot improve throughput. If too
192  * small, then memory and internal task maintenance overhead may
193  * overwhelm processing.
194  *
195  * <p>This class provides {@code adapt} methods for {@link Runnable}
196  * and {@link Callable}, that may be of use when mixing execution of
197  * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
198  * of this form, consider using a pool constructed in <em>asyncMode</em>.
199  *
200  * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
201  * used in extensions such as remote execution frameworks. It is
202  * sensible to serialize tasks only before or after, but not during,
203  * execution. Serialization is not relied on during execution itself.
204  *
205  * @since 1.7
206  * @author Doug Lea
207  */
208 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
209 
210     /*
211      * See the internal documentation of class ForkJoinPool for a
212      * general implementation overview.  ForkJoinTasks are mainly
213      * responsible for maintaining their "status" field amidst relays
214      * to methods in ForkJoinWorkerThread and ForkJoinPool.
215      *
216      * The methods of this class are more-or-less layered into
217      * (1) basic status maintenance
218      * (2) execution and awaiting completion
219      * (3) user-level methods that additionally report results.
220      * This is sometimes hard to see because this file orders exported
221      * methods in a way that flows well in javadocs.
222      *
223      * Revision notes: The use of "Aux" field replaces previous
224      * reliance on a table to hold exceptions and synchronized blocks
225      * and monitors to wait for completion.
226      */
227 
228     /**
229      * Nodes for threads waiting for completion, or holding a thrown
230      * exception (never both). Waiting threads prepend nodes
231      * Treiber-stack-style.  Signallers detach and unpark
232      * waiters. Cancelled waiters try to unsplice.
233      */
234     static final class Aux {
235         final Thread thread;
236         final Throwable ex;  // null if a waiter
237         Aux next;            // accessed only via memory-acquire chains
Aux(Thread thread, Throwable ex)238         Aux(Thread thread, Throwable ex) {
239             this.thread = thread;
240             this.ex = ex;
241         }
casNext(Aux c, Aux v)242         final boolean casNext(Aux c, Aux v) { // used only in cancellation
243             return NEXT.compareAndSet(this, c, v);
244         }
245         private static final VarHandle NEXT;
246         static {
247             try {
248                 NEXT = MethodHandles.lookup()
249                     .findVarHandle(Aux.class, "next", Aux.class);
250             } catch (ReflectiveOperationException e) {
251                 throw new ExceptionInInitializerError(e);
252             }
253         }
254     }
255 
256     /*
257      * The status field holds bits packed into a single int to ensure
258      * atomicity.  Status is initially zero, and takes on nonnegative
259      * values until completed, upon which it holds (sign bit) DONE,
260      * possibly with ABNORMAL (cancelled or exceptional) and THROWN
261      * (in which case an exception has been stored). A value of
262      * ABNORMAL without DONE signifies an interrupted wait.  These
263      * control bits occupy only (some of) the upper half (16 bits) of
264      * status field. The lower bits are used for user-defined tags.
265      */
266     private static final int DONE         = 1 << 31; // must be negative
267     private static final int ABNORMAL     = 1 << 16;
268     private static final int THROWN       = 1 << 17;
269     private static final int SMASK        = 0xffff;  // short bits for tags
270     private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
271 
272     // Fields
273     volatile int status;                // accessed directly by pool and workers
274     private transient volatile Aux aux; // either waiters or thrown Exception
275 
276     // Support for atomic operations
277     private static final VarHandle STATUS;
278     private static final VarHandle AUX;
getAndBitwiseOrStatus(int v)279     private int getAndBitwiseOrStatus(int v) {
280         return (int)STATUS.getAndBitwiseOr(this, v);
281     }
casStatus(int c, int v)282     private boolean casStatus(int c, int v) {
283         return STATUS.compareAndSet(this, c, v);
284     }
casAux(Aux c, Aux v)285     private boolean casAux(Aux c, Aux v) {
286         return AUX.compareAndSet(this, c, v);
287     }
288 
289     /** Removes and unparks waiters */
signalWaiters()290     private void signalWaiters() {
291         for (Aux a; (a = aux) != null && a.ex == null; ) {
292             if (casAux(a, null)) {             // detach entire list
293                 for (Thread t; a != null; a = a.next) {
294                     if ((t = a.thread) != Thread.currentThread() && t != null)
295                         LockSupport.unpark(t); // don't self-signal
296                 }
297                 break;
298             }
299         }
300     }
301 
302     /**
303      * Sets DONE status and wakes up threads waiting to join this task.
304      * @return status on exit
305      */
setDone()306     private int setDone() {
307         int s = getAndBitwiseOrStatus(DONE) | DONE;
308         signalWaiters();
309         return s;
310     }
311 
312     /**
313      * Sets ABNORMAL DONE status unless already done, and wakes up threads
314      * waiting to join this task.
315      * @return status on exit
316      */
trySetCancelled()317     private int trySetCancelled() {
318         int s;
319         do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
320         signalWaiters();
321         return s;
322     }
323 
324     /**
325      * Records exception and sets ABNORMAL THROWN DONE status unless
326      * already done, and wakes up threads waiting to join this task.
327      * If losing a race with setDone or trySetCancelled, the exception
328      * may be recorded but not reported.
329      *
330      * @return status on exit
331      */
trySetThrown(Throwable ex)332     final int trySetThrown(Throwable ex) {
333         Aux h = new Aux(Thread.currentThread(), ex), p = null;
334         boolean installed = false;
335         int s;
336         while ((s = status) >= 0) {
337             Aux a;
338             if (!installed && ((a = aux) == null || a.ex == null) &&
339                 (installed = casAux(a, h)))
340                 p = a; // list of waiters replaced by h
341             if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
342                 break;
343         }
344         for (; p != null; p = p.next)
345             LockSupport.unpark(p.thread);
346         return s;
347     }
348 
349     /**
350      * Records exception unless already done. Overridable in subclasses.
351      *
352      * @return status on exit
353      */
trySetException(Throwable ex)354     int trySetException(Throwable ex) {
355         return trySetThrown(ex);
356     }
357 
358     /**
359      * Constructor for subclasses to call.
360      */
ForkJoinTask()361     public ForkJoinTask() {}
362 
isExceptionalStatus(int s)363     static boolean isExceptionalStatus(int s) {  // needed by subclasses
364         return (s & THROWN) != 0;
365     }
366 
367     /**
368      * Unless done, calls exec and records status if completed, but
369      * doesn't wait for completion otherwise.
370      *
371      * @return status on exit from this method
372      */
doExec()373     final int doExec() {
374         int s; boolean completed;
375         if ((s = status) >= 0) {
376             try {
377                 completed = exec();
378             } catch (Throwable rex) {
379                 s = trySetException(rex);
380                 completed = false;
381             }
382             if (completed)
383                 s = setDone();
384         }
385         return s;
386     }
387 
388     /**
389      * Helps and/or waits for completion from join, get, or invoke;
390      * called from either internal or external threads.
391      *
392      * @param pool if nonnull, known submitted pool, else assumes current pool
393      * @param ran true if task known to have been exec'd
394      * @param interruptible true if park interruptibly when external
395      * @param timed true if use timed wait
396      * @param nanos if timed, timeout value
397      * @return ABNORMAL if interrupted, else status on exit
398      */
awaitDone(ForkJoinPool pool, boolean ran, boolean interruptible, boolean timed, long nanos)399     private int awaitDone(ForkJoinPool pool, boolean ran,
400                           boolean interruptible, boolean timed,
401                           long nanos) {
402         ForkJoinPool p; boolean internal; int s; Thread t;
403         ForkJoinPool.WorkQueue q = null;
404         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
405             ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
406             p = wt.pool;
407             if (pool == null)
408                 pool = p;
409             if (internal = (pool == p))
410                 q = wt.workQueue;
411         }
412         else {
413             internal = false;
414             p = ForkJoinPool.common;
415             if (pool == null)
416                 pool = p;
417             if (pool == p && p != null)
418                 q = p.externalQueue();
419         }
420         if (interruptible && Thread.interrupted())
421             return ABNORMAL;
422         if ((s = status) < 0)
423             return s;
424         long deadline = 0L;
425         if (timed) {
426             if (nanos <= 0L)
427                 return 0;
428             else if ((deadline = nanos + System.nanoTime()) == 0L)
429                 deadline = 1L;
430         }
431         boolean uncompensate = false;
432         if (q != null && p != null) {  // try helping
433             // help even in timed mode if pool has no parallelism
434             boolean canHelp = !timed || (p.mode & SMASK) == 0;
435             if (canHelp) {
436                 if ((this instanceof CountedCompleter) &&
437                     (s = p.helpComplete(this, q, internal)) < 0)
438                     return s;
439                 if (!ran && ((!internal && q.externalTryUnpush(this)) ||
440                              q.tryRemove(this, internal)) && (s = doExec()) < 0)
441                     return s;
442             }
443             if (internal) {
444                 if ((s = p.helpJoin(this, q, canHelp)) < 0)
445                     return s;
446                 if (s == UNCOMPENSATE)
447                     uncompensate = true;
448             }
449         }
450         // block until done or cancelled wait
451         boolean interrupted = false, queued = false;
452         boolean parked = false, fail = false;
453         Aux node = null;
454         while ((s = status) >= 0) {
455             Aux a; long ns;
456             if (fail || (fail = (pool != null && pool.mode < 0)))
457                 casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
458             else if (parked && Thread.interrupted()) {
459                 if (interruptible) {
460                     s = ABNORMAL;
461                     break;
462                 }
463                 interrupted = true;
464             }
465             else if (queued) {
466                 if (deadline != 0L) {
467                     if ((ns = deadline - System.nanoTime()) <= 0L)
468                         break;
469                     LockSupport.parkNanos(ns);
470                 }
471                 else
472                     LockSupport.park();
473                 parked = true;
474             }
475             else if (node != null) {
476                 if ((a = aux) != null && a.ex != null)
477                     Thread.onSpinWait();     // exception in progress
478                 else if (queued = casAux(node.next = a, node))
479                     LockSupport.setCurrentBlocker(this);
480             }
481             else {
482                 try {
483                     node = new Aux(Thread.currentThread(), null);
484                 } catch (Throwable ex) {     // cannot create
485                     fail = true;
486                 }
487             }
488         }
489         if (pool != null && uncompensate)
490             pool.uncompensate();
491 
492         if (queued) {
493             LockSupport.setCurrentBlocker(null);
494             if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
495                 outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
496                     for (Aux trail = null;;) {
497                         Aux next = a.next;
498                         if (a == node) {
499                             if (trail != null)
500                                 trail.casNext(trail, next);
501                             else if (casAux(a, next))
502                                 break outer; // cannot be re-encountered
503                             break;           // restart
504                         } else {
505                             trail = a;
506                             if ((a = next) == null)
507                                 break outer;
508                         }
509                     }
510                 }
511             }
512             else {
513                 signalWaiters();             // help clean or signal
514                 if (interrupted)
515                     Thread.currentThread().interrupt();
516             }
517         }
518         return s;
519     }
520 
521     /**
522      * Cancels, ignoring any exceptions thrown by cancel.  Cancel is
523      * spec'ed not to throw any exceptions, but if it does anyway, we
524      * have no recourse, so guard against this case.
525      */
cancelIgnoringExceptions(Future<?> t)526     static final void cancelIgnoringExceptions(Future<?> t) {
527         if (t != null) {
528             try {
529                 t.cancel(true);
530             } catch (Throwable ignore) {
531             }
532         }
533     }
534 
535     /**
536      * Returns a rethrowable exception for this task, if available.
537      * To provide accurate stack traces, if the exception was not
538      * thrown by the current thread, we try to create a new exception
539      * of the same type as the one thrown, but with the recorded
540      * exception as its cause. If there is no such constructor, we
541      * instead try to use a no-arg constructor, followed by initCause,
542      * to the same effect. If none of these apply, or any fail due to
543      * other exceptions, we return the recorded exception, which is
544      * still correct, although it may contain a misleading stack
545      * trace.
546      *
547      * @return the exception, or null if none
548      */
getThrowableException()549     private Throwable getThrowableException() {
550         Throwable ex; Aux a;
551         if ((a = aux) == null)
552             ex = null;
553         else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
554             try {
555                 Constructor<?> noArgCtor = null, oneArgCtor = null;
556                 for (Constructor<?> c : ex.getClass().getConstructors()) {
557                     Class<?>[] ps = c.getParameterTypes();
558                     if (ps.length == 0)
559                         noArgCtor = c;
560                     else if (ps.length == 1 && ps[0] == Throwable.class) {
561                         oneArgCtor = c;
562                         break;
563                     }
564                 }
565                 if (oneArgCtor != null)
566                     ex = (Throwable)oneArgCtor.newInstance(ex);
567                 else if (noArgCtor != null) {
568                     Throwable rx = (Throwable)noArgCtor.newInstance();
569                     rx.initCause(ex);
570                     ex = rx;
571                 }
572             } catch (Exception ignore) {
573             }
574         }
575         return ex;
576     }
577 
578     /**
579      * Returns exception associated with the given status, or null if none.
580      */
getException(int s)581     private Throwable getException(int s) {
582         Throwable ex = null;
583         if ((s & ABNORMAL) != 0 &&
584             ((s & THROWN) == 0 || (ex = getThrowableException()) == null))
585             ex = new CancellationException();
586         return ex;
587     }
588 
589     /**
590      * Throws exception associated with the given status, or
591      * CancellationException if none recorded.
592      */
reportException(int s)593     private void reportException(int s) {
594         ForkJoinTask.<RuntimeException>uncheckedThrow(
595             (s & THROWN) != 0 ? getThrowableException() : null);
596     }
597 
598     /**
599      * Throws exception for (timed or untimed) get, wrapping if
600      * necessary in an ExecutionException.
601      */
reportExecutionException(int s)602     private void reportExecutionException(int s) {
603         Throwable ex = null;
604         if (s == ABNORMAL)
605             ex = new InterruptedException();
606         else if (s >= 0)
607             ex = new TimeoutException();
608         else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null)
609             ex = new ExecutionException(ex);
610         ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
611     }
612 
613     /**
614      * A version of "sneaky throw" to relay exceptions in other
615      * contexts.
616      */
rethrow(Throwable ex)617     static void rethrow(Throwable ex) {
618         ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
619     }
620 
621     /**
622      * The sneaky part of sneaky throw, relying on generics
623      * limitations to evade compiler complaints about rethrowing
624      * unchecked exceptions. If argument null, throws
625      * CancellationException.
626      */
627     @SuppressWarnings("unchecked") static <T extends Throwable>
uncheckedThrow(Throwable t)628     void uncheckedThrow(Throwable t) throws T {
629         if (t == null)
630             t = new CancellationException();
631         throw (T)t; // rely on vacuous cast
632     }
633 
634     // public methods
635 
636     /**
637      * Arranges to asynchronously execute this task in the pool the
638      * current task is running in, if applicable, or using the {@link
639      * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
640      * it is not necessarily enforced, it is a usage error to fork a
641      * task more than once unless it has completed and been
642      * reinitialized.  Subsequent modifications to the state of this
643      * task or any data it operates on are not necessarily
644      * consistently observable by any thread other than the one
645      * executing it unless preceded by a call to {@link #join} or
646      * related methods, or a call to {@link #isDone} returning {@code
647      * true}.
648      *
649      * @return {@code this}, to simplify usage
650      */
fork()651     public final ForkJoinTask<V> fork() {
652         Thread t; ForkJoinWorkerThread w;
653         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
654             (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
655         else
656             ForkJoinPool.common.externalPush(this);
657         return this;
658     }
659 
660     /**
661      * Returns the result of the computation when it
662      * {@linkplain #isDone is done}.
663      * This method differs from {@link #get()} in that abnormal
664      * completion results in {@code RuntimeException} or {@code Error},
665      * not {@code ExecutionException}, and that interrupts of the
666      * calling thread do <em>not</em> cause the method to abruptly
667      * return by throwing {@code InterruptedException}.
668      *
669      * @return the computed result
670      */
join()671     public final V join() {
672         int s;
673         if ((s = status) >= 0)
674             s = awaitDone(null, false, false, false, 0L);
675         if ((s & ABNORMAL) != 0)
676             reportException(s);
677         return getRawResult();
678     }
679 
680     /**
681      * Commences performing this task, awaits its completion if
682      * necessary, and returns its result, or throws an (unchecked)
683      * {@code RuntimeException} or {@code Error} if the underlying
684      * computation did so.
685      *
686      * @return the computed result
687      */
invoke()688     public final V invoke() {
689         int s;
690         if ((s = doExec()) >= 0)
691             s = awaitDone(null, true, false, false, 0L);
692         if ((s & ABNORMAL) != 0)
693             reportException(s);
694         return getRawResult();
695     }
696 
697     /**
698      * Forks the given tasks, returning when {@code isDone} holds for
699      * each task or an (unchecked) exception is encountered, in which
700      * case the exception is rethrown. If more than one task
701      * encounters an exception, then this method throws any one of
702      * these exceptions. If any task encounters an exception, the
703      * other may be cancelled. However, the execution status of
704      * individual tasks is not guaranteed upon exceptional return. The
705      * status of each task may be obtained using {@link
706      * #getException()} and related methods to check if they have been
707      * cancelled, completed normally or exceptionally, or left
708      * unprocessed.
709      *
710      * @param t1 the first task
711      * @param t2 the second task
712      * @throws NullPointerException if any task is null
713      */
invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)714     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
715         int s1, s2;
716         if (t1 == null || t2 == null)
717             throw new NullPointerException();
718         t2.fork();
719         if ((s1 = t1.doExec()) >= 0)
720             s1 = t1.awaitDone(null, true, false, false, 0L);
721         if ((s1 & ABNORMAL) != 0) {
722             cancelIgnoringExceptions(t2);
723             t1.reportException(s1);
724         }
725         else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
726             t2.reportException(s2);
727     }
728 
729     /**
730      * Forks the given tasks, returning when {@code isDone} holds for
731      * each task or an (unchecked) exception is encountered, in which
732      * case the exception is rethrown. If more than one task
733      * encounters an exception, then this method throws any one of
734      * these exceptions. If any task encounters an exception, others
735      * may be cancelled. However, the execution status of individual
736      * tasks is not guaranteed upon exceptional return. The status of
737      * each task may be obtained using {@link #getException()} and
738      * related methods to check if they have been cancelled, completed
739      * normally or exceptionally, or left unprocessed.
740      *
741      * @param tasks the tasks
742      * @throws NullPointerException if any task is null
743      */
invokeAll(ForkJoinTask<?>.... tasks)744     public static void invokeAll(ForkJoinTask<?>... tasks) {
745         Throwable ex = null;
746         int last = tasks.length - 1;
747         for (int i = last; i >= 0; --i) {
748             ForkJoinTask<?> t;
749             if ((t = tasks[i]) == null) {
750                 ex = new NullPointerException();
751                 break;
752             }
753             if (i == 0) {
754                 int s;
755                 if ((s = t.doExec()) >= 0)
756                     s = t.awaitDone(null, true, false, false, 0L);
757                 if ((s & ABNORMAL) != 0)
758                     ex = t.getException(s);
759                 break;
760             }
761             t.fork();
762         }
763         if (ex == null) {
764             for (int i = 1; i <= last; ++i) {
765                 ForkJoinTask<?> t;
766                 if ((t = tasks[i]) != null) {
767                     int s;
768                     if ((s = t.status) >= 0)
769                         s = t.awaitDone(null, false, false, false, 0L);
770                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
771                         break;
772                 }
773             }
774         }
775         if (ex != null) {
776             for (int i = 1; i <= last; ++i)
777                 cancelIgnoringExceptions(tasks[i]);
778             rethrow(ex);
779         }
780     }
781 
782     /**
783      * Forks all tasks in the specified collection, returning when
784      * {@code isDone} holds for each task or an (unchecked) exception
785      * is encountered, in which case the exception is rethrown. If
786      * more than one task encounters an exception, then this method
787      * throws any one of these exceptions. If any task encounters an
788      * exception, others may be cancelled. However, the execution
789      * status of individual tasks is not guaranteed upon exceptional
790      * return. The status of each task may be obtained using {@link
791      * #getException()} and related methods to check if they have been
792      * cancelled, completed normally or exceptionally, or left
793      * unprocessed.
794      *
795      * @param tasks the collection of tasks
796      * @param <T> the type of the values returned from the tasks
797      * @return the tasks argument, to simplify usage
798      * @throws NullPointerException if tasks or any element are null
799      */
invokeAll(Collection<T> tasks)800     public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
801         if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
802             invokeAll(tasks.toArray(new ForkJoinTask<?>[0]));
803             return tasks;
804         }
805         @SuppressWarnings("unchecked")
806         List<? extends ForkJoinTask<?>> ts =
807             (List<? extends ForkJoinTask<?>>) tasks;
808         Throwable ex = null;
809         int last = ts.size() - 1;  // nearly same as array version
810         for (int i = last; i >= 0; --i) {
811             ForkJoinTask<?> t;
812             if ((t = ts.get(i)) == null) {
813                 ex = new NullPointerException();
814                 break;
815             }
816             if (i == 0) {
817                 int s;
818                 if ((s = t.doExec()) >= 0)
819                     s = t.awaitDone(null, true, false, false, 0L);
820                 if ((s & ABNORMAL) != 0)
821                     ex = t.getException(s);
822                 break;
823             }
824             t.fork();
825         }
826         if (ex == null) {
827             for (int i = 1; i <= last; ++i) {
828                 ForkJoinTask<?> t;
829                 if ((t = ts.get(i)) != null) {
830                     int s;
831                     if ((s = t.status) >= 0)
832                         s = t.awaitDone(null, false, false, false, 0L);
833                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
834                         break;
835                 }
836             }
837         }
838         if (ex != null) {
839             for (int i = 1; i <= last; ++i)
840                 cancelIgnoringExceptions(ts.get(i));
841             rethrow(ex);
842         }
843         return tasks;
844     }
845 
846     /**
847      * Attempts to cancel execution of this task. This attempt will
848      * fail if the task has already completed or could not be
849      * cancelled for some other reason. If successful, and this task
850      * has not started when {@code cancel} is called, execution of
851      * this task is suppressed. After this method returns
852      * successfully, unless there is an intervening call to {@link
853      * #reinitialize}, subsequent calls to {@link #isCancelled},
854      * {@link #isDone}, and {@code cancel} will return {@code true}
855      * and calls to {@link #join} and related methods will result in
856      * {@code CancellationException}.
857      *
858      * <p>This method may be overridden in subclasses, but if so, must
859      * still ensure that these properties hold. In particular, the
860      * {@code cancel} method itself must not throw exceptions.
861      *
862      * <p>This method is designed to be invoked by <em>other</em>
863      * tasks. To terminate the current task, you can just return or
864      * throw an unchecked exception from its computation method, or
865      * invoke {@link #completeExceptionally(Throwable)}.
866      *
867      * @param mayInterruptIfRunning this value has no effect in the
868      * default implementation because interrupts are not used to
869      * control cancellation.
870      *
871      * @return {@code true} if this task is now cancelled
872      */
cancel(boolean mayInterruptIfRunning)873     public boolean cancel(boolean mayInterruptIfRunning) {
874         return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
875     }
876 
isDone()877     public final boolean isDone() {
878         return status < 0;
879     }
880 
isCancelled()881     public final boolean isCancelled() {
882         return (status & (ABNORMAL | THROWN)) == ABNORMAL;
883     }
884 
885     /**
886      * Returns {@code true} if this task threw an exception or was cancelled.
887      *
888      * @return {@code true} if this task threw an exception or was cancelled
889      */
isCompletedAbnormally()890     public final boolean isCompletedAbnormally() {
891         return (status & ABNORMAL) != 0;
892     }
893 
894     /**
895      * Returns {@code true} if this task completed without throwing an
896      * exception and was not cancelled.
897      *
898      * @return {@code true} if this task completed without throwing an
899      * exception and was not cancelled
900      */
isCompletedNormally()901     public final boolean isCompletedNormally() {
902         return (status & (DONE | ABNORMAL)) == DONE;
903     }
904 
905     /**
906      * Returns the exception thrown by the base computation, or a
907      * {@code CancellationException} if cancelled, or {@code null} if
908      * none or if the method has not yet completed.
909      *
910      * @return the exception, or {@code null} if none
911      */
getException()912     public final Throwable getException() {
913         return getException(status);
914     }
915 
916     /**
917      * Completes this task abnormally, and if not already aborted or
918      * cancelled, causes it to throw the given exception upon
919      * {@code join} and related operations. This method may be used
920      * to induce exceptions in asynchronous tasks, or to force
921      * completion of tasks that would not otherwise complete.  Its use
922      * in other situations is discouraged.  This method is
923      * overridable, but overridden versions must invoke {@code super}
924      * implementation to maintain guarantees.
925      *
926      * @param ex the exception to throw. If this exception is not a
927      * {@code RuntimeException} or {@code Error}, the actual exception
928      * thrown will be a {@code RuntimeException} with cause {@code ex}.
929      */
completeExceptionally(Throwable ex)930     public void completeExceptionally(Throwable ex) {
931         trySetException((ex instanceof RuntimeException) ||
932                         (ex instanceof Error) ? ex :
933                         new RuntimeException(ex));
934     }
935 
936     /**
937      * Completes this task, and if not already aborted or cancelled,
938      * returning the given value as the result of subsequent
939      * invocations of {@code join} and related operations. This method
940      * may be used to provide results for asynchronous tasks, or to
941      * provide alternative handling for tasks that would not otherwise
942      * complete normally. Its use in other situations is
943      * discouraged. This method is overridable, but overridden
944      * versions must invoke {@code super} implementation to maintain
945      * guarantees.
946      *
947      * @param value the result value for this task
948      */
complete(V value)949     public void complete(V value) {
950         try {
951             setRawResult(value);
952         } catch (Throwable rex) {
953             trySetException(rex);
954             return;
955         }
956         setDone();
957     }
958 
959     /**
960      * Completes this task normally without setting a value. The most
961      * recent value established by {@link #setRawResult} (or {@code
962      * null} by default) will be returned as the result of subsequent
963      * invocations of {@code join} and related operations.
964      *
965      * @since 1.8
966      */
quietlyComplete()967     public final void quietlyComplete() {
968         setDone();
969     }
970 
971     /**
972      * Waits if necessary for the computation to complete, and then
973      * retrieves its result.
974      *
975      * @return the computed result
976      * @throws CancellationException if the computation was cancelled
977      * @throws ExecutionException if the computation threw an
978      * exception
979      * @throws InterruptedException if the current thread is not a
980      * member of a ForkJoinPool and was interrupted while waiting
981      */
get()982     public final V get() throws InterruptedException, ExecutionException {
983         int s = awaitDone(null, false, true, false, 0L);
984         if ((s & ABNORMAL) != 0)
985             reportExecutionException(s);
986         return getRawResult();
987     }
988 
989     /**
990      * Waits if necessary for at most the given time for the computation
991      * to complete, and then retrieves its result, if available.
992      *
993      * @param timeout the maximum time to wait
994      * @param unit the time unit of the timeout argument
995      * @return the computed result
996      * @throws CancellationException if the computation was cancelled
997      * @throws ExecutionException if the computation threw an
998      * exception
999      * @throws InterruptedException if the current thread is not a
1000      * member of a ForkJoinPool and was interrupted while waiting
1001      * @throws TimeoutException if the wait timed out
1002      */
get(long timeout, TimeUnit unit)1003     public final V get(long timeout, TimeUnit unit)
1004         throws InterruptedException, ExecutionException, TimeoutException {
1005         long nanos = unit.toNanos(timeout);
1006         int s = awaitDone(null, false, true, true, nanos);
1007         if (s >= 0 || (s & ABNORMAL) != 0)
1008             reportExecutionException(s);
1009         return getRawResult();
1010     }
1011 
1012     /**
1013      * Joins this task, without returning its result or throwing its
1014      * exception. This method may be useful when processing
1015      * collections of tasks when some have been cancelled or otherwise
1016      * known to have aborted.
1017      */
quietlyJoin()1018     public final void quietlyJoin() {
1019         if (status >= 0)
1020             awaitDone(null, false, false, false, 0L);
1021     }
1022 
1023 
1024     /**
1025      * Commences performing this task and awaits its completion if
1026      * necessary, without returning its result or throwing its
1027      * exception.
1028      */
quietlyInvoke()1029     public final void quietlyInvoke() {
1030         if (doExec() >= 0)
1031             awaitDone(null, true, false, false, 0L);
1032     }
1033 
1034     // Versions of join/get for pool.invoke* methods that use external,
1035     // possibly-non-commonPool submits
1036 
awaitPoolInvoke(ForkJoinPool pool)1037     final void awaitPoolInvoke(ForkJoinPool pool) {
1038         awaitDone(pool, false, false, false, 0L);
1039     }
awaitPoolInvoke(ForkJoinPool pool, long nanos)1040     final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
1041         awaitDone(pool, false, true, true, nanos);
1042     }
joinForPoolInvoke(ForkJoinPool pool)1043     final V joinForPoolInvoke(ForkJoinPool pool) {
1044         int s = awaitDone(pool, false, false, false, 0L);
1045         if ((s & ABNORMAL) != 0)
1046             reportException(s);
1047         return getRawResult();
1048     }
getForPoolInvoke(ForkJoinPool pool)1049     final V getForPoolInvoke(ForkJoinPool pool)
1050         throws InterruptedException, ExecutionException {
1051         int s = awaitDone(pool, false, true, false, 0L);
1052         if ((s & ABNORMAL) != 0)
1053             reportExecutionException(s);
1054         return getRawResult();
1055     }
getForPoolInvoke(ForkJoinPool pool, long nanos)1056     final V getForPoolInvoke(ForkJoinPool pool, long nanos)
1057         throws InterruptedException, ExecutionException, TimeoutException {
1058         int s = awaitDone(pool, false, true, true, nanos);
1059         if (s >= 0 || (s & ABNORMAL) != 0)
1060             reportExecutionException(s);
1061         return getRawResult();
1062     }
1063 
1064     /**
1065      * Possibly executes tasks until the pool hosting the current task
1066      * {@linkplain ForkJoinPool#isQuiescent is quiescent}.  This
1067      * method may be of use in designs in which many tasks are forked,
1068      * but none are explicitly joined, instead executing them until
1069      * all are processed.
1070      */
helpQuiesce()1071     public static void helpQuiesce() {
1072         Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1073         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1074             (p = (w = (ForkJoinWorkerThread)t).pool) != null)
1075             p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
1076         else
1077             ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false);
1078     }
1079 
1080     /**
1081      * Resets the internal bookkeeping state of this task, allowing a
1082      * subsequent {@code fork}. This method allows repeated reuse of
1083      * this task, but only if reuse occurs when this task has either
1084      * never been forked, or has been forked, then completed and all
1085      * outstanding joins of this task have also completed. Effects
1086      * under any other usage conditions are not guaranteed.
1087      * This method may be useful when executing
1088      * pre-constructed trees of subtasks in loops.
1089      *
1090      * <p>Upon completion of this method, {@code isDone()} reports
1091      * {@code false}, and {@code getException()} reports {@code
1092      * null}. However, the value returned by {@code getRawResult} is
1093      * unaffected. To clear this value, you can invoke {@code
1094      * setRawResult(null)}.
1095      */
reinitialize()1096     public void reinitialize() {
1097         aux = null;
1098         status = 0;
1099     }
1100 
1101     /**
1102      * Returns the pool hosting the current thread, or {@code null}
1103      * if the current thread is executing outside of any ForkJoinPool.
1104      *
1105      * <p>This method returns {@code null} if and only if {@link
1106      * #inForkJoinPool} returns {@code false}.
1107      *
1108      * @return the pool, or {@code null} if none
1109      */
getPool()1110     public static ForkJoinPool getPool() {
1111         Thread t;
1112         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1113                 ((ForkJoinWorkerThread) t).pool : null);
1114     }
1115 
1116     /**
1117      * Returns {@code true} if the current thread is a {@link
1118      * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
1119      *
1120      * @return {@code true} if the current thread is a {@link
1121      * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
1122      * or {@code false} otherwise
1123      */
inForkJoinPool()1124     public static boolean inForkJoinPool() {
1125         return Thread.currentThread() instanceof ForkJoinWorkerThread;
1126     }
1127 
1128     /**
1129      * Tries to unschedule this task for execution. This method will
1130      * typically (but is not guaranteed to) succeed if this task is
1131      * the most recently forked task by the current thread, and has
1132      * not commenced executing in another thread.  This method may be
1133      * useful when arranging alternative local processing of tasks
1134      * that could have been, but were not, stolen.
1135      *
1136      * @return {@code true} if unforked
1137      */
tryUnfork()1138     public boolean tryUnfork() {
1139         Thread t; ForkJoinPool.WorkQueue q;
1140         return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1141             ? (q = ((ForkJoinWorkerThread)t).workQueue) != null
1142                && q.tryUnpush(this)
1143             : (q = ForkJoinPool.commonQueue()) != null
1144                && q.externalTryUnpush(this);
1145     }
1146 
1147     /**
1148      * Returns an estimate of the number of tasks that have been
1149      * forked by the current worker thread but not yet executed. This
1150      * value may be useful for heuristic decisions about whether to
1151      * fork other tasks.
1152      *
1153      * @return the number of tasks
1154      */
getQueuedTaskCount()1155     public static int getQueuedTaskCount() {
1156         Thread t; ForkJoinPool.WorkQueue q;
1157         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1158             q = ((ForkJoinWorkerThread)t).workQueue;
1159         else
1160             q = ForkJoinPool.commonQueue();
1161         return (q == null) ? 0 : q.queueSize();
1162     }
1163 
1164     /**
1165      * Returns an estimate of how many more locally queued tasks are
1166      * held by the current worker thread than there are other worker
1167      * threads that might steal them, or zero if this thread is not
1168      * operating in a ForkJoinPool. This value may be useful for
1169      * heuristic decisions about whether to fork other tasks. In many
1170      * usages of ForkJoinTasks, at steady state, each worker should
1171      * aim to maintain a small constant surplus (for example, 3) of
1172      * tasks, and to process computations locally if this threshold is
1173      * exceeded.
1174      *
1175      * @return the surplus number of tasks, which may be negative
1176      */
getSurplusQueuedTaskCount()1177     public static int getSurplusQueuedTaskCount() {
1178         return ForkJoinPool.getSurplusQueuedTaskCount();
1179     }
1180 
1181     // Extension methods
1182 
1183     /**
1184      * Returns the result that would be returned by {@link #join}, even
1185      * if this task completed abnormally, or {@code null} if this task
1186      * is not known to have been completed.  This method is designed
1187      * to aid debugging, as well as to support extensions. Its use in
1188      * any other context is discouraged.
1189      *
1190      * @return the result, or {@code null} if not completed
1191      */
getRawResult()1192     public abstract V getRawResult();
1193 
1194     /**
1195      * Forces the given value to be returned as a result.  This method
1196      * is designed to support extensions, and should not in general be
1197      * called otherwise.
1198      *
1199      * @param value the value
1200      */
setRawResult(V value)1201     protected abstract void setRawResult(V value);
1202 
1203     /**
1204      * Immediately performs the base action of this task and returns
1205      * true if, upon return from this method, this task is guaranteed
1206      * to have completed. This method may return false otherwise, to
1207      * indicate that this task is not necessarily complete (or is not
1208      * known to be complete), for example in asynchronous actions that
1209      * require explicit invocations of completion methods. This method
1210      * may also throw an (unchecked) exception to indicate abnormal
1211      * exit. This method is designed to support extensions, and should
1212      * not in general be called otherwise.
1213      *
1214      * @return {@code true} if this task is known to have completed normally
1215      */
exec()1216     protected abstract boolean exec();
1217 
1218     /**
1219      * Returns, but does not unschedule or execute, a task queued by
1220      * the current thread but not yet executed, if one is immediately
1221      * available. There is no guarantee that this task will actually
1222      * be polled or executed next. Conversely, this method may return
1223      * null even if a task exists but cannot be accessed without
1224      * contention with other threads.  This method is designed
1225      * primarily to support extensions, and is unlikely to be useful
1226      * otherwise.
1227      *
1228      * @return the next task, or {@code null} if none are available
1229      */
peekNextLocalTask()1230     protected static ForkJoinTask<?> peekNextLocalTask() {
1231         Thread t; ForkJoinPool.WorkQueue q;
1232         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1233             q = ((ForkJoinWorkerThread)t).workQueue;
1234         else
1235             q = ForkJoinPool.commonQueue();
1236         return (q == null) ? null : q.peek();
1237     }
1238 
1239     /**
1240      * Unschedules and returns, without executing, the next task
1241      * queued by the current thread but not yet executed, if the
1242      * current thread is operating in a ForkJoinPool.  This method is
1243      * designed primarily to support extensions, and is unlikely to be
1244      * useful otherwise.
1245      *
1246      * @return the next task, or {@code null} if none are available
1247      */
pollNextLocalTask()1248     protected static ForkJoinTask<?> pollNextLocalTask() {
1249         Thread t;
1250         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1251                 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null);
1252     }
1253 
1254     /**
1255      * If the current thread is operating in a ForkJoinPool,
1256      * unschedules and returns, without executing, the next task
1257      * queued by the current thread but not yet executed, if one is
1258      * available, or if not available, a task that was forked by some
1259      * other thread, if available. Availability may be transient, so a
1260      * {@code null} result does not necessarily imply quiescence of
1261      * the pool this task is operating in.  This method is designed
1262      * primarily to support extensions, and is unlikely to be useful
1263      * otherwise.
1264      *
1265      * @return a task, or {@code null} if none are available
1266      */
pollTask()1267     protected static ForkJoinTask<?> pollTask() {
1268         Thread t; ForkJoinWorkerThread w;
1269         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1270                 (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) :
1271                 null);
1272     }
1273 
1274     /**
1275      * If the current thread is operating in a ForkJoinPool,
1276      * unschedules and returns, without executing, a task externally
1277      * submitted to the pool, if one is available. Availability may be
1278      * transient, so a {@code null} result does not necessarily imply
1279      * quiescence of the pool.  This method is designed primarily to
1280      * support extensions, and is unlikely to be useful otherwise.
1281      *
1282      * @return a task, or {@code null} if none are available
1283      * @since 9
1284      * @hide API from OpenJDK 9, not yet exposed on Android.
1285      */
pollSubmission()1286     protected static ForkJoinTask<?> pollSubmission() {
1287         Thread t;
1288         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1289                 ((ForkJoinWorkerThread)t).pool.pollSubmission() : null);
1290     }
1291 
1292     // tag operations
1293 
1294     /**
1295      * Returns the tag for this task.
1296      *
1297      * @return the tag for this task
1298      * @since 1.8
1299      */
getForkJoinTaskTag()1300     public final short getForkJoinTaskTag() {
1301         return (short)status;
1302     }
1303 
1304     /**
1305      * Atomically sets the tag value for this task and returns the old value.
1306      *
1307      * @param newValue the new tag value
1308      * @return the previous value of the tag
1309      * @since 1.8
1310      */
setForkJoinTaskTag(short newValue)1311     public final short setForkJoinTaskTag(short newValue) {
1312         for (int s;;) {
1313             if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK)))
1314                 return (short)s;
1315         }
1316     }
1317 
1318     /**
1319      * Atomically conditionally sets the tag value for this task.
1320      * Among other applications, tags can be used as visit markers
1321      * in tasks operating on graphs, as in methods that check: {@code
1322      * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
1323      * before processing, otherwise exiting because the node has
1324      * already been visited.
1325      *
1326      * @param expect the expected tag value
1327      * @param update the new tag value
1328      * @return {@code true} if successful; i.e., the current value was
1329      * equal to {@code expect} and was changed to {@code update}.
1330      * @since 1.8
1331      */
compareAndSetForkJoinTaskTag(short expect, short update)1332     public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
1333         for (int s;;) {
1334             if ((short)(s = status) != expect)
1335                 return false;
1336             if (casStatus(s, (s & ~SMASK) | (update & SMASK)))
1337                 return true;
1338         }
1339     }
1340 
1341     /**
1342      * Adapter for Runnables. This implements RunnableFuture
1343      * to be compliant with AbstractExecutorService constraints
1344      * when used in ForkJoinPool.
1345      */
1346     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1347         implements RunnableFuture<T> {
1348         @SuppressWarnings("serial") // Conditionally serializable
1349         final Runnable runnable;
1350         @SuppressWarnings("serial") // Conditionally serializable
1351         T result;
AdaptedRunnable(Runnable runnable, T result)1352         AdaptedRunnable(Runnable runnable, T result) {
1353             if (runnable == null) throw new NullPointerException();
1354             this.runnable = runnable;
1355             this.result = result; // OK to set this even before completion
1356         }
getRawResult()1357         public final T getRawResult() { return result; }
setRawResult(T v)1358         public final void setRawResult(T v) { result = v; }
exec()1359         public final boolean exec() { runnable.run(); return true; }
run()1360         public final void run() { invoke(); }
toString()1361         public String toString() {
1362             return super.toString() + "[Wrapped task = " + runnable + "]";
1363         }
1364         private static final long serialVersionUID = 5232453952276885070L;
1365     }
1366 
1367     /**
1368      * Adapter for Runnables without results.
1369      */
1370     static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1371         implements RunnableFuture<Void> {
1372         @SuppressWarnings("serial") // Conditionally serializable
1373         final Runnable runnable;
AdaptedRunnableAction(Runnable runnable)1374         AdaptedRunnableAction(Runnable runnable) {
1375             if (runnable == null) throw new NullPointerException();
1376             this.runnable = runnable;
1377         }
getRawResult()1378         public final Void getRawResult() { return null; }
setRawResult(Void v)1379         public final void setRawResult(Void v) { }
exec()1380         public final boolean exec() { runnable.run(); return true; }
run()1381         public final void run() { invoke(); }
toString()1382         public String toString() {
1383             return super.toString() + "[Wrapped task = " + runnable + "]";
1384         }
1385         private static final long serialVersionUID = 5232453952276885070L;
1386     }
1387 
1388     /**
1389      * Adapter for Runnables in which failure forces worker exception.
1390      */
1391     static final class RunnableExecuteAction extends ForkJoinTask<Void> {
1392         @SuppressWarnings("serial") // Conditionally serializable
1393         final Runnable runnable;
RunnableExecuteAction(Runnable runnable)1394         RunnableExecuteAction(Runnable runnable) {
1395             if (runnable == null) throw new NullPointerException();
1396             this.runnable = runnable;
1397         }
getRawResult()1398         public final Void getRawResult() { return null; }
setRawResult(Void v)1399         public final void setRawResult(Void v) { }
exec()1400         public final boolean exec() { runnable.run(); return true; }
trySetException(Throwable ex)1401         int trySetException(Throwable ex) { // if a handler, invoke it
1402             int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h;
1403             if (isExceptionalStatus(s = trySetThrown(ex)) &&
1404                 (h = ((t = Thread.currentThread()).
1405                       getUncaughtExceptionHandler())) != null) {
1406                 try {
1407                     h.uncaughtException(t, ex);
1408                 } catch (Throwable ignore) {
1409                 }
1410             }
1411             return s;
1412         }
1413         private static final long serialVersionUID = 5232453952276885070L;
1414     }
1415 
1416     /**
1417      * Adapter for Callables.
1418      */
1419     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1420         implements RunnableFuture<T> {
1421         @SuppressWarnings("serial") // Conditionally serializable
1422         final Callable<? extends T> callable;
1423         @SuppressWarnings("serial") // Conditionally serializable
1424         T result;
AdaptedCallable(Callable<? extends T> callable)1425         AdaptedCallable(Callable<? extends T> callable) {
1426             if (callable == null) throw new NullPointerException();
1427             this.callable = callable;
1428         }
getRawResult()1429         public final T getRawResult() { return result; }
setRawResult(T v)1430         public final void setRawResult(T v) { result = v; }
exec()1431         public final boolean exec() {
1432             try {
1433                 result = callable.call();
1434                 return true;
1435             } catch (RuntimeException rex) {
1436                 throw rex;
1437             } catch (Exception ex) {
1438                 throw new RuntimeException(ex);
1439             }
1440         }
run()1441         public final void run() { invoke(); }
toString()1442         public String toString() {
1443             return super.toString() + "[Wrapped task = " + callable + "]";
1444         }
1445         private static final long serialVersionUID = 2838392045355241008L;
1446     }
1447 
1448     static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
1449         implements RunnableFuture<T> {
1450         @SuppressWarnings("serial") // Conditionally serializable
1451         final Callable<? extends T> callable;
1452         @SuppressWarnings("serial") // Conditionally serializable
1453         transient volatile Thread runner;
1454         T result;
AdaptedInterruptibleCallable(Callable<? extends T> callable)1455         AdaptedInterruptibleCallable(Callable<? extends T> callable) {
1456             if (callable == null) throw new NullPointerException();
1457             this.callable = callable;
1458         }
getRawResult()1459         public final T getRawResult() { return result; }
setRawResult(T v)1460         public final void setRawResult(T v) { result = v; }
exec()1461         public final boolean exec() {
1462             Thread.interrupted();
1463             runner = Thread.currentThread();
1464             try {
1465                 if (!isDone()) // recheck
1466                     result = callable.call();
1467                 return true;
1468             } catch (RuntimeException rex) {
1469                 throw rex;
1470             } catch (Exception ex) {
1471                 throw new RuntimeException(ex);
1472             } finally {
1473                 runner = null;
1474                 Thread.interrupted();
1475             }
1476         }
run()1477         public final void run() { invoke(); }
cancel(boolean mayInterruptIfRunning)1478         public final boolean cancel(boolean mayInterruptIfRunning) {
1479             Thread t;
1480             boolean stat = super.cancel(false);
1481             if (mayInterruptIfRunning && (t = runner) != null) {
1482                 try {
1483                     t.interrupt();
1484                 } catch (Throwable ignore) {
1485                 }
1486             }
1487             return stat;
1488         }
toString()1489         public String toString() {
1490             return super.toString() + "[Wrapped task = " + callable + "]";
1491         }
1492         private static final long serialVersionUID = 2838392045355241008L;
1493     }
1494 
1495     /**
1496      * Returns a new {@code ForkJoinTask} that performs the {@code run}
1497      * method of the given {@code Runnable} as its action, and returns
1498      * a null result upon {@link #join}.
1499      *
1500      * @param runnable the runnable action
1501      * @return the task
1502      */
adapt(Runnable runnable)1503     public static ForkJoinTask<?> adapt(Runnable runnable) {
1504         return new AdaptedRunnableAction(runnable);
1505     }
1506 
1507     /**
1508      * Returns a new {@code ForkJoinTask} that performs the {@code run}
1509      * method of the given {@code Runnable} as its action, and returns
1510      * the given result upon {@link #join}.
1511      *
1512      * @param runnable the runnable action
1513      * @param result the result upon completion
1514      * @param <T> the type of the result
1515      * @return the task
1516      */
adapt(Runnable runnable, T result)1517     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1518         return new AdaptedRunnable<T>(runnable, result);
1519     }
1520 
1521     /**
1522      * Returns a new {@code ForkJoinTask} that performs the {@code call}
1523      * method of the given {@code Callable} as its action, and returns
1524      * its result upon {@link #join}, translating any checked exceptions
1525      * encountered into {@code RuntimeException}.
1526      *
1527      * @param callable the callable action
1528      * @param <T> the type of the callable's result
1529      * @return the task
1530      */
adapt(Callable<? extends T> callable)1531     public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1532         return new AdaptedCallable<T>(callable);
1533     }
1534 
1535     /**
1536      * Returns a new {@code ForkJoinTask} that performs the {@code call}
1537      * method of the given {@code Callable} as its action, and returns
1538      * its result upon {@link #join}, translating any checked exceptions
1539      * encountered into {@code RuntimeException}.  Additionally,
1540      * invocations of {@code cancel} with {@code mayInterruptIfRunning
1541      * true} will attempt to interrupt the thread performing the task.
1542      *
1543      * @param callable the callable action
1544      * @param <T> the type of the callable's result
1545      * @return the task
1546      *
1547      * @since 17
1548      */
1549     // adaptInterruptible deferred to its own independent change
1550     // https://bugs.openjdk.java.net/browse/JDK-8246587
adaptInterruptible(Callable<? extends T> callable)1551     /* TODO: public */ private static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1552         return new AdaptedInterruptibleCallable<T>(callable);
1553     }
1554 
1555     // Serialization support
1556 
1557     private static final long serialVersionUID = -7721805057305804111L;
1558 
1559     /**
1560      * Saves this task to a stream (that is, serializes it).
1561      *
1562      * @param s the stream
1563      * @throws java.io.IOException if an I/O error occurs
1564      * @serialData the current run status and the exception thrown
1565      * during execution, or {@code null} if none
1566      */
writeObject(java.io.ObjectOutputStream s)1567     private void writeObject(java.io.ObjectOutputStream s)
1568         throws java.io.IOException {
1569         Aux a;
1570         s.defaultWriteObject();
1571         s.writeObject((a = aux) == null ? null : a.ex);
1572     }
1573 
1574     /**
1575      * Reconstitutes this task from a stream (that is, deserializes it).
1576      * @param s the stream
1577      * @throws ClassNotFoundException if the class of a serialized object
1578      *         could not be found
1579      * @throws java.io.IOException if an I/O error occurs
1580      */
readObject(java.io.ObjectInputStream s)1581     private void readObject(java.io.ObjectInputStream s)
1582         throws java.io.IOException, ClassNotFoundException {
1583         s.defaultReadObject();
1584         Object ex = s.readObject();
1585         if (ex != null)
1586             trySetThrown((Throwable)ex);
1587     }
1588 
1589     static {
1590         try {
1591             MethodHandles.Lookup l = MethodHandles.lookup();
1592             STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class);
1593             AUX = l.findVarHandle(ForkJoinTask.class, "aux", Aux.class);
1594         } catch (ReflectiveOperationException e) {
1595             throw new ExceptionInInitializerError(e);
1596         }
1597     }
1598 
1599 }
1600