1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.lang.Thread.UncaughtExceptionHandler; 10 import java.util.ArrayList; 11 import java.util.Arrays; 12 import java.util.Collection; 13 import java.util.Collections; 14 import java.util.List; 15 import java.util.concurrent.AbstractExecutorService; 16 import java.util.concurrent.Callable; 17 import java.util.concurrent.ExecutorService; 18 import java.util.concurrent.Future; 19 import java.util.concurrent.RejectedExecutionException; 20 import java.util.concurrent.RunnableFuture; 21 import java.util.concurrent.ThreadLocalRandom; 22 import java.util.concurrent.TimeUnit; 23 24 /** 25 * An {@link ExecutorService} for running {@link ForkJoinTask}s. 26 * A {@code ForkJoinPool} provides the entry point for submissions 27 * from non-{@code ForkJoinTask} clients, as well as management and 28 * monitoring operations. 29 * 30 * <p>A {@code ForkJoinPool} differs from other kinds of {@link 31 * ExecutorService} mainly by virtue of employing 32 * <em>work-stealing</em>: all threads in the pool attempt to find and 33 * execute tasks submitted to the pool and/or created by other active 34 * tasks (eventually blocking waiting for work if none exist). This 35 * enables efficient processing when most tasks spawn other subtasks 36 * (as do most {@code ForkJoinTask}s), as well as when many small 37 * tasks are submitted to the pool from external clients. Especially 38 * when setting <em>asyncMode</em> to true in constructors, {@code 39 * ForkJoinPool}s may also be appropriate for use with event-style 40 * tasks that are never joined. 41 * 42 * <p>A static {@code commonPool()} is available and appropriate for 43 * most applications. The common pool is used by any ForkJoinTask that 44 * is not explicitly submitted to a specified pool. 
Using the common 45 * pool normally reduces resource usage (its threads are slowly 46 * reclaimed during periods of non-use, and reinstated upon subsequent 47 * use). 48 * 49 * <p>For applications that require separate or custom pools, a {@code 50 * ForkJoinPool} may be constructed with a given target parallelism 51 * level; by default, equal to the number of available processors. The 52 * pool attempts to maintain enough active (or available) threads by 53 * dynamically adding, suspending, or resuming internal worker 54 * threads, even if some tasks are stalled waiting to join others. 55 * However, no such adjustments are guaranteed in the face of blocked 56 * I/O or other unmanaged synchronization. The nested {@link 57 * ManagedBlocker} interface enables extension of the kinds of 58 * synchronization accommodated. 59 * 60 * <p>In addition to execution and lifecycle control methods, this 61 * class provides status check methods (for example 62 * {@link #getStealCount}) that are intended to aid in developing, 63 * tuning, and monitoring fork/join applications. Also, method 64 * {@link #toString} returns indications of pool state in a 65 * convenient form for informal monitoring. 66 * 67 * <p>As is the case with other ExecutorServices, there are three 68 * main task execution methods summarized in the following table. 69 * These are designed to be used primarily by clients not already 70 * engaged in fork/join computations in the current pool. The main 71 * forms of these methods accept instances of {@code ForkJoinTask}, 72 * but overloaded forms also allow mixed execution of plain {@code 73 * Runnable}- or {@code Callable}- based activities as well. However, 74 * tasks that are already executing in a pool should normally instead 75 * use the within-computation forms listed in the table unless using 76 * async event-style tasks that are not usually joined, in which case 77 * there is little difference among choice of methods. 
78 * 79 * <table BORDER CELLPADDING=3 CELLSPACING=1> 80 * <caption>Summary of task execution methods</caption> 81 * <tr> 82 * <td></td> 83 * <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td> 84 * <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td> 85 * </tr> 86 * <tr> 87 * <td> <b>Arrange async execution</b></td> 88 * <td> {@link #execute(ForkJoinTask)}</td> 89 * <td> {@link ForkJoinTask#fork}</td> 90 * </tr> 91 * <tr> 92 * <td> <b>Await and obtain result</b></td> 93 * <td> {@link #invoke(ForkJoinTask)}</td> 94 * <td> {@link ForkJoinTask#invoke}</td> 95 * </tr> 96 * <tr> 97 * <td> <b>Arrange exec and obtain Future</b></td> 98 * <td> {@link #submit(ForkJoinTask)}</td> 99 * <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td> 100 * </tr> 101 * </table> 102 * 103 * <p>The common pool is by default constructed with default 104 * parameters, but these may be controlled by setting three 105 * {@linkplain System#getProperty system properties}: 106 * <ul> 107 * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism} 108 * - the parallelism level, a non-negative integer 109 * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory} 110 * - the class name of a {@link ForkJoinWorkerThreadFactory} 111 * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler} 112 * - the class name of a {@link UncaughtExceptionHandler} 113 * </ul> 114 * The system class loader is used to load these classes. 115 * Upon any error in establishing these settings, default parameters 116 * are used. It is possible to disable or limit the use of threads in 117 * the common pool by setting the parallelism property to zero, and/or 118 * using a factory that may return {@code null}. 119 * 120 * <p><b>Implementation notes</b>: This implementation restricts the 121 * maximum number of running threads to 32767. 
Attempts to create 122 * pools with greater than the maximum number result in 123 * {@code IllegalArgumentException}. 124 * 125 * <p>This implementation rejects submitted tasks (that is, by throwing 126 * {@link RejectedExecutionException}) only when the pool is shut down 127 * or internal resources have been exhausted. 128 * 129 * @since 1.7 130 * @author Doug Lea 131 */ 132 public class ForkJoinPool extends AbstractExecutorService { 133 134 /* 135 * Implementation Overview 136 * 137 * This class and its nested classes provide the main 138 * functionality and control for a set of worker threads: 139 * Submissions from non-FJ threads enter into submission queues. 140 * Workers take these tasks and typically split them into subtasks 141 * that may be stolen by other workers. Preference rules give 142 * first priority to processing tasks from their own queues (LIFO 143 * or FIFO, depending on mode), then to randomized FIFO steals of 144 * tasks in other queues. 145 * 146 * WorkQueues 147 * ========== 148 * 149 * Most operations occur within work-stealing queues (in nested 150 * class WorkQueue). These are special forms of Deques that 151 * support only three of the four possible end-operations -- push, 152 * pop, and poll (aka steal), under the further constraints that 153 * push and pop are called only from the owning thread (or, as 154 * extended here, under a lock), while poll may be called from 155 * other threads. (If you are unfamiliar with them, you probably 156 * want to read Herlihy and Shavit's book "The Art of 157 * Multiprocessor programming", chapter 16 describing these in 158 * more detail before proceeding.) The main work-stealing queue 159 * design is roughly similar to those in the papers "Dynamic 160 * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005 161 * (http://research.sun.com/scalable/pubs/index.html) and 162 * "Idempotent work stealing" by Michael, Saraswat, and Vechev, 163 * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186). 
* See also "Correct and Efficient Work-Stealing for Weak Memory
     * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering (atomic, volatile, etc.) issues. The
     * main differences ultimately stem from GC requirements that we
     * null out taken slots as soon as we can, to maintain as small a
     * footprint as possible even in programs generating huge numbers
     * of tasks. To accomplish this, we shift the CAS arbitrating pop
     * vs poll (steal) from being on the indices ("base" and "top") to
     * the slots themselves. So, both a successful pop and poll
     * mainly entail a CAS of a slot from non-null to null. Because
     * we rely on CASes of references, we do not need tag bits on base
     * or top. They are simple ints as used in any circular
     * array-based queue (see for example ArrayDeque). Updates to the
     * indices must still be ordered in a way that guarantees that
     * top == base means the queue is empty, but otherwise may err on
     * the side of possibly making the queue appear nonempty when a
     * push, pop, or poll has not fully committed. Note that this means
     * that the poll operation, considered individually, is not
     * wait-free. One thief cannot successfully continue until another
     * in-progress one (or, if previously empty, a push) completes.
     * However, in the aggregate, we ensure at least probabilistic
     * non-blockingness. If an attempted steal fails, a thief always
     * chooses a different random victim target to try next. So, in
     * order for one thief to progress, it suffices for any
     * in-progress poll or new push on any empty queue to
     * complete. (This is why we normally use method pollAt and its
     * variants that try once at the apparent base index, else
     * consider alternative actions, rather than method poll.)
*
     * This approach also enables support of a user mode in which local
     * task processing is in FIFO, not LIFO, order, simply by using
     * poll rather than pop. This can be useful in message-passing
     * frameworks in which tasks are never joined. However, neither
     * mode considers affinities, loads, cache localities, etc., so
     * they rarely provide the best possible performance on a given
     * machine, but they portably provide good throughput by averaging
     * over these factors. (Further, even if we did try to use such
     * information, we do not usually have a basis for exploiting it.
     * For example, some sets of tasks profit from cache affinities,
     * but others are harmed by cache pollution effects.)
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * for work-stealing (this would contaminate lifo/fifo
     * processing). Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing. The
     * Submitter probe value serves as a hash code for
     * choosing existing queues, and may be randomly repositioned upon
     * contention with other submitters. In essence, submitters act
     * like workers except that they are restricted to executing local
     * tasks that they submitted. However, because most
     * shared/external queue operations are more expensive than
     * internal ones, and because, at steady state, external submitters
     * will compete for CPU with workers, ForkJoinTask.join and
     * related methods disable them from repeatedly helping to process
     * tasks if all workers are active.
Insertion of tasks in shared 221 * mode requires a lock (mainly to protect in the case of 222 * resizing) but we use only a simple spinlock (using bits in 223 * field qlock), because submitters encountering a busy queue move 224 * on to try or create other queues -- they block only when 225 * creating and registering new queues. 226 * 227 * Management 228 * ========== 229 * 230 * The main throughput advantages of work-stealing stem from 231 * decentralized control -- workers mostly take tasks from 232 * themselves or each other. We cannot negate this in the 233 * implementation of other management responsibilities. The main 234 * tactic for avoiding bottlenecks is packing nearly all 235 * essentially atomic control state into two volatile variables 236 * that are by far most often read (not written) as status and 237 * consistency checks. 238 * 239 * Field "ctl" contains 64 bits holding all the information needed 240 * to atomically decide to add, inactivate, enqueue (on an event 241 * queue), dequeue, and/or re-activate workers. To enable this 242 * packing, we restrict maximum parallelism to (1<<15)-1 (which is 243 * far in excess of normal operating range) to allow ids, counts, 244 * and their negations (used for thresholding) to fit into 16bit 245 * fields. 246 * 247 * Field "plock" is a form of sequence lock with a saturating 248 * shutdown bit (similarly for per-queue "qlocks"), mainly 249 * protecting updates to the workQueues array, as well as to 250 * enable shutdown. When used as a lock, it is normally only very 251 * briefly held, so is nearly always available after at most a 252 * brief spin, but we use a monitor-based backup strategy to 253 * block when needed. 254 * 255 * Recording WorkQueues. WorkQueues are recorded in the 256 * "workQueues" array that is created upon first use and expanded 257 * if necessary. 
Updates to the array while recording new workers 258 * and unrecording terminated ones are protected from each other 259 * by a lock but the array is otherwise concurrently readable, and 260 * accessed directly. To simplify index-based operations, the 261 * array size is always a power of two, and all readers must 262 * tolerate null slots. Worker queues are at odd indices. Shared 263 * (submission) queues are at even indices, up to a maximum of 64 264 * slots, to limit growth even if array needs to expand to add 265 * more workers. Grouping them together in this way simplifies and 266 * speeds up task scanning. 267 * 268 * All worker thread creation is on-demand, triggered by task 269 * submissions, replacement of terminated workers, and/or 270 * compensation for blocked workers. However, all other support 271 * code is set up to work with other policies. To ensure that we 272 * do not hold on to worker references that would prevent GC, ALL 273 * accesses to workQueues are via indices into the workQueues 274 * array (which is one source of some of the messy code 275 * constructions here). In essence, the workQueues array serves as 276 * a weak reference mechanism. Thus for example the wait queue 277 * field of ctl stores indices, not references. Access to the 278 * workQueues in associated methods (for example signalWork) must 279 * both index-check and null-check the IDs. All such accesses 280 * ignore bad IDs by returning out early from what they are doing, 281 * since this can only be associated with termination, in which 282 * case it is OK to give up. All uses of the workQueues array 283 * also check that it is non-null (even if previously 284 * non-null). This allows nulling during termination, which is 285 * currently not necessary, but remains an option for 286 * resource-revocation-based shutdown schemes. It also helps 287 * reduce JIT issuance of uncommon-trap code, which tends to 288 * unnecessarily complicate control flow in some methods. 
289 * 290 * Event Queuing. Unlike HPC work-stealing frameworks, we cannot 291 * let workers spin indefinitely scanning for tasks when none can 292 * be found immediately, and we cannot start/resume workers unless 293 * there appear to be tasks available. On the other hand, we must 294 * quickly prod them into action when new tasks are submitted or 295 * generated. In many usages, ramp-up time to activate workers is 296 * the main limiting factor in overall performance (this is 297 * compounded at program start-up by JIT compilation and 298 * allocation). So we try to streamline this as much as possible. 299 * We park/unpark workers after placing in an event wait queue 300 * when they cannot find work. This "queue" is actually a simple 301 * Treiber stack, headed by the "id" field of ctl, plus a 15bit 302 * counter value (that reflects the number of times a worker has 303 * been inactivated) to avoid ABA effects (we need only as many 304 * version numbers as worker threads). Successors are held in 305 * field WorkQueue.nextWait. Queuing deals with several intrinsic 306 * races, mainly that a task-producing thread can miss seeing (and 307 * signalling) another thread that gave up looking for work but 308 * has not yet entered the wait queue. We solve this by requiring 309 * a full sweep of all workers (via repeated calls to method 310 * scan()) both before and after a newly waiting worker is added 311 * to the wait queue. Because enqueued workers may actually be 312 * rescanning rather than waiting, we set and clear the "parker" 313 * field of WorkQueues to reduce unnecessary calls to unpark. 314 * (This requires a secondary recheck to avoid missed signals.) 
* Note the unusual conventions about thread interrupts
     * surrounding parking and other blocking: because interrupts are
     * used solely to alert threads to check termination, which is
     * checked anyway upon blocking, we clear status (using
     * Thread.interrupted) before any call to park, so that park does
     * not immediately return due to status being set via some other
     * unrelated call to interrupt in user code.
     *
     * Signalling. We create or wake up workers only when there
     * appears to be at least one task they might be able to find and
     * execute. When a submission is added or another worker adds a
     * task to a queue that has fewer than two tasks, they signal
     * waiting workers (or trigger creation of new ones if fewer than
     * the given parallelism level -- signalWork). These primary
     * signals are buttressed by others whenever other threads remove
     * a task from a queue and notice that there are other tasks there
     * as well. So in general, pools will be over-signalled. On most
     * platforms, signalling (unpark) overhead time is noticeably
     * long, and the time between signalling a thread and it actually
     * making progress can be very noticeably long, so it is worth
     * offloading these delays from critical paths as much as
     * possible. Additionally, workers spin down gradually, by staying
     * alive so long as they see the ctl state changing. Similar
     * stability-sensing techniques are also used before blocking in
     * awaitJoin and helpComplete.
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate if the pool has remained quiescent for a
     * given period -- a short period if there are more threads than
     * parallelism, longer as the number of threads decreases. This
     * will slowly propagate, eventually terminating all workers after
     * periods of non-use.
348 * 349 * Shutdown and Termination. A call to shutdownNow atomically sets 350 * a plock bit and then (non-atomically) sets each worker's 351 * qlock status, cancels all unprocessed tasks, and wakes up 352 * all waiting workers. Detecting whether termination should 353 * commence after a non-abrupt shutdown() call requires more work 354 * and bookkeeping. We need consensus about quiescence (i.e., that 355 * there is no more work). The active count provides a primary 356 * indication but non-abrupt shutdown still requires a rechecking 357 * scan for any workers that are inactive but not queued. 358 * 359 * Joining Tasks 360 * ============= 361 * 362 * Any of several actions may be taken when one worker is waiting 363 * to join a task stolen (or always held) by another. Because we 364 * are multiplexing many tasks on to a pool of workers, we can't 365 * just let them block (as in Thread.join). We also cannot just 366 * reassign the joiner's run-time stack with another and replace 367 * it later, which would be a form of "continuation", that even if 368 * possible is not necessarily a good idea since we sometimes need 369 * both an unblocked task and its continuation to progress. 370 * Instead we combine two tactics: 371 * 372 * Helping: Arranging for the joiner to execute some task that it 373 * would be running if the steal had not occurred. 374 * 375 * Compensating: Unless there are already enough live threads, 376 * method tryCompensate() may create or re-activate a spare 377 * thread to compensate for blocked joiners until they unblock. 378 * 379 * A third form (implemented in tryRemoveAndExec) amounts to 380 * helping a hypothetical compensator: If we can readily tell that 381 * a possible action of a compensator is to steal and execute the 382 * task being joined, the joining thread can do so directly, 383 * without the need for a compensation thread (although at the 384 * expense of larger run-time stacks, but the tradeoff is 385 * typically worthwhile). 
386 * 387 * The ManagedBlocker extension API can't use helping so relies 388 * only on compensation in method awaitBlocker. 389 * 390 * The algorithm in tryHelpStealer entails a form of "linear" 391 * helping: Each worker records (in field currentSteal) the most 392 * recent task it stole from some other worker. Plus, it records 393 * (in field currentJoin) the task it is currently actively 394 * joining. Method tryHelpStealer uses these markers to try to 395 * find a worker to help (i.e., steal back a task from and execute 396 * it) that could hasten completion of the actively joined task. 397 * In essence, the joiner executes a task that would be on its own 398 * local deque had the to-be-joined task not been stolen. This may 399 * be seen as a conservative variant of the approach in Wagner & 400 * Calder "Leapfrogging: a portable technique for implementing 401 * efficient futures" SIGPLAN Notices, 1993 402 * (http://portal.acm.org/citation.cfm?id=155354). It differs in 403 * that: (1) We only maintain dependency links across workers upon 404 * steals, rather than use per-task bookkeeping. This sometimes 405 * requires a linear scan of workQueues array to locate stealers, 406 * but often doesn't because stealers leave hints (that may become 407 * stale/wrong) of where to locate them. It is only a hint 408 * because a worker might have had multiple steals and the hint 409 * records only one of them (usually the most current). Hinting 410 * isolates cost to when it is needed, rather than adding to 411 * per-task overhead. (2) It is "shallow", ignoring nesting and 412 * potentially cyclic mutual steals. (3) It is intentionally 413 * racy: field currentJoin is updated only while actively joining, 414 * which means that we miss links in the chain during long-lived 415 * tasks, GC stalls etc (which is OK since blocking in such cases 416 * is usually a good idea). 
(4) We bound the number of attempts 417 * to find work (see MAX_HELP) and fall back to suspending the 418 * worker and if necessary replacing it with another. 419 * 420 * It is impossible to keep exactly the target parallelism number 421 * of threads running at any given time. Determining the 422 * existence of conservatively safe helping targets, the 423 * availability of already-created spares, and the apparent need 424 * to create new spares are all racy, so we rely on multiple 425 * retries of each. Compensation in the apparent absence of 426 * helping opportunities is challenging to control on JVMs, where 427 * GC and other activities can stall progress of tasks that in 428 * turn stall out many other dependent tasks, without us being 429 * able to determine whether they will ever require compensation. 430 * Even though work-stealing otherwise encounters little 431 * degradation in the presence of more threads than cores, 432 * aggressively adding new threads in such cases entails risk of 433 * unwanted positive feedback control loops in which more threads 434 * cause more dependent stalls (as well as delayed progress of 435 * unblocked threads to the point that we know they are available) 436 * leading to more situations requiring more threads, and so 437 * on. This aspect of control can be seen as an (analytically 438 * intractable) game with an opponent that may choose the worst 439 * (for us) active thread to stall at any time. We take several 440 * precautions to bound losses (and thus bound gains), mainly in 441 * methods tryCompensate and awaitJoin. 442 * 443 * Common Pool 444 * =========== 445 * 446 * The static common pool always exists after static 447 * initialization. Since it (or any other created pool) need 448 * never be used, we minimize initial construction overhead and 449 * footprint to the setup of about a dozen fields, with no nested 450 * allocation. 
Most bootstrapping occurs within method
     * fullExternalPush during the first submission to the pool.
     *
     * When external threads submit to the common pool, they can
     * perform subtask processing (see externalHelpJoin and related
     * methods). This caller-helps policy makes it sensible to set the
     * common pool parallelism level to one (or more) less than the
     * total number of available cores, or even zero for pure
     * caller-runs. We do not need to record whether external
     * submissions are to the common pool -- if not, externalHelpJoin
     * returns quickly (at most helping to signal some common pool
     * workers). These submitters would otherwise be blocked waiting
     * for completion, so the extra effort (with liberally sprinkled
     * task status checks) in inapplicable cases amounts to an odd
     * form of limited spin-wait before blocking in ForkJoinTask.join.
     *
     * Style notes
     * ===========
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed. There is little point
     * in trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of volatiles held
     * in local variables. Methods signalWork() and scan() are the
     * main bottlenecks, so are especially heavily
     * micro-optimized/mangled. There are lots of inline assignments
     * (of form "while ((local = field) != 0)") which are usually the
     * simplest way to ensure the required read orderings (which are
     * sometimes critical). This leads to a "C"-like style of listing
     * declarations of these locals at the heads of methods or blocks.
484 * There are several occurrences of the unusual "do {} while 485 * (!cas...)" which is the simplest way to force an update of a 486 * CAS'ed variable. There are also other coding oddities (including 487 * several unnecessary-looking hoisted null checks) that help 488 * some methods perform reasonably even when interpreted (not 489 * compiled). 490 * 491 * The order of declarations in this file is: 492 * (1) Static utility functions 493 * (2) Nested (static) classes 494 * (3) Static fields 495 * (4) Fields, along with constants used when unpacking some of them 496 * (5) Internal control methods 497 * (6) Callbacks and other support for ForkJoinTask methods 498 * (7) Exported methods 499 * (8) Static block initializing statics in minimally dependent order 500 */ 501 502 // Static utilities 503 504 /** 505 * If there is a security manager, makes sure caller has 506 * permission to modify threads. 507 */ checkPermission()508 private static void checkPermission() { 509 SecurityManager security = System.getSecurityManager(); 510 if (security != null) 511 security.checkPermission(modifyThreadPermission); 512 } 513 514 // Nested classes 515 516 /** 517 * Factory for creating new {@link ForkJoinWorkerThread}s. 518 * A {@code ForkJoinWorkerThreadFactory} must be defined and used 519 * for {@code ForkJoinWorkerThread} subclasses that extend base 520 * functionality or initialize threads with different contexts. 521 */ 522 public static interface ForkJoinWorkerThreadFactory { 523 /** 524 * Returns a new worker thread operating in the given pool. 525 * 526 * @param pool the pool this thread works in 527 * @throws NullPointerException if the pool is null 528 * @return the new worker thread 529 */ newThread(ForkJoinPool pool)530 public ForkJoinWorkerThread newThread(ForkJoinPool pool); 531 } 532 533 /** 534 * Default ForkJoinWorkerThreadFactory implementation; creates a 535 * new ForkJoinWorkerThread. 
536 */ 537 static final class DefaultForkJoinWorkerThreadFactory 538 implements ForkJoinWorkerThreadFactory { newThread(ForkJoinPool pool)539 public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { 540 return new ForkJoinWorkerThread(pool); 541 } 542 } 543 544 /** 545 * Class for artificial tasks that are used to replace the target 546 * of local joins if they are removed from an interior queue slot 547 * in WorkQueue.tryRemoveAndExec. We don't need the proxy to 548 * actually do anything beyond having a unique identity. 549 */ 550 static final class EmptyTask extends ForkJoinTask<Void> { 551 private static final long serialVersionUID = -7721805057305804111L; EmptyTask()552 EmptyTask() { status = ForkJoinTask.NORMAL; } // force done getRawResult()553 public final Void getRawResult() { return null; } setRawResult(Void x)554 public final void setRawResult(Void x) {} exec()555 public final boolean exec() { return true; } 556 } 557 558 /** 559 * Queues supporting work-stealing as well as external task 560 * submission. See above for main rationale and algorithms. 561 * Implementation relies heavily on "Unsafe" intrinsics 562 * and selective use of "volatile": 563 * 564 * Field "base" is the index (mod array.length) of the least valid 565 * queue slot, which is always the next position to steal (poll) 566 * from if nonempty. Reads and writes require volatile orderings 567 * but not CAS, because updates are only performed after slot 568 * CASes. 569 * 570 * Field "top" is the index (mod array.length) of the next queue 571 * slot to push to or pop from. It is written only by owner thread 572 * for push, or under lock for external/shared push, and accessed 573 * by other threads only after reading (volatile) base. Both top 574 * and base are allowed to wrap around on overflow, but (top - 575 * base) (or more commonly -(base - top) to force volatile read of 576 * base before top) still estimates size. 
The lock ("qlock") is 577 * forced to -1 on termination, causing all further lock attempts 578 * to fail. (Note: we don't need CAS for termination state because 579 * upon pool shutdown, all shared-queues will stop being used 580 * anyway.) Nearly all lock bodies are set up so that exceptions 581 * within lock bodies are "impossible" (modulo JVM errors that 582 * would cause failure anyway.) 583 * 584 * The array slots are read and written using the emulation of 585 * volatiles/atomics provided by Unsafe. Insertions must in 586 * general use putOrderedObject as a form of releasing store to 587 * ensure that all writes to the task object are ordered before 588 * its publication in the queue. All removals entail a CAS to 589 * null. The array is always a power of two. To ensure safety of 590 * Unsafe array operations, all accesses perform explicit null 591 * checks and implicit bounds checks via power-of-two masking. 592 * 593 * In addition to basic queuing support, this class contains 594 * fields described elsewhere to control execution. It turns out 595 * to work better memory-layout-wise to include them in this class 596 * rather than a separate class. 597 * 598 * Performance on most platforms is very sensitive to placement of 599 * instances of both WorkQueues and their arrays -- we absolutely 600 * do not want multiple WorkQueue instances or multiple queue 601 * arrays sharing cache lines. (It would be best for queue objects 602 * and their arrays to share, but there is nothing available to 603 * help arrange that). The @Contended annotation alerts JVMs to 604 * try to keep instances apart. 605 */ 606 static final class WorkQueue { 607 /** 608 * Capacity of work-stealing queue array upon initialization. 609 * Must be a power of two; at least 4, but should be larger to 610 * reduce or eliminate cacheline sharing among queues. 
611 * Currently, it is much larger, as a partial workaround for 612 * the fact that JVMs often place arrays in locations that 613 * share GC bookkeeping (especially cardmarks) such that 614 * per-write accesses encounter serious memory contention. 615 */ 616 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 617 618 /** 619 * Maximum size for queue arrays. Must be a power of two less 620 * than or equal to 1 << (31 - width of array entry) to ensure 621 * lack of wraparound of index calculations, but defined to a 622 * value a bit less than this to help users trap runaway 623 * programs before saturating systems. 624 */ 625 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M 626 627 // Heuristic padding to ameliorate unfortunate memory placements 628 volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06; 629 630 volatile int eventCount; // encoded inactivation count; < 0 if inactive 631 int nextWait; // encoded record of next event waiter 632 int nsteals; // number of steals 633 int hint; // steal index hint 634 short poolIndex; // index of this queue in pool 635 final short mode; // 0: lifo, > 0: fifo, < 0: shared 636 volatile int qlock; // 1: locked, -1: terminate; else 0 637 volatile int base; // index of next slot for poll 638 int top; // index of next slot for push 639 ForkJoinTask<?>[] array; // the elements (initially unallocated) 640 final ForkJoinPool pool; // the containing pool (may be null) 641 final ForkJoinWorkerThread owner; // owning thread or null if shared 642 volatile Thread parker; // == owner during call to park; else null 643 volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 644 ForkJoinTask<?> currentSteal; // current non-local task being executed 645 646 volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; 647 volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d; 648 WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode, int seed)649 WorkQueue(ForkJoinPool pool, 
ForkJoinWorkerThread owner, int mode, 650 int seed) { 651 this.pool = pool; 652 this.owner = owner; 653 this.mode = (short)mode; 654 this.hint = seed; // store initial seed for runWorker 655 // Place indices in the center of array (that is not yet allocated) 656 base = top = INITIAL_QUEUE_CAPACITY >>> 1; 657 } 658 659 /** 660 * Returns the approximate number of tasks in the queue. 661 */ queueSize()662 final int queueSize() { 663 int n = base - top; // non-owner callers must read base first 664 return (n >= 0) ? 0 : -n; // ignore transient negative 665 } 666 667 /** 668 * Provides a more accurate estimate of whether this queue has 669 * any tasks than does queueSize, by checking whether a 670 * near-empty queue has at least one unclaimed task. 671 */ isEmpty()672 final boolean isEmpty() { 673 ForkJoinTask<?>[] a; int m, s; 674 int n = base - (s = top); 675 return (n >= 0 || 676 (n == -1 && 677 ((a = array) == null || 678 (m = a.length - 1) < 0 || 679 U.getObject 680 (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); 681 } 682 683 /** 684 * Pushes a task. Call only by owner in unshared queues. (The 685 * shared-queue version is embedded in method externalPush.) 686 * 687 * @param task the task. Caller must ensure non-null. 688 * @throws RejectedExecutionException if array cannot be resized 689 */ push(ForkJoinTask<?> task)690 final void push(ForkJoinTask<?> task) { 691 ForkJoinTask<?>[] a; ForkJoinPool p; 692 int s = top, n; 693 if ((a = array) != null) { // ignore if queue removed 694 int m = a.length - 1; 695 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); 696 if ((n = (top = s + 1) - base) <= 2) 697 (p = pool).signalWork(p.workQueues, this); 698 else if (n >= m) 699 growArray(); 700 } 701 } 702 703 /** 704 * Initializes or doubles the capacity of array. Call either 705 * by owner or with lock held -- it is OK for base, but not 706 * top, to move while resizings are in progress. 
707 */ growArray()708 final ForkJoinTask<?>[] growArray() { 709 ForkJoinTask<?>[] oldA = array; 710 int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; 711 if (size > MAXIMUM_QUEUE_CAPACITY) 712 throw new RejectedExecutionException("Queue capacity exceeded"); 713 int oldMask, t, b; 714 ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; 715 if (oldA != null && (oldMask = oldA.length - 1) >= 0 && 716 (t = top) - (b = base) > 0) { 717 int mask = size - 1; 718 do { 719 ForkJoinTask<?> x; 720 int oldj = ((b & oldMask) << ASHIFT) + ABASE; 721 int j = ((b & mask) << ASHIFT) + ABASE; 722 x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); 723 if (x != null && 724 U.compareAndSwapObject(oldA, oldj, x, null)) 725 U.putObjectVolatile(a, j, x); 726 } while (++b != t); 727 } 728 return a; 729 } 730 731 /** 732 * Takes next task, if one exists, in LIFO order. Call only 733 * by owner in unshared queues. 734 */ pop()735 final ForkJoinTask<?> pop() { 736 ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m; 737 if ((a = array) != null && (m = a.length - 1) >= 0) { 738 for (int s; (s = top - 1) - base >= 0;) { 739 long j = ((m & s) << ASHIFT) + ABASE; 740 if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null) 741 break; 742 if (U.compareAndSwapObject(a, j, t, null)) { 743 top = s; 744 return t; 745 } 746 } 747 } 748 return null; 749 } 750 751 /** 752 * Takes a task in FIFO order if b is base of queue and a task 753 * can be claimed without contention. Specialized versions 754 * appear in ForkJoinPool methods scan and tryHelpStealer. 
755 */ pollAt(int b)756 final ForkJoinTask<?> pollAt(int b) { 757 ForkJoinTask<?> t; ForkJoinTask<?>[] a; 758 if ((a = array) != null) { 759 int j = (((a.length - 1) & b) << ASHIFT) + ABASE; 760 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && 761 base == b && U.compareAndSwapObject(a, j, t, null)) { 762 U.putOrderedInt(this, QBASE, b + 1); 763 return t; 764 } 765 } 766 return null; 767 } 768 769 /** 770 * Takes next task, if one exists, in FIFO order. 771 */ poll()772 final ForkJoinTask<?> poll() { 773 ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; 774 while ((b = base) - top < 0 && (a = array) != null) { 775 int j = (((a.length - 1) & b) << ASHIFT) + ABASE; 776 t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); 777 if (t != null) { 778 if (U.compareAndSwapObject(a, j, t, null)) { 779 U.putOrderedInt(this, QBASE, b + 1); 780 return t; 781 } 782 } 783 else if (base == b) { 784 if (b + 1 == top) 785 break; 786 Thread.yield(); // wait for lagging update (very rare) 787 } 788 } 789 return null; 790 } 791 792 /** 793 * Takes next task, if one exists, in order specified by mode. 794 */ nextLocalTask()795 final ForkJoinTask<?> nextLocalTask() { 796 return mode == 0 ? pop() : poll(); 797 } 798 799 /** 800 * Returns next task, if one exists, in order specified by mode. 801 */ peek()802 final ForkJoinTask<?> peek() { 803 ForkJoinTask<?>[] a = array; int m; 804 if (a == null || (m = a.length - 1) < 0) 805 return null; 806 int i = mode == 0 ? top - 1 : base; 807 int j = ((i & m) << ASHIFT) + ABASE; 808 return (ForkJoinTask<?>)U.getObjectVolatile(a, j); 809 } 810 811 /** 812 * Pops the given task only if it is at the current top. 
813 * (A shared version is available only via FJP.tryExternalUnpush) 814 */ tryUnpush(ForkJoinTask<?> t)815 final boolean tryUnpush(ForkJoinTask<?> t) { 816 ForkJoinTask<?>[] a; int s; 817 if ((a = array) != null && (s = top) != base && 818 U.compareAndSwapObject 819 (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { 820 top = s; 821 return true; 822 } 823 return false; 824 } 825 826 /** 827 * Removes and cancels all known tasks, ignoring any exceptions. 828 */ cancelAll()829 final void cancelAll() { 830 ForkJoinTask.cancelIgnoringExceptions(currentJoin); 831 ForkJoinTask.cancelIgnoringExceptions(currentSteal); 832 for (ForkJoinTask<?> t; (t = poll()) != null; ) 833 ForkJoinTask.cancelIgnoringExceptions(t); 834 } 835 836 // Specialized execution methods 837 838 /** 839 * Polls and runs tasks until empty. 840 */ pollAndExecAll()841 final void pollAndExecAll() { 842 for (ForkJoinTask<?> t; (t = poll()) != null;) 843 t.doExec(); 844 } 845 846 /** 847 * Executes a top-level task and any local tasks remaining 848 * after execution. 849 */ runTask(ForkJoinTask<?> task)850 final void runTask(ForkJoinTask<?> task) { 851 if ((currentSteal = task) != null) { 852 task.doExec(); 853 ForkJoinTask<?>[] a = array; 854 int md = mode; 855 ++nsteals; 856 currentSteal = null; 857 if (md != 0) 858 pollAndExecAll(); 859 else if (a != null) { 860 int s, m = a.length - 1; 861 while ((s = top - 1) - base >= 0) { 862 long i = ((m & s) << ASHIFT) + ABASE; 863 ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, i); 864 if (t == null) 865 break; 866 if (U.compareAndSwapObject(a, i, t, null)) { 867 top = s; 868 t.doExec(); 869 } 870 } 871 } 872 } 873 } 874 875 /** 876 * If present, removes from queue and executes the given task, 877 * or any other cancelled task. Returns (true) on any CAS 878 * or consistency check failure so caller can retry. 
879 * 880 * @return false if no progress can be made, else true 881 */ tryRemoveAndExec(ForkJoinTask<?> task)882 final boolean tryRemoveAndExec(ForkJoinTask<?> task) { 883 boolean stat; 884 ForkJoinTask<?>[] a; int m, s, b, n; 885 if (task != null && (a = array) != null && (m = a.length - 1) >= 0 && 886 (n = (s = top) - (b = base)) > 0) { 887 boolean removed = false, empty = true; 888 stat = true; 889 for (ForkJoinTask<?> t;;) { // traverse from s to b 890 long j = ((--s & m) << ASHIFT) + ABASE; 891 t = (ForkJoinTask<?>)U.getObject(a, j); 892 if (t == null) // inconsistent length 893 break; 894 else if (t == task) { 895 if (s + 1 == top) { // pop 896 if (!U.compareAndSwapObject(a, j, task, null)) 897 break; 898 top = s; 899 removed = true; 900 } 901 else if (base == b) // replace with proxy 902 removed = U.compareAndSwapObject(a, j, task, 903 new EmptyTask()); 904 break; 905 } 906 else if (t.status >= 0) 907 empty = false; 908 else if (s + 1 == top) { // pop and throw away 909 if (U.compareAndSwapObject(a, j, t, null)) 910 top = s; 911 break; 912 } 913 if (--n == 0) { 914 if (!empty && base == b) 915 stat = false; 916 break; 917 } 918 } 919 if (removed) 920 task.doExec(); 921 } 922 else 923 stat = false; 924 return stat; 925 } 926 927 /** 928 * Tries to poll for and execute the given task or any other 929 * task in its CountedCompleter computation. 
930 */ pollAndExecCC(CountedCompleter<?> root)931 final boolean pollAndExecCC(CountedCompleter<?> root) { 932 ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r; 933 if ((b = base) - top < 0 && (a = array) != null) { 934 long j = (((a.length - 1) & b) << ASHIFT) + ABASE; 935 if ((o = U.getObjectVolatile(a, j)) == null) 936 return true; // retry 937 if (o instanceof CountedCompleter) { 938 for (t = (CountedCompleter<?>)o, r = t;;) { 939 if (r == root) { 940 if (base == b && 941 U.compareAndSwapObject(a, j, t, null)) { 942 U.putOrderedInt(this, QBASE, b + 1); 943 t.doExec(); 944 } 945 return true; 946 } 947 else if ((r = r.completer) == null) 948 break; // not part of root computation 949 } 950 } 951 } 952 return false; 953 } 954 955 /** 956 * Tries to pop and execute the given task or any other task 957 * in its CountedCompleter computation. 958 */ externalPopAndExecCC(CountedCompleter<?> root)959 final boolean externalPopAndExecCC(CountedCompleter<?> root) { 960 ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r; 961 if (base - (s = top) < 0 && (a = array) != null) { 962 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 963 if ((o = U.getObject(a, j)) instanceof CountedCompleter) { 964 for (t = (CountedCompleter<?>)o, r = t;;) { 965 if (r == root) { 966 if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { 967 if (top == s && array == a && 968 U.compareAndSwapObject(a, j, t, null)) { 969 top = s - 1; 970 qlock = 0; 971 t.doExec(); 972 } 973 else 974 qlock = 0; 975 } 976 return true; 977 } 978 else if ((r = r.completer) == null) 979 break; 980 } 981 } 982 } 983 return false; 984 } 985 986 /** 987 * Internal version 988 */ internalPopAndExecCC(CountedCompleter<?> root)989 final boolean internalPopAndExecCC(CountedCompleter<?> root) { 990 ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r; 991 if (base - (s = top) < 0 && (a = array) != null) { 992 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 993 if ((o = U.getObject(a, 
j)) instanceof CountedCompleter) { 994 for (t = (CountedCompleter<?>)o, r = t;;) { 995 if (r == root) { 996 if (U.compareAndSwapObject(a, j, t, null)) { 997 top = s - 1; 998 t.doExec(); 999 } 1000 return true; 1001 } 1002 else if ((r = r.completer) == null) 1003 break; 1004 } 1005 } 1006 } 1007 return false; 1008 } 1009 1010 /** 1011 * Returns true if owned and not known to be blocked. 1012 */ isApparentlyUnblocked()1013 final boolean isApparentlyUnblocked() { 1014 Thread wt; Thread.State s; 1015 return (eventCount >= 0 && 1016 (wt = owner) != null && 1017 (s = wt.getState()) != Thread.State.BLOCKED && 1018 s != Thread.State.WAITING && 1019 s != Thread.State.TIMED_WAITING); 1020 } 1021 1022 // Unsafe mechanics 1023 private static final sun.misc.Unsafe U; 1024 private static final long QBASE; 1025 private static final long QLOCK; 1026 private static final int ABASE; 1027 private static final int ASHIFT; 1028 static { 1029 try { 1030 U = sun.misc.Unsafe.getUnsafe(); 1031 Class<?> k = WorkQueue.class; 1032 Class<?> ak = ForkJoinTask[].class; 1033 QBASE = U.objectFieldOffset 1034 (k.getDeclaredField("base")); 1035 QLOCK = U.objectFieldOffset 1036 (k.getDeclaredField("qlock")); 1037 ABASE = U.arrayBaseOffset(ak); 1038 int scale = U.arrayIndexScale(ak); 1039 if ((scale & (scale - 1)) != 0) 1040 throw new Error("data type scale not a power of two"); 1041 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 1042 } catch (Exception e) { 1043 throw new Error(e); 1044 } 1045 } 1046 } 1047 1048 // static fields (initialized in static initializer below) 1049 1050 /** 1051 * Per-thread submission bookkeeping. Shared across all pools 1052 * to reduce ThreadLocal pollution and because random motion 1053 * to avoid contention in one pool is likely to hold for others. 1054 * Lazily initialized on first submission (but null-checked 1055 * in other contexts to avoid unnecessary initialization). 
1056 */ 1057 static final ThreadLocal<Submitter> submitters; 1058 1059 /** 1060 * Creates a new ForkJoinWorkerThread. This factory is used unless 1061 * overridden in ForkJoinPool constructors. 1062 */ 1063 public static final ForkJoinWorkerThreadFactory 1064 defaultForkJoinWorkerThreadFactory; 1065 1066 /** 1067 * Permission required for callers of methods that may start or 1068 * kill threads. 1069 */ 1070 private static final RuntimePermission modifyThreadPermission; 1071 1072 /** 1073 * Common (static) pool. Non-null for public use unless a static 1074 * construction exception, but internal usages null-check on use 1075 * to paranoically avoid potential initialization circularities 1076 * as well as to simplify generated code. 1077 */ 1078 static final ForkJoinPool common; 1079 1080 /** 1081 * Common pool parallelism. To allow simpler use and management 1082 * when common pool threads are disabled, we allow the underlying 1083 * common.parallelism field to be zero, but in that case still report 1084 * parallelism as 1 to reflect resulting caller-runs mechanics. 1085 */ 1086 static final int commonParallelism; 1087 1088 /** 1089 * Sequence number for creating workerNamePrefix. 1090 */ 1091 private static int poolNumberSequence; 1092 1093 /** 1094 * Returns the next sequence number. We don't expect this to 1095 * ever contend, so use simple builtin sync. 1096 */ nextPoolId()1097 private static final synchronized int nextPoolId() { 1098 return ++poolNumberSequence; 1099 } 1100 1101 // static constants 1102 1103 /** 1104 * Initial timeout value (in nanoseconds) for the thread 1105 * triggering quiescence to park waiting for new work. On timeout, 1106 * the thread will instead try to shrink the number of 1107 * workers. The value should be large enough to avoid overly 1108 * aggressive shrinkage during most transient stalls (long GCs 1109 * etc). 
1110 */ 1111 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec 1112 1113 /** 1114 * Timeout value when there are more threads than parallelism level 1115 */ 1116 private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L; 1117 1118 /** 1119 * Tolerance for idle timeouts, to cope with timer undershoots 1120 */ 1121 private static final long TIMEOUT_SLOP = 2000000L; 1122 1123 /** 1124 * The maximum stolen->joining link depth allowed in method 1125 * tryHelpStealer. Must be a power of two. Depths for legitimate 1126 * chains are unbounded, but we use a fixed constant to avoid 1127 * (otherwise unchecked) cycles and to bound staleness of 1128 * traversal parameters at the expense of sometimes blocking when 1129 * we could be helping. 1130 */ 1131 private static final int MAX_HELP = 64; 1132 1133 /** 1134 * Increment for seed generators. See class ThreadLocal for 1135 * explanation. 1136 */ 1137 private static final int SEED_INCREMENT = 0x61c88647; 1138 1139 /* 1140 * Bits and masks for control variables 1141 * 1142 * Field ctl is a long packed with: 1143 * AC: Number of active running workers minus target parallelism (16 bits) 1144 * TC: Number of total workers minus target parallelism (16 bits) 1145 * ST: true if pool is terminating (1 bit) 1146 * EC: the wait count of top waiting thread (15 bits) 1147 * ID: poolIndex of top of Treiber stack of waiters (16 bits) 1148 * 1149 * When convenient, we can extract the upper 32 bits of counts and 1150 * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e = 1151 * (int)ctl. The ec field is never accessed alone, but always 1152 * together with id and st. 
The offsets of counts by the target 1153 * parallelism and the positionings of fields makes it possible to 1154 * perform the most common checks via sign tests of fields: When 1155 * ac is negative, there are not enough active workers, when tc is 1156 * negative, there are not enough total workers, and when e is 1157 * negative, the pool is terminating. To deal with these possibly 1158 * negative fields, we use casts in and out of "short" and/or 1159 * signed shifts to maintain signedness. 1160 * 1161 * When a thread is queued (inactivated), its eventCount field is 1162 * set negative, which is the only way to tell if a worker is 1163 * prevented from executing tasks, even though it must continue to 1164 * scan for them to avoid queuing races. Note however that 1165 * eventCount updates lag releases so usage requires care. 1166 * 1167 * Field plock is an int packed with: 1168 * SHUTDOWN: true if shutdown is enabled (1 bit) 1169 * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits) 1170 * SIGNAL: set when threads may be waiting on the lock (1 bit) 1171 * 1172 * The sequence number enables simple consistency checks: 1173 * Staleness of read-only operations on the workQueues array can 1174 * be checked by comparing plock before vs after the reads. 
1175 */ 1176 1177 // bit positions/shifts for fields 1178 private static final int AC_SHIFT = 48; 1179 private static final int TC_SHIFT = 32; 1180 private static final int ST_SHIFT = 31; 1181 private static final int EC_SHIFT = 16; 1182 1183 // bounds 1184 private static final int SMASK = 0xffff; // short bits 1185 private static final int MAX_CAP = 0x7fff; // max #workers - 1 1186 private static final int EVENMASK = 0xfffe; // even short bits 1187 private static final int SQMASK = 0x007e; // max 64 (even) slots 1188 private static final int SHORT_SIGN = 1 << 15; 1189 private static final int INT_SIGN = 1 << 31; 1190 1191 // masks 1192 private static final long STOP_BIT = 0x0001L << ST_SHIFT; 1193 private static final long AC_MASK = ((long)SMASK) << AC_SHIFT; 1194 private static final long TC_MASK = ((long)SMASK) << TC_SHIFT; 1195 1196 // units for incrementing and decrementing 1197 private static final long TC_UNIT = 1L << TC_SHIFT; 1198 private static final long AC_UNIT = 1L << AC_SHIFT; 1199 1200 // masks and units for dealing with u = (int)(ctl >>> 32) 1201 private static final int UAC_SHIFT = AC_SHIFT - 32; 1202 private static final int UTC_SHIFT = TC_SHIFT - 32; 1203 private static final int UAC_MASK = SMASK << UAC_SHIFT; 1204 private static final int UTC_MASK = SMASK << UTC_SHIFT; 1205 private static final int UAC_UNIT = 1 << UAC_SHIFT; 1206 private static final int UTC_UNIT = 1 << UTC_SHIFT; 1207 1208 // masks and units for dealing with e = (int)ctl 1209 private static final int E_MASK = 0x7fffffff; // no STOP_BIT 1210 private static final int E_SEQ = 1 << EC_SHIFT; 1211 1212 // plock bits 1213 private static final int SHUTDOWN = 1 << 31; 1214 private static final int PL_LOCK = 2; 1215 private static final int PL_SIGNAL = 1; 1216 private static final int PL_SPINS = 1 << 8; 1217 1218 // access mode for WorkQueue 1219 static final int LIFO_QUEUE = 0; 1220 static final int FIFO_QUEUE = 1; 1221 static final int SHARED_QUEUE = -1; 1222 1223 // Heuristic padding 
to ameliorate unfortunate memory placements 1224 volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06; 1225 1226 // Instance fields 1227 volatile long stealCount; // collects worker counts 1228 volatile long ctl; // main pool control 1229 volatile int plock; // shutdown status and seqLock 1230 volatile int indexSeed; // worker/submitter index seed 1231 final short parallelism; // parallelism level 1232 final short mode; // LIFO/FIFO 1233 WorkQueue[] workQueues; // main registry 1234 final ForkJoinWorkerThreadFactory factory; 1235 final UncaughtExceptionHandler ueh; // per-worker UEH 1236 final String workerNamePrefix; // to create worker name string 1237 1238 volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17; 1239 volatile Object pad18, pad19, pad1a, pad1b; 1240 1241 /** 1242 * Acquires the plock lock to protect worker array and related 1243 * updates. This method is called only if an initial CAS on plock 1244 * fails. This acts as a spinlock for normal cases, but falls back 1245 * to builtin monitor to block when (rarely) needed. This would be 1246 * a terrible idea for a highly contended lock, but works fine as 1247 * a more conservative alternative to a pure spinlock. 1248 */ acquirePlock()1249 private int acquirePlock() { 1250 int spins = PL_SPINS, ps, nps; 1251 for (;;) { 1252 if (((ps = plock) & PL_LOCK) == 0 && 1253 U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK)) 1254 return nps; 1255 else if (spins >= 0) { 1256 if (ThreadLocalRandom.current().nextInt() >= 0) 1257 --spins; 1258 } 1259 else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) { 1260 synchronized (this) { 1261 if ((plock & PL_SIGNAL) != 0) { 1262 try { 1263 wait(); 1264 } catch (InterruptedException ie) { 1265 try { 1266 Thread.currentThread().interrupt(); 1267 } catch (SecurityException ignore) { 1268 } 1269 } 1270 } 1271 else 1272 notifyAll(); 1273 } 1274 } 1275 } 1276 } 1277 1278 /** 1279 * Unlocks and signals any thread waiting for plock. 
Called only 1280 * when CAS of seq value for unlock fails. 1281 */ releasePlock(int ps)1282 private void releasePlock(int ps) { 1283 plock = ps; 1284 synchronized (this) { notifyAll(); } 1285 } 1286 1287 /** 1288 * Tries to create and start one worker if fewer than target 1289 * parallelism level exist. Adjusts counts etc on failure. 1290 */ tryAddWorker()1291 private void tryAddWorker() { 1292 long c; int u, e; 1293 while ((u = (int)((c = ctl) >>> 32)) < 0 && 1294 (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) { 1295 long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) | 1296 ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e; 1297 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1298 ForkJoinWorkerThreadFactory fac; 1299 Throwable ex = null; 1300 ForkJoinWorkerThread wt = null; 1301 try { 1302 if ((fac = factory) != null && 1303 (wt = fac.newThread(this)) != null) { 1304 wt.start(); 1305 break; 1306 } 1307 } catch (Throwable rex) { 1308 ex = rex; 1309 } 1310 deregisterWorker(wt, ex); 1311 break; 1312 } 1313 } 1314 } 1315 1316 // Registering and deregistering workers 1317 1318 /** 1319 * Callback from ForkJoinWorkerThread to establish and record its 1320 * WorkQueue. To avoid scanning bias due to packing entries in 1321 * front of the workQueues array, we treat the array as a simple 1322 * power-of-two hash table using per-thread seed as hash, 1323 * expanding as needed. 
1324 * 1325 * @param wt the worker thread 1326 * @return the worker's queue 1327 */ registerWorker(ForkJoinWorkerThread wt)1328 final WorkQueue registerWorker(ForkJoinWorkerThread wt) { 1329 UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps; 1330 wt.setDaemon(true); 1331 if ((handler = ueh) != null) 1332 wt.setUncaughtExceptionHandler(handler); 1333 do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, 1334 s += SEED_INCREMENT) || 1335 s == 0); // skip 0 1336 WorkQueue w = new WorkQueue(this, wt, mode, s); 1337 if (((ps = plock) & PL_LOCK) != 0 || 1338 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1339 ps = acquirePlock(); 1340 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1341 try { 1342 if ((ws = workQueues) != null) { // skip if shutting down 1343 int n = ws.length, m = n - 1; 1344 int r = (s << 1) | 1; // use odd-numbered indices 1345 if (ws[r &= m] != null) { // collision 1346 int probes = 0; // step by approx half size 1347 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; 1348 while (ws[r = (r + step) & m] != null) { 1349 if (++probes >= n) { 1350 workQueues = ws = Arrays.copyOf(ws, n <<= 1); 1351 m = n - 1; 1352 probes = 0; 1353 } 1354 } 1355 } 1356 w.poolIndex = (short)r; 1357 w.eventCount = r; // volatile write orders 1358 ws[r] = w; 1359 } 1360 } finally { 1361 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1362 releasePlock(nps); 1363 } 1364 wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1))); 1365 return w; 1366 } 1367 1368 /** 1369 * Final callback from terminating worker, as well as upon failure 1370 * to construct or start a worker. Removes record of worker from 1371 * array, and adjusts counts. If pool is shutting down, tries to 1372 * complete termination. 
1373 * 1374 * @param wt the worker thread, or null if construction failed 1375 * @param ex the exception causing failure, or null if none 1376 */ deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)1377 final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { 1378 WorkQueue w = null; 1379 if (wt != null && (w = wt.workQueue) != null) { 1380 int ps; long sc; 1381 w.qlock = -1; // ensure set 1382 do {} while (!U.compareAndSwapLong(this, STEALCOUNT, 1383 sc = stealCount, 1384 sc + w.nsteals)); 1385 if (((ps = plock) & PL_LOCK) != 0 || 1386 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1387 ps = acquirePlock(); 1388 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1389 try { 1390 int idx = w.poolIndex; 1391 WorkQueue[] ws = workQueues; 1392 if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) 1393 ws[idx] = null; 1394 } finally { 1395 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1396 releasePlock(nps); 1397 } 1398 } 1399 1400 long c; // adjust ctl counts 1401 do {} while (!U.compareAndSwapLong 1402 (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | 1403 ((c - TC_UNIT) & TC_MASK) | 1404 (c & ~(AC_MASK|TC_MASK))))); 1405 1406 if (!tryTerminate(false, false) && w != null && w.array != null) { 1407 w.cancelAll(); // cancel remaining tasks 1408 WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e; 1409 while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) { 1410 if (e > 0) { // activate or create replacement 1411 if ((ws = workQueues) == null || 1412 (i = e & SMASK) >= ws.length || 1413 (v = ws[i]) == null) 1414 break; 1415 long nc = (((long)(v.nextWait & E_MASK)) | 1416 ((long)(u + UAC_UNIT) << 32)); 1417 if (v.eventCount != (e | INT_SIGN)) 1418 break; 1419 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1420 v.eventCount = (e + E_SEQ) & E_MASK; 1421 if ((p = v.parker) != null) 1422 U.unpark(p); 1423 break; 1424 } 1425 } 1426 else { 1427 if ((short)u < 0) 1428 tryAddWorker(); 1429 break; 1430 } 1431 } 1432 } 1433 if (ex == 
null) // help clean refs on way out 1434 ForkJoinTask.helpExpungeStaleExceptions(); 1435 else // rethrow 1436 ForkJoinTask.rethrow(ex); 1437 } 1438 1439 // Submissions 1440 1441 /** 1442 * Per-thread records for threads that submit to pools. Currently 1443 * holds only pseudo-random seed / index that is used to choose 1444 * submission queues in method externalPush. In the future, this may 1445 * also incorporate a means to implement different task rejection 1446 * and resubmission policies. 1447 * 1448 * Seeds for submitters and workers/workQueues work in basically 1449 * the same way but are initialized and updated using slightly 1450 * different mechanics. Both are initialized using the same 1451 * approach as in class ThreadLocal, where successive values are 1452 * unlikely to collide with previous values. Seeds are then 1453 * randomly modified upon collisions using xorshifts, which 1454 * requires a non-zero seed. 1455 */ 1456 static final class Submitter { 1457 int seed; Submitter(int s)1458 Submitter(int s) { seed = s; } 1459 } 1460 1461 /** 1462 * Unless shutting down, adds the given task to a submission queue 1463 * at submitter's current queue index (modulo submission 1464 * range). Only the most common path is directly handled in this 1465 * method. All others are relayed to fullExternalPush. 1466 * 1467 * @param task the task. Caller must ensure non-null. 
1468 */ externalPush(ForkJoinTask<?> task)1469 final void externalPush(ForkJoinTask<?> task) { 1470 Submitter z = submitters.get(); 1471 WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a; 1472 int ps = plock; 1473 WorkQueue[] ws = workQueues; 1474 if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && 1475 (q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 && 1476 U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock 1477 if ((a = q.array) != null && 1478 (am = a.length - 1) > (n = (s = q.top) - q.base)) { 1479 int j = ((am & s) << ASHIFT) + ABASE; 1480 U.putOrderedObject(a, j, task); 1481 q.top = s + 1; // push on to deque 1482 q.qlock = 0; 1483 if (n <= 1) 1484 signalWork(ws, q); 1485 return; 1486 } 1487 q.qlock = 0; 1488 } 1489 fullExternalPush(task); 1490 } 1491 1492 /** 1493 * Full version of externalPush. This method is called, among 1494 * other times, upon the first submission of the first task to the 1495 * pool, so must perform secondary initialization. It also 1496 * detects first submission by an external thread by looking up 1497 * its ThreadLocal, and creates a new shared queue if the one at 1498 * index if empty or contended. The plock lock body must be 1499 * exception-free (so no try/finally) so we optimistically 1500 * allocate new queues outside the lock and throw them away if 1501 * (very rarely) not needed. 1502 * 1503 * Secondary initialization occurs when plock is zero, to create 1504 * workQueue array and set plock to a valid value. This lock body 1505 * must also be exception-free. Because the plock seq value can 1506 * eventually wrap around zero, this method harmlessly fails to 1507 * reinitialize if workQueues exists, while still advancing plock. 
1508 */ fullExternalPush(ForkJoinTask<?> task)1509 private void fullExternalPush(ForkJoinTask<?> task) { 1510 int r = 0; // random index seed 1511 for (Submitter z = submitters.get();;) { 1512 WorkQueue[] ws; WorkQueue q; int ps, m, k; 1513 if (z == null) { 1514 if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed, 1515 r += SEED_INCREMENT) && r != 0) 1516 submitters.set(z = new Submitter(r)); 1517 } 1518 else if (r == 0) { // move to a different index 1519 r = z.seed; 1520 r ^= r << 13; // same xorshift as WorkQueues 1521 r ^= r >>> 17; 1522 z.seed = r ^= (r << 5); 1523 } 1524 if ((ps = plock) < 0) 1525 throw new RejectedExecutionException(); 1526 else if (ps == 0 || (ws = workQueues) == null || 1527 (m = ws.length - 1) < 0) { // initialize workQueues 1528 int p = parallelism; // find power of two table size 1529 int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots 1530 n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; 1531 n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; 1532 WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? 
1533 new WorkQueue[n] : null); 1534 if (((ps = plock) & PL_LOCK) != 0 || 1535 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1536 ps = acquirePlock(); 1537 if (((ws = workQueues) == null || ws.length == 0) && nws != null) 1538 workQueues = nws; 1539 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1540 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1541 releasePlock(nps); 1542 } 1543 else if ((q = ws[k = r & m & SQMASK]) != null) { 1544 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { 1545 ForkJoinTask<?>[] a = q.array; 1546 int s = q.top; 1547 boolean submitted = false; 1548 try { // locked version of push 1549 if ((a != null && a.length > s + 1 - q.base) || 1550 (a = q.growArray()) != null) { // must presize 1551 int j = (((a.length - 1) & s) << ASHIFT) + ABASE; 1552 U.putOrderedObject(a, j, task); 1553 q.top = s + 1; 1554 submitted = true; 1555 } 1556 } finally { 1557 q.qlock = 0; // unlock 1558 } 1559 if (submitted) { 1560 signalWork(ws, q); 1561 return; 1562 } 1563 } 1564 r = 0; // move on failure 1565 } 1566 else if (((ps = plock) & PL_LOCK) == 0) { // create new queue 1567 q = new WorkQueue(this, null, SHARED_QUEUE, r); 1568 q.poolIndex = (short)k; 1569 if (((ps = plock) & PL_LOCK) != 0 || 1570 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 1571 ps = acquirePlock(); 1572 if ((ws = workQueues) != null && k < ws.length && ws[k] == null) 1573 ws[k] = q; 1574 int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); 1575 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 1576 releasePlock(nps); 1577 } 1578 else 1579 r = 0; 1580 } 1581 } 1582 1583 // Maintaining ctl counts 1584 1585 /** 1586 * Increments active count; mainly called upon return from blocking. 
1587 */ incrementActiveCount()1588 final void incrementActiveCount() { 1589 long c; 1590 do {} while (!U.compareAndSwapLong 1591 (this, CTL, c = ctl, ((c & ~AC_MASK) | 1592 ((c & AC_MASK) + AC_UNIT)))); 1593 } 1594 1595 /** 1596 * Tries to create or activate a worker if too few are active. 1597 * 1598 * @param ws the worker array to use to find signallees 1599 * @param q if non-null, the queue holding tasks to be processed 1600 */ signalWork(WorkQueue[] ws, WorkQueue q)1601 final void signalWork(WorkQueue[] ws, WorkQueue q) { 1602 for (;;) { 1603 long c; int e, u, i; WorkQueue w; Thread p; 1604 if ((u = (int)((c = ctl) >>> 32)) >= 0) 1605 break; 1606 if ((e = (int)c) <= 0) { 1607 if ((short)u < 0) 1608 tryAddWorker(); 1609 break; 1610 } 1611 if (ws == null || ws.length <= (i = e & SMASK) || 1612 (w = ws[i]) == null) 1613 break; 1614 long nc = (((long)(w.nextWait & E_MASK)) | 1615 ((long)(u + UAC_UNIT)) << 32); 1616 int ne = (e + E_SEQ) & E_MASK; 1617 if (w.eventCount == (e | INT_SIGN) && 1618 U.compareAndSwapLong(this, CTL, c, nc)) { 1619 w.eventCount = ne; 1620 if ((p = w.parker) != null) 1621 U.unpark(p); 1622 break; 1623 } 1624 if (q != null && q.base >= q.top) 1625 break; 1626 } 1627 } 1628 1629 // Scanning for tasks 1630 1631 /** 1632 * Top-level runloop for workers, called by ForkJoinWorkerThread.run. 1633 */ runWorker(WorkQueue w)1634 final void runWorker(WorkQueue w) { 1635 w.growArray(); // allocate queue 1636 for (int r = w.hint; scan(w, r) == 0; ) { 1637 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift 1638 } 1639 } 1640 1641 /** 1642 * Scans for and, if found, runs one task, else possibly 1643 * inactivates the worker. This method operates on single reads of 1644 * volatile state and is designed to be re-invoked continuously, 1645 * in part because it returns upon detecting inconsistencies, 1646 * contention, or state changes that indicate possible success on 1647 * re-invocation. 
1648 * 1649 * The scan searches for tasks across queues starting at a random 1650 * index, checking each at least twice. The scan terminates upon 1651 * either finding a non-empty queue, or completing the sweep. If 1652 * the worker is not inactivated, it takes and runs a task from 1653 * this queue. Otherwise, if not activated, it tries to activate 1654 * itself or some other worker by signalling. On failure to find a 1655 * task, returns (for retry) if pool state may have changed during 1656 * an empty scan, or tries to inactivate if active, else possibly 1657 * blocks or terminates via method awaitWork. 1658 * 1659 * @param w the worker (via its WorkQueue) 1660 * @param r a random seed 1661 * @return worker qlock status if would have waited, else 0 1662 */ scan(WorkQueue w, int r)1663 private final int scan(WorkQueue w, int r) { 1664 WorkQueue[] ws; int m; 1665 long c = ctl; // for consistency check 1666 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) { 1667 for (int j = m + m + 1, ec = w.eventCount;;) { 1668 WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t; 1669 if ((q = ws[(r - j) & m]) != null && 1670 (b = q.base) - q.top < 0 && (a = q.array) != null) { 1671 long i = (((a.length - 1) & b) << ASHIFT) + ABASE; 1672 if ((t = ((ForkJoinTask<?>) 1673 U.getObjectVolatile(a, i))) != null) { 1674 if (ec < 0) 1675 helpRelease(c, ws, w, q, b); 1676 else if (q.base == b && 1677 U.compareAndSwapObject(a, i, t, null)) { 1678 U.putOrderedInt(q, QBASE, b + 1); 1679 if ((b + 1) - q.top < 0) 1680 signalWork(ws, q); 1681 w.runTask(t); 1682 } 1683 } 1684 break; 1685 } 1686 else if (--j < 0) { 1687 if ((ec | (e = (int)c)) < 0) // inactive or terminating 1688 return awaitWork(w, c, ec); 1689 else if (ctl == c) { // try to inactivate and enqueue 1690 long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); 1691 w.nextWait = e; 1692 w.eventCount = ec | INT_SIGN; 1693 if (!U.compareAndSwapLong(this, CTL, c, nc)) 1694 w.eventCount = ec; // back 
out 1695 } 1696 break; 1697 } 1698 } 1699 } 1700 return 0; 1701 } 1702 1703 /** 1704 * A continuation of scan(), possibly blocking or terminating 1705 * worker w. Returns without blocking if pool state has apparently 1706 * changed since last invocation. Also, if inactivating w has 1707 * caused the pool to become quiescent, checks for pool 1708 * termination, and, so long as this is not the only worker, waits 1709 * for event for up to a given duration. On timeout, if ctl has 1710 * not changed, terminates the worker, which will in turn wake up 1711 * another worker to possibly repeat this process. 1712 * 1713 * @param w the calling worker 1714 * @param c the ctl value on entry to scan 1715 * @param ec the worker's eventCount on entry to scan 1716 */ awaitWork(WorkQueue w, long c, int ec)1717 private final int awaitWork(WorkQueue w, long c, int ec) { 1718 int stat, ns; long parkTime, deadline; 1719 if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c && 1720 !Thread.interrupted()) { 1721 int e = (int)c; 1722 int u = (int)(c >>> 32); 1723 int d = (u >> UAC_SHIFT) + parallelism; // active count 1724 1725 if (e < 0 || (d <= 0 && tryTerminate(false, false))) 1726 stat = w.qlock = -1; // pool is terminating 1727 else if ((ns = w.nsteals) != 0) { // collect steals and retry 1728 long sc; 1729 w.nsteals = 0; 1730 do {} while (!U.compareAndSwapLong(this, STEALCOUNT, 1731 sc = stealCount, sc + ns)); 1732 } 1733 else { 1734 long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L : 1735 ((long)(w.nextWait & E_MASK)) | // ctl to restore 1736 ((long)(u + UAC_UNIT)) << 32); 1737 if (pc != 0L) { // timed wait if last waiter 1738 int dc = -(short)(c >>> TC_SHIFT); 1739 parkTime = (dc < 0 ? 
FAST_IDLE_TIMEOUT: 1740 (dc + 1) * IDLE_TIMEOUT); 1741 deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; 1742 } 1743 else 1744 parkTime = deadline = 0L; 1745 if (w.eventCount == ec && ctl == c) { 1746 Thread wt = Thread.currentThread(); 1747 U.putObject(wt, PARKBLOCKER, this); 1748 w.parker = wt; // emulate LockSupport.park 1749 if (w.eventCount == ec && ctl == c) 1750 U.park(false, parkTime); // must recheck before park 1751 w.parker = null; 1752 U.putObject(wt, PARKBLOCKER, null); 1753 if (parkTime != 0L && ctl == c && 1754 deadline - System.nanoTime() <= 0L && 1755 U.compareAndSwapLong(this, CTL, c, pc)) 1756 stat = w.qlock = -1; // shrink pool 1757 } 1758 } 1759 } 1760 return stat; 1761 } 1762 1763 /** 1764 * Possibly releases (signals) a worker. Called only from scan() 1765 * when a worker with apparently inactive status finds a non-empty 1766 * queue. This requires revalidating all of the associated state 1767 * from caller. 1768 */ helpRelease(long c, WorkQueue[] ws, WorkQueue w, WorkQueue q, int b)1769 private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w, 1770 WorkQueue q, int b) { 1771 WorkQueue v; int e, i; Thread p; 1772 if (w != null && w.eventCount < 0 && (e = (int)c) > 0 && 1773 ws != null && ws.length > (i = e & SMASK) && 1774 (v = ws[i]) != null && ctl == c) { 1775 long nc = (((long)(v.nextWait & E_MASK)) | 1776 ((long)((int)(c >>> 32) + UAC_UNIT)) << 32); 1777 int ne = (e + E_SEQ) & E_MASK; 1778 if (q != null && q.base == b && w.eventCount < 0 && 1779 v.eventCount == (e | INT_SIGN) && 1780 U.compareAndSwapLong(this, CTL, c, nc)) { 1781 v.eventCount = ne; 1782 if ((p = v.parker) != null) 1783 U.unpark(p); 1784 } 1785 } 1786 } 1787 1788 /** 1789 * Tries to locate and execute tasks for a stealer of the given 1790 * task, or in turn one of its stealers, Traces currentSteal -> 1791 * currentJoin links looking for a thread working on a descendant 1792 * of the given task and with a non-empty queue to steal back and 1793 * execute 
tasks from. The first call to this method upon a 1794 * waiting join will often entail scanning/search, (which is OK 1795 * because the joiner has nothing better to do), but this method 1796 * leaves hints in workers to speed up subsequent calls. The 1797 * implementation is very branchy to cope with potential 1798 * inconsistencies or loops encountering chains that are stale, 1799 * unknown, or so long that they are likely cyclic. 1800 * 1801 * @param joiner the joining worker 1802 * @param task the task to join 1803 * @return 0 if no progress can be made, negative if task 1804 * known complete, else positive 1805 */ tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task)1806 private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) { 1807 int stat = 0, steps = 0; // bound to avoid cycles 1808 if (task != null && joiner != null && 1809 joiner.base - joiner.top >= 0) { // hoist checks 1810 restart: for (;;) { 1811 ForkJoinTask<?> subtask = task; // current target 1812 for (WorkQueue j = joiner, v;;) { // v is stealer of subtask 1813 WorkQueue[] ws; int m, s, h; 1814 if ((s = task.status) < 0) { 1815 stat = s; 1816 break restart; 1817 } 1818 if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) 1819 break restart; // shutting down 1820 if ((v = ws[h = (j.hint | 1) & m]) == null || 1821 v.currentSteal != subtask) { 1822 for (int origin = h;;) { // find stealer 1823 if (((h = (h + 2) & m) & 15) == 1 && 1824 (subtask.status < 0 || j.currentJoin != subtask)) 1825 continue restart; // occasional staleness check 1826 if ((v = ws[h]) != null && 1827 v.currentSteal == subtask) { 1828 j.hint = h; // save hint 1829 break; 1830 } 1831 if (h == origin) 1832 break restart; // cannot find stealer 1833 } 1834 } 1835 for (;;) { // help stealer or descend to its stealer 1836 ForkJoinTask[] a; int b; 1837 if (subtask.status < 0) // surround probes with 1838 continue restart; // consistency checks 1839 if ((b = v.base) - v.top < 0 && (a = v.array) != null) { 1840 int i = 
(((a.length - 1) & b) << ASHIFT) + ABASE; 1841 ForkJoinTask<?> t = 1842 (ForkJoinTask<?>)U.getObjectVolatile(a, i); 1843 if (subtask.status < 0 || j.currentJoin != subtask || 1844 v.currentSteal != subtask) 1845 continue restart; // stale 1846 stat = 1; // apparent progress 1847 if (v.base == b) { 1848 if (t == null) 1849 break restart; 1850 if (U.compareAndSwapObject(a, i, t, null)) { 1851 U.putOrderedInt(v, QBASE, b + 1); 1852 ForkJoinTask<?> ps = joiner.currentSteal; 1853 int jt = joiner.top; 1854 do { 1855 joiner.currentSteal = t; 1856 t.doExec(); // clear local tasks too 1857 } while (task.status >= 0 && 1858 joiner.top != jt && 1859 (t = joiner.pop()) != null); 1860 joiner.currentSteal = ps; 1861 break restart; 1862 } 1863 } 1864 } 1865 else { // empty -- try to descend 1866 ForkJoinTask<?> next = v.currentJoin; 1867 if (subtask.status < 0 || j.currentJoin != subtask || 1868 v.currentSteal != subtask) 1869 continue restart; // stale 1870 else if (next == null || ++steps == MAX_HELP) 1871 break restart; // dead-end or maybe cyclic 1872 else { 1873 subtask = next; 1874 j = v; 1875 break; 1876 } 1877 } 1878 } 1879 } 1880 } 1881 } 1882 return stat; 1883 } 1884 1885 /** 1886 * Analog of tryHelpStealer for CountedCompleters. Tries to steal 1887 * and run tasks within the target's computation. 
1888 * 1889 * @param task the task to join 1890 */ helpComplete(WorkQueue joiner, CountedCompleter<?> task)1891 private int helpComplete(WorkQueue joiner, CountedCompleter<?> task) { 1892 WorkQueue[] ws; int m; 1893 int s = 0; 1894 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && 1895 joiner != null && task != null) { 1896 int j = joiner.poolIndex; 1897 int scans = m + m + 1; 1898 long c = 0L; // for stability check 1899 for (int k = scans; ; j += 2) { 1900 WorkQueue q; 1901 if ((s = task.status) < 0) 1902 break; 1903 else if (joiner.internalPopAndExecCC(task)) 1904 k = scans; 1905 else if ((s = task.status) < 0) 1906 break; 1907 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) 1908 k = scans; 1909 else if (--k < 0) { 1910 if (c == (c = ctl)) 1911 break; 1912 k = scans; 1913 } 1914 } 1915 } 1916 return s; 1917 } 1918 1919 /** 1920 * Tries to decrement active count (sometimes implicitly) and 1921 * possibly release or create a compensating worker in preparation 1922 * for blocking. Fails on contention or termination. Otherwise, 1923 * adds a new thread if no idle workers are available and pool 1924 * may become starved. 
1925 * 1926 * @param c the assumed ctl value 1927 */ tryCompensate(long c)1928 final boolean tryCompensate(long c) { 1929 WorkQueue[] ws = workQueues; 1930 int pc = parallelism, e = (int)c, m, tc; 1931 if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) { 1932 WorkQueue w = ws[e & m]; 1933 if (e != 0 && w != null) { 1934 Thread p; 1935 long nc = ((long)(w.nextWait & E_MASK) | 1936 (c & (AC_MASK|TC_MASK))); 1937 int ne = (e + E_SEQ) & E_MASK; 1938 if (w.eventCount == (e | INT_SIGN) && 1939 U.compareAndSwapLong(this, CTL, c, nc)) { 1940 w.eventCount = ne; 1941 if ((p = w.parker) != null) 1942 U.unpark(p); 1943 return true; // replace with idle worker 1944 } 1945 } 1946 else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 && 1947 (int)(c >> AC_SHIFT) + pc > 1) { 1948 long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); 1949 if (U.compareAndSwapLong(this, CTL, c, nc)) 1950 return true; // no compensation 1951 } 1952 else if (tc + pc < MAX_CAP) { 1953 long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); 1954 if (U.compareAndSwapLong(this, CTL, c, nc)) { 1955 ForkJoinWorkerThreadFactory fac; 1956 Throwable ex = null; 1957 ForkJoinWorkerThread wt = null; 1958 try { 1959 if ((fac = factory) != null && 1960 (wt = fac.newThread(this)) != null) { 1961 wt.start(); 1962 return true; 1963 } 1964 } catch (Throwable rex) { 1965 ex = rex; 1966 } 1967 deregisterWorker(wt, ex); // clean up and return false 1968 } 1969 } 1970 } 1971 return false; 1972 } 1973 1974 /** 1975 * Helps and/or blocks until the given task is done. 
1976 * 1977 * @param joiner the joining worker 1978 * @param task the task 1979 * @return task status on exit 1980 */ awaitJoin(WorkQueue joiner, ForkJoinTask<?> task)1981 final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) { 1982 int s = 0; 1983 if (task != null && (s = task.status) >= 0 && joiner != null) { 1984 ForkJoinTask<?> prevJoin = joiner.currentJoin; 1985 joiner.currentJoin = task; 1986 do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 1987 (s = task.status) >= 0); 1988 if (s >= 0 && (task instanceof CountedCompleter)) 1989 s = helpComplete(joiner, (CountedCompleter<?>)task); 1990 long cc = 0; // for stability checks 1991 while (s >= 0 && (s = task.status) >= 0) { 1992 if ((s = tryHelpStealer(joiner, task)) == 0 && 1993 (s = task.status) >= 0) { 1994 if (!tryCompensate(cc)) 1995 cc = ctl; 1996 else { 1997 if (task.trySetSignal() && (s = task.status) >= 0) { 1998 synchronized (task) { 1999 if (task.status >= 0) { 2000 try { // see ForkJoinTask 2001 task.wait(); // for explanation 2002 } catch (InterruptedException ie) { 2003 } 2004 } 2005 else 2006 task.notifyAll(); 2007 } 2008 } 2009 long c; // reactivate 2010 do {} while (!U.compareAndSwapLong 2011 (this, CTL, c = ctl, 2012 ((c & ~AC_MASK) | 2013 ((c & AC_MASK) + AC_UNIT)))); 2014 } 2015 } 2016 } 2017 joiner.currentJoin = prevJoin; 2018 } 2019 return s; 2020 } 2021 2022 /** 2023 * Stripped-down variant of awaitJoin used by timed joins. Tries 2024 * to help join only while there is continuous progress. (Caller 2025 * will then enter a timed wait.) 
2026 * 2027 * @param joiner the joining worker 2028 * @param task the task 2029 */ helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task)2030 final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) { 2031 int s; 2032 if (joiner != null && task != null && (s = task.status) >= 0) { 2033 ForkJoinTask<?> prevJoin = joiner.currentJoin; 2034 joiner.currentJoin = task; 2035 do {} while (joiner.tryRemoveAndExec(task) && // process local tasks 2036 (s = task.status) >= 0); 2037 if (s >= 0) { 2038 if (task instanceof CountedCompleter) 2039 helpComplete(joiner, (CountedCompleter<?>)task); 2040 do {} while (task.status >= 0 && 2041 tryHelpStealer(joiner, task) > 0); 2042 } 2043 joiner.currentJoin = prevJoin; 2044 } 2045 } 2046 2047 /** 2048 * Returns a (probably) non-empty steal queue, if one is found 2049 * during a scan, else null. This method must be retried by 2050 * caller if, by the time it tries to use the queue, it is empty. 2051 */ findNonEmptyStealQueue()2052 private WorkQueue findNonEmptyStealQueue() { 2053 int r = ThreadLocalRandom.current().nextInt(); 2054 for (;;) { 2055 int ps = plock, m; WorkQueue[] ws; WorkQueue q; 2056 if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) { 2057 for (int j = (m + 1) << 2; j >= 0; --j) { 2058 if ((q = ws[(((r - j) << 1) | 1) & m]) != null && 2059 q.base - q.top < 0) 2060 return q; 2061 } 2062 } 2063 if (plock == ps) 2064 return null; 2065 } 2066 } 2067 2068 /** 2069 * Runs tasks until {@code isQuiescent()}. We piggyback on 2070 * active count ctl maintenance, but rather than blocking 2071 * when tasks cannot be found, we rescan until all others cannot 2072 * find tasks either. 
2073 */ helpQuiescePool(WorkQueue w)2074 final void helpQuiescePool(WorkQueue w) { 2075 ForkJoinTask<?> ps = w.currentSteal; 2076 for (boolean active = true;;) { 2077 long c; WorkQueue q; ForkJoinTask<?> t; int b; 2078 while ((t = w.nextLocalTask()) != null) 2079 t.doExec(); 2080 if ((q = findNonEmptyStealQueue()) != null) { 2081 if (!active) { // re-establish active count 2082 active = true; 2083 do {} while (!U.compareAndSwapLong 2084 (this, CTL, c = ctl, 2085 ((c & ~AC_MASK) | 2086 ((c & AC_MASK) + AC_UNIT)))); 2087 } 2088 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { 2089 (w.currentSteal = t).doExec(); 2090 w.currentSteal = ps; 2091 } 2092 } 2093 else if (active) { // decrement active count without queuing 2094 long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); 2095 if ((int)(nc >> AC_SHIFT) + parallelism == 0) 2096 break; // bypass decrement-then-increment 2097 if (U.compareAndSwapLong(this, CTL, c, nc)) 2098 active = false; 2099 } 2100 else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 && 2101 U.compareAndSwapLong 2102 (this, CTL, c, ((c & ~AC_MASK) | 2103 ((c & AC_MASK) + AC_UNIT)))) 2104 break; 2105 } 2106 } 2107 2108 /** 2109 * Gets and removes a local or stolen task for the given worker. 2110 * 2111 * @return a task, if available 2112 */ nextTaskFor(WorkQueue w)2113 final ForkJoinTask<?> nextTaskFor(WorkQueue w) { 2114 for (ForkJoinTask<?> t;;) { 2115 WorkQueue q; int b; 2116 if ((t = w.nextLocalTask()) != null) 2117 return t; 2118 if ((q = findNonEmptyStealQueue()) == null) 2119 return null; 2120 if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) 2121 return t; 2122 } 2123 } 2124 2125 /** 2126 * Returns a cheap heuristic guide for task partitioning when 2127 * programmers, frameworks, tools, or languages have little or no 2128 * idea about task granularity. 
In essence by offering this 2129 * method, we ask users only about tradeoffs in overhead vs 2130 * expected throughput and its variance, rather than how finely to 2131 * partition tasks. 2132 * 2133 * In a steady state strict (tree-structured) computation, each 2134 * thread makes available for stealing enough tasks for other 2135 * threads to remain active. Inductively, if all threads play by 2136 * the same rules, each thread should make available only a 2137 * constant number of tasks. 2138 * 2139 * The minimum useful constant is just 1. But using a value of 1 2140 * would require immediate replenishment upon each steal to 2141 * maintain enough tasks, which is infeasible. Further, 2142 * partitionings/granularities of offered tasks should minimize 2143 * steal rates, which in general means that threads nearer the top 2144 * of computation tree should generate more than those nearer the 2145 * bottom. In perfect steady state, each thread is at 2146 * approximately the same level of computation tree. However, 2147 * producing extra tasks amortizes the uncertainty of progress and 2148 * diffusion assumptions. 2149 * 2150 * So, users will want to use values larger (but not much larger) 2151 * than 1 to both smooth over transient shortages and hedge 2152 * against uneven progress; as traded off against the cost of 2153 * extra task overhead. We leave the user to pick a threshold 2154 * value to compare with the results of this call to guide 2155 * decisions, but recommend values such as 3. 2156 * 2157 * When all threads are active, it is on average OK to estimate 2158 * surplus strictly locally. In steady-state, if one thread is 2159 * maintaining say 2 surplus tasks, then so are others. So we can 2160 * just use estimated queue length. However, this strategy alone 2161 * leads to serious mis-estimates in some non-steady-state 2162 * conditions (ramp-up, ramp-down, other stalls). 
We can detect 2163 * many of these by further considering the number of "idle" 2164 * threads, that are known to have zero queued tasks, so 2165 * compensate by a factor of (#idle/#active) threads. 2166 * 2167 * Note: The approximation of #busy workers as #active workers is 2168 * not very good under current signalling scheme, and should be 2169 * improved. 2170 */ getSurplusQueuedTaskCount()2171 static int getSurplusQueuedTaskCount() { 2172 Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q; 2173 if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) { 2174 int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism; 2175 int n = (q = wt.workQueue).top - q.base; 2176 int a = (int)(pool.ctl >> AC_SHIFT) + p; 2177 return n - (a > (p >>>= 1) ? 0 : 2178 a > (p >>>= 1) ? 1 : 2179 a > (p >>>= 1) ? 2 : 2180 a > (p >>>= 1) ? 4 : 2181 8); 2182 } 2183 return 0; 2184 } 2185 2186 // Termination 2187 2188 /** 2189 * Possibly initiates and/or completes termination. The caller 2190 * triggering termination runs three passes through workQueues: 2191 * (0) Setting termination status, followed by wakeups of queued 2192 * workers; (1) cancelling all tasks; (2) interrupting lagging 2193 * threads (likely in external tasks, but possibly also blocked in 2194 * joins). Each pass repeats previous steps because of potential 2195 * lagging thread creation. 
2196 * 2197 * @param now if true, unconditionally terminate, else only 2198 * if no work and no active workers 2199 * @param enable if true, enable shutdown when next possible 2200 * @return true if now terminating or terminated 2201 */ tryTerminate(boolean now, boolean enable)2202 private boolean tryTerminate(boolean now, boolean enable) { 2203 int ps; 2204 if (this == common) // cannot shut down 2205 return false; 2206 if ((ps = plock) >= 0) { // enable by setting plock 2207 if (!enable) 2208 return false; 2209 if ((ps & PL_LOCK) != 0 || 2210 !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) 2211 ps = acquirePlock(); 2212 int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; 2213 if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) 2214 releasePlock(nps); 2215 } 2216 for (long c;;) { 2217 if (((c = ctl) & STOP_BIT) != 0) { // already terminating 2218 if ((short)(c >>> TC_SHIFT) + parallelism <= 0) { 2219 synchronized (this) { 2220 notifyAll(); // signal when 0 workers 2221 } 2222 } 2223 return true; 2224 } 2225 if (!now) { // check if idle & no tasks 2226 WorkQueue[] ws; WorkQueue w; 2227 if ((int)(c >> AC_SHIFT) + parallelism > 0) 2228 return false; 2229 if ((ws = workQueues) != null) { 2230 for (int i = 0; i < ws.length; ++i) { 2231 if ((w = ws[i]) != null && 2232 (!w.isEmpty() || 2233 ((i & 1) != 0 && w.eventCount >= 0))) { 2234 signalWork(ws, w); 2235 return false; 2236 } 2237 } 2238 } 2239 } 2240 if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { 2241 for (int pass = 0; pass < 3; ++pass) { 2242 WorkQueue[] ws; WorkQueue w; Thread wt; 2243 if ((ws = workQueues) != null) { 2244 int n = ws.length; 2245 for (int i = 0; i < n; ++i) { 2246 if ((w = ws[i]) != null) { 2247 w.qlock = -1; 2248 if (pass > 0) { 2249 w.cancelAll(); 2250 if (pass > 1 && (wt = w.owner) != null) { 2251 if (!wt.isInterrupted()) { 2252 try { 2253 wt.interrupt(); 2254 } catch (Throwable ignore) { 2255 } 2256 } 2257 U.unpark(wt); 2258 } 2259 } 2260 } 2261 } 2262 // Wake up workers parked on 
event queue 2263 int i, e; long cc; Thread p; 2264 while ((e = (int)(cc = ctl) & E_MASK) != 0 && 2265 (i = e & SMASK) < n && i >= 0 && 2266 (w = ws[i]) != null) { 2267 long nc = ((long)(w.nextWait & E_MASK) | 2268 ((cc + AC_UNIT) & AC_MASK) | 2269 (cc & (TC_MASK|STOP_BIT))); 2270 if (w.eventCount == (e | INT_SIGN) && 2271 U.compareAndSwapLong(this, CTL, cc, nc)) { 2272 w.eventCount = (e + E_SEQ) & E_MASK; 2273 w.qlock = -1; 2274 if ((p = w.parker) != null) 2275 U.unpark(p); 2276 } 2277 } 2278 } 2279 } 2280 } 2281 } 2282 } 2283 2284 // external operations on common pool 2285 2286 /** 2287 * Returns common pool queue for a thread that has submitted at 2288 * least one task. 2289 */ commonSubmitterQueue()2290 static WorkQueue commonSubmitterQueue() { 2291 Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r; 2292 return ((z = submitters.get()) != null && 2293 (p = common) != null && 2294 (ws = p.workQueues) != null && 2295 (m = ws.length - 1) >= 0) ? 2296 ws[m & z.seed & SQMASK] : null; 2297 } 2298 2299 /** 2300 * Tries to pop the given task from submitter's queue in common pool. 
2301 */ tryExternalUnpush(ForkJoinTask<?> task)2302 final boolean tryExternalUnpush(ForkJoinTask<?> task) { 2303 WorkQueue joiner; ForkJoinTask<?>[] a; int m, s; 2304 Submitter z = submitters.get(); 2305 WorkQueue[] ws = workQueues; 2306 boolean popped = false; 2307 if (z != null && ws != null && (m = ws.length - 1) >= 0 && 2308 (joiner = ws[z.seed & m & SQMASK]) != null && 2309 joiner.base != (s = joiner.top) && 2310 (a = joiner.array) != null) { 2311 long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; 2312 if (U.getObject(a, j) == task && 2313 U.compareAndSwapInt(joiner, QLOCK, 0, 1)) { 2314 if (joiner.top == s && joiner.array == a && 2315 U.compareAndSwapObject(a, j, task, null)) { 2316 joiner.top = s - 1; 2317 popped = true; 2318 } 2319 joiner.qlock = 0; 2320 } 2321 } 2322 return popped; 2323 } 2324 externalHelpComplete(CountedCompleter<?> task)2325 final int externalHelpComplete(CountedCompleter<?> task) { 2326 WorkQueue joiner; int m, j; 2327 Submitter z = submitters.get(); 2328 WorkQueue[] ws = workQueues; 2329 int s = 0; 2330 if (z != null && ws != null && (m = ws.length - 1) >= 0 && 2331 (joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) { 2332 int scans = m + m + 1; 2333 long c = 0L; // for stability check 2334 j |= 1; // poll odd queues 2335 for (int k = scans; ; j += 2) { 2336 WorkQueue q; 2337 if ((s = task.status) < 0) 2338 break; 2339 else if (joiner.externalPopAndExecCC(task)) 2340 k = scans; 2341 else if ((s = task.status) < 0) 2342 break; 2343 else if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) 2344 k = scans; 2345 else if (--k < 0) { 2346 if (c == (c = ctl)) 2347 break; 2348 k = scans; 2349 } 2350 } 2351 } 2352 return s; 2353 } 2354 2355 // Exported methods 2356 2357 // Constructors 2358 2359 /** 2360 * Creates a {@code ForkJoinPool} with parallelism equal to {@link 2361 * java.lang.Runtime#availableProcessors}, using the {@linkplain 2362 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2363 * no 
UncaughtExceptionHandler, and non-async LIFO processing mode. 2364 */ ForkJoinPool()2365 public ForkJoinPool() { 2366 this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), 2367 defaultForkJoinWorkerThreadFactory, null, false); 2368 } 2369 2370 /** 2371 * Creates a {@code ForkJoinPool} with the indicated parallelism 2372 * level, the {@linkplain 2373 * #defaultForkJoinWorkerThreadFactory default thread factory}, 2374 * no UncaughtExceptionHandler, and non-async LIFO processing mode. 2375 * 2376 * @param parallelism the parallelism level 2377 * @throws IllegalArgumentException if parallelism less than or 2378 * equal to zero, or greater than implementation limit 2379 */ ForkJoinPool(int parallelism)2380 public ForkJoinPool(int parallelism) { 2381 this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); 2382 } 2383 2384 /** 2385 * Creates a {@code ForkJoinPool} with the given parameters. 2386 * 2387 * @param parallelism the parallelism level. For default value, 2388 * use {@link java.lang.Runtime#availableProcessors}. 2389 * @param factory the factory for creating new threads. For default value, 2390 * use {@link #defaultForkJoinWorkerThreadFactory}. 2391 * @param handler the handler for internal worker threads that 2392 * terminate due to unrecoverable errors encountered while executing 2393 * tasks. For default value, use {@code null}. 2394 * @param asyncMode if true, 2395 * establishes local first-in-first-out scheduling mode for forked 2396 * tasks that are never joined. This mode may be more appropriate 2397 * than default locally stack-based mode in applications in which 2398 * worker threads only process event-style asynchronous tasks. 2399 * For default value, use {@code false}. 
2400 * @throws IllegalArgumentException if parallelism less than or 2401 * equal to zero, or greater than implementation limit 2402 * @throws NullPointerException if the factory is null 2403 */ ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode)2404 public ForkJoinPool(int parallelism, 2405 ForkJoinWorkerThreadFactory factory, 2406 UncaughtExceptionHandler handler, 2407 boolean asyncMode) { 2408 this(checkParallelism(parallelism), 2409 checkFactory(factory), 2410 handler, 2411 (asyncMode ? FIFO_QUEUE : LIFO_QUEUE), 2412 "ForkJoinPool-" + nextPoolId() + "-worker-"); 2413 checkPermission(); 2414 } 2415 checkParallelism(int parallelism)2416 private static int checkParallelism(int parallelism) { 2417 if (parallelism <= 0 || parallelism > MAX_CAP) 2418 throw new IllegalArgumentException(); 2419 return parallelism; 2420 } 2421 checkFactory(ForkJoinWorkerThreadFactory factory)2422 private static ForkJoinWorkerThreadFactory checkFactory 2423 (ForkJoinWorkerThreadFactory factory) { 2424 if (factory == null) 2425 throw new NullPointerException(); 2426 return factory; 2427 } 2428 2429 /** 2430 * Creates a {@code ForkJoinPool} with the given parameters, without 2431 * any security checks or parameter validation. Invoked directly by 2432 * makeCommonPool. 
2433 */ ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix)2434 private ForkJoinPool(int parallelism, 2435 ForkJoinWorkerThreadFactory factory, 2436 UncaughtExceptionHandler handler, 2437 int mode, 2438 String workerNamePrefix) { 2439 this.workerNamePrefix = workerNamePrefix; 2440 this.factory = factory; 2441 this.ueh = handler; 2442 this.mode = (short)mode; 2443 this.parallelism = (short)parallelism; 2444 long np = (long)(-parallelism); // offset ctl counts 2445 this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 2446 } 2447 2448 /** 2449 * Returns the common pool instance. This pool is statically 2450 * constructed; its run state is unaffected by attempts to {@link 2451 * #shutdown} or {@link #shutdownNow}. However this pool and any 2452 * ongoing processing are automatically terminated upon program 2453 * {@link System#exit}. Any program that relies on asynchronous 2454 * task processing to complete before program termination should 2455 * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence}, 2456 * before exit. 2457 * 2458 * @return the common pool instance 2459 * @since 1.8 2460 * @hide 2461 */ commonPool()2462 public static ForkJoinPool commonPool() { 2463 // assert common != null : "static init error"; 2464 return common; 2465 } 2466 2467 // Execution methods 2468 2469 /** 2470 * Performs the given task, returning its result upon completion. 2471 * If the computation encounters an unchecked Exception or Error, 2472 * it is rethrown as the outcome of this invocation. Rethrown 2473 * exceptions behave in the same way as regular exceptions, but, 2474 * when possible, contain stack traces (as displayed for example 2475 * using {@code ex.printStackTrace()}) of both the current thread 2476 * as well as the thread actually encountering the exception; 2477 * minimally only the latter. 
2478 * 2479 * @param task the task 2480 * @return the task's result 2481 * @throws NullPointerException if the task is null 2482 * @throws RejectedExecutionException if the task cannot be 2483 * scheduled for execution 2484 */ invoke(ForkJoinTask<T> task)2485 public <T> T invoke(ForkJoinTask<T> task) { 2486 if (task == null) 2487 throw new NullPointerException(); 2488 externalPush(task); 2489 return task.join(); 2490 } 2491 2492 /** 2493 * Arranges for (asynchronous) execution of the given task. 2494 * 2495 * @param task the task 2496 * @throws NullPointerException if the task is null 2497 * @throws RejectedExecutionException if the task cannot be 2498 * scheduled for execution 2499 */ execute(ForkJoinTask<?> task)2500 public void execute(ForkJoinTask<?> task) { 2501 if (task == null) 2502 throw new NullPointerException(); 2503 externalPush(task); 2504 } 2505 2506 // AbstractExecutorService methods 2507 2508 /** 2509 * @throws NullPointerException if the task is null 2510 * @throws RejectedExecutionException if the task cannot be 2511 * scheduled for execution 2512 */ execute(Runnable task)2513 public void execute(Runnable task) { 2514 if (task == null) 2515 throw new NullPointerException(); 2516 ForkJoinTask<?> job; 2517 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2518 job = (ForkJoinTask<?>) task; 2519 else 2520 job = new ForkJoinTask.RunnableExecuteAction(task); 2521 externalPush(job); 2522 } 2523 2524 /** 2525 * Submits a ForkJoinTask for execution. 
2526 * 2527 * @param task the task to submit 2528 * @return the task 2529 * @throws NullPointerException if the task is null 2530 * @throws RejectedExecutionException if the task cannot be 2531 * scheduled for execution 2532 */ submit(ForkJoinTask<T> task)2533 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { 2534 if (task == null) 2535 throw new NullPointerException(); 2536 externalPush(task); 2537 return task; 2538 } 2539 2540 /** 2541 * @throws NullPointerException if the task is null 2542 * @throws RejectedExecutionException if the task cannot be 2543 * scheduled for execution 2544 */ submit(Callable<T> task)2545 public <T> ForkJoinTask<T> submit(Callable<T> task) { 2546 ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task); 2547 externalPush(job); 2548 return job; 2549 } 2550 2551 /** 2552 * @throws NullPointerException if the task is null 2553 * @throws RejectedExecutionException if the task cannot be 2554 * scheduled for execution 2555 */ submit(Runnable task, T result)2556 public <T> ForkJoinTask<T> submit(Runnable task, T result) { 2557 ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result); 2558 externalPush(job); 2559 return job; 2560 } 2561 2562 /** 2563 * @throws NullPointerException if the task is null 2564 * @throws RejectedExecutionException if the task cannot be 2565 * scheduled for execution 2566 */ submit(Runnable task)2567 public ForkJoinTask<?> submit(Runnable task) { 2568 if (task == null) 2569 throw new NullPointerException(); 2570 ForkJoinTask<?> job; 2571 if (task instanceof ForkJoinTask<?>) // avoid re-wrap 2572 job = (ForkJoinTask<?>) task; 2573 else 2574 job = new ForkJoinTask.AdaptedRunnableAction(task); 2575 externalPush(job); 2576 return job; 2577 } 2578 2579 /** 2580 * @throws NullPointerException {@inheritDoc} 2581 * @throws RejectedExecutionException {@inheritDoc} 2582 */ invokeAll(Collection<? extends Callable<T>> tasks)2583 public <T> List<Future<T>> invokeAll(Collection<? 
extends Callable<T>> tasks) { 2584 // In previous versions of this class, this method constructed 2585 // a task to run ForkJoinTask.invokeAll, but now external 2586 // invocation of multiple tasks is at least as efficient. 2587 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 2588 2589 boolean done = false; 2590 try { 2591 for (Callable<T> t : tasks) { 2592 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); 2593 futures.add(f); 2594 externalPush(f); 2595 } 2596 for (int i = 0, size = futures.size(); i < size; i++) 2597 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); 2598 done = true; 2599 return futures; 2600 } finally { 2601 if (!done) 2602 for (int i = 0, size = futures.size(); i < size; i++) 2603 futures.get(i).cancel(false); 2604 } 2605 } 2606 2607 /** 2608 * Returns the factory used for constructing new workers. 2609 * 2610 * @return the factory used for constructing new workers 2611 */ getFactory()2612 public ForkJoinWorkerThreadFactory getFactory() { 2613 return factory; 2614 } 2615 2616 /** 2617 * Returns the handler for internal worker threads that terminate 2618 * due to unrecoverable errors encountered while executing tasks. 2619 * 2620 * @return the handler, or {@code null} if none 2621 */ getUncaughtExceptionHandler()2622 public UncaughtExceptionHandler getUncaughtExceptionHandler() { 2623 return ueh; 2624 } 2625 2626 /** 2627 * Returns the targeted parallelism level of this pool. 2628 * 2629 * @return the targeted parallelism level of this pool 2630 */ getParallelism()2631 public int getParallelism() { 2632 int par; 2633 return ((par = parallelism) > 0) ? par : 1; 2634 } 2635 2636 /** 2637 * Returns the targeted parallelism level of the common pool. 
2638 * 2639 * @return the targeted parallelism level of the common pool 2640 * @since 1.8 2641 * @hide 2642 */ getCommonPoolParallelism()2643 public static int getCommonPoolParallelism() { 2644 return commonParallelism; 2645 } 2646 2647 /** 2648 * Returns the number of worker threads that have started but not 2649 * yet terminated. The result returned by this method may differ 2650 * from {@link #getParallelism} when threads are created to 2651 * maintain parallelism when others are cooperatively blocked. 2652 * 2653 * @return the number of worker threads 2654 */ getPoolSize()2655 public int getPoolSize() { 2656 return parallelism + (short)(ctl >>> TC_SHIFT); 2657 } 2658 2659 /** 2660 * Returns {@code true} if this pool uses local first-in-first-out 2661 * scheduling mode for forked tasks that are never joined. 2662 * 2663 * @return {@code true} if this pool uses async mode 2664 */ getAsyncMode()2665 public boolean getAsyncMode() { 2666 return mode == FIFO_QUEUE; 2667 } 2668 2669 /** 2670 * Returns an estimate of the number of worker threads that are 2671 * not blocked waiting to join tasks or for other managed 2672 * synchronization. This method may overestimate the 2673 * number of running threads. 2674 * 2675 * @return the number of worker threads 2676 */ getRunningThreadCount()2677 public int getRunningThreadCount() { 2678 int rc = 0; 2679 WorkQueue[] ws; WorkQueue w; 2680 if ((ws = workQueues) != null) { 2681 for (int i = 1; i < ws.length; i += 2) { 2682 if ((w = ws[i]) != null && w.isApparentlyUnblocked()) 2683 ++rc; 2684 } 2685 } 2686 return rc; 2687 } 2688 2689 /** 2690 * Returns an estimate of the number of threads that are currently 2691 * stealing or executing tasks. This method may overestimate the 2692 * number of active threads. 2693 * 2694 * @return the number of active threads 2695 */ getActiveThreadCount()2696 public int getActiveThreadCount() { 2697 int r = parallelism + (int)(ctl >> AC_SHIFT); 2698 return (r <= 0) ? 
0 : r; // suppress momentarily negative values 2699 } 2700 2701 /** 2702 * Returns {@code true} if all worker threads are currently idle. 2703 * An idle worker is one that cannot obtain a task to execute 2704 * because none are available to steal from other threads, and 2705 * there are no pending submissions to the pool. This method is 2706 * conservative; it might not return {@code true} immediately upon 2707 * idleness of all threads, but will eventually become true if 2708 * threads remain inactive. 2709 * 2710 * @return {@code true} if all threads are currently idle 2711 */ isQuiescent()2712 public boolean isQuiescent() { 2713 return parallelism + (int)(ctl >> AC_SHIFT) <= 0; 2714 } 2715 2716 /** 2717 * Returns an estimate of the total number of tasks stolen from 2718 * one thread's work queue by another. The reported value 2719 * underestimates the actual total number of steals when the pool 2720 * is not quiescent. This value may be useful for monitoring and 2721 * tuning fork/join programs: in general, steal counts should be 2722 * high enough to keep threads busy, but low enough to avoid 2723 * overhead and contention across threads. 2724 * 2725 * @return the number of steals 2726 */ getStealCount()2727 public long getStealCount() { 2728 long count = stealCount; 2729 WorkQueue[] ws; WorkQueue w; 2730 if ((ws = workQueues) != null) { 2731 for (int i = 1; i < ws.length; i += 2) { 2732 if ((w = ws[i]) != null) 2733 count += w.nsteals; 2734 } 2735 } 2736 return count; 2737 } 2738 2739 /** 2740 * Returns an estimate of the total number of tasks currently held 2741 * in queues by worker threads (but not including tasks submitted 2742 * to the pool that have not begun executing). This value is only 2743 * an approximation, obtained by iterating across all threads in 2744 * the pool. This method may be useful for tuning task 2745 * granularities. 
2746 * 2747 * @return the number of queued tasks 2748 */ getQueuedTaskCount()2749 public long getQueuedTaskCount() { 2750 long count = 0; 2751 WorkQueue[] ws; WorkQueue w; 2752 if ((ws = workQueues) != null) { 2753 for (int i = 1; i < ws.length; i += 2) { 2754 if ((w = ws[i]) != null) 2755 count += w.queueSize(); 2756 } 2757 } 2758 return count; 2759 } 2760 2761 /** 2762 * Returns an estimate of the number of tasks submitted to this 2763 * pool that have not yet begun executing. This method may take 2764 * time proportional to the number of submissions. 2765 * 2766 * @return the number of queued submissions 2767 */ getQueuedSubmissionCount()2768 public int getQueuedSubmissionCount() { 2769 int count = 0; 2770 WorkQueue[] ws; WorkQueue w; 2771 if ((ws = workQueues) != null) { 2772 for (int i = 0; i < ws.length; i += 2) { 2773 if ((w = ws[i]) != null) 2774 count += w.queueSize(); 2775 } 2776 } 2777 return count; 2778 } 2779 2780 /** 2781 * Returns {@code true} if there are any tasks submitted to this 2782 * pool that have not yet begun executing. 2783 * 2784 * @return {@code true} if there are any queued submissions 2785 */ hasQueuedSubmissions()2786 public boolean hasQueuedSubmissions() { 2787 WorkQueue[] ws; WorkQueue w; 2788 if ((ws = workQueues) != null) { 2789 for (int i = 0; i < ws.length; i += 2) { 2790 if ((w = ws[i]) != null && !w.isEmpty()) 2791 return true; 2792 } 2793 } 2794 return false; 2795 } 2796 2797 /** 2798 * Removes and returns the next unexecuted submission if one is 2799 * available. This method may be useful in extensions to this 2800 * class that re-assign work in systems with multiple pools. 
2801 * 2802 * @return the next submission, or {@code null} if none 2803 */ pollSubmission()2804 protected ForkJoinTask<?> pollSubmission() { 2805 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2806 if ((ws = workQueues) != null) { 2807 for (int i = 0; i < ws.length; i += 2) { 2808 if ((w = ws[i]) != null && (t = w.poll()) != null) 2809 return t; 2810 } 2811 } 2812 return null; 2813 } 2814 2815 /** 2816 * Removes all available unexecuted submitted and forked tasks 2817 * from scheduling queues and adds them to the given collection, 2818 * without altering their execution status. These may include 2819 * artificially generated or wrapped tasks. This method is 2820 * designed to be invoked only when the pool is known to be 2821 * quiescent. Invocations at other times may not remove all 2822 * tasks. A failure encountered while attempting to add elements 2823 * to collection {@code c} may result in elements being in 2824 * neither, either or both collections when the associated 2825 * exception is thrown. The behavior of this operation is 2826 * undefined if the specified collection is modified while the 2827 * operation is in progress. 2828 * 2829 * @param c the collection to transfer elements into 2830 * @return the number of elements transferred 2831 */ drainTasksTo(Collection<? super ForkJoinTask<?>> c)2832 protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) { 2833 int count = 0; 2834 WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t; 2835 if ((ws = workQueues) != null) { 2836 for (int i = 0; i < ws.length; ++i) { 2837 if ((w = ws[i]) != null) { 2838 while ((t = w.poll()) != null) { 2839 c.add(t); 2840 ++count; 2841 } 2842 } 2843 } 2844 } 2845 return count; 2846 } 2847 2848 /** 2849 * Returns a string identifying this pool, as well as its state, 2850 * including indications of run state, parallelism level, and 2851 * worker and task counts. 
2852 * 2853 * @return a string identifying this pool, as well as its state 2854 */ toString()2855 public String toString() { 2856 // Use a single pass through workQueues to collect counts 2857 long qt = 0L, qs = 0L; int rc = 0; 2858 long st = stealCount; 2859 long c = ctl; 2860 WorkQueue[] ws; WorkQueue w; 2861 if ((ws = workQueues) != null) { 2862 for (int i = 0; i < ws.length; ++i) { 2863 if ((w = ws[i]) != null) { 2864 int size = w.queueSize(); 2865 if ((i & 1) == 0) 2866 qs += size; 2867 else { 2868 qt += size; 2869 st += w.nsteals; 2870 if (w.isApparentlyUnblocked()) 2871 ++rc; 2872 } 2873 } 2874 } 2875 } 2876 int pc = parallelism; 2877 int tc = pc + (short)(c >>> TC_SHIFT); 2878 int ac = pc + (int)(c >> AC_SHIFT); 2879 if (ac < 0) // ignore transient negative 2880 ac = 0; 2881 String level; 2882 if ((c & STOP_BIT) != 0) 2883 level = (tc == 0) ? "Terminated" : "Terminating"; 2884 else 2885 level = plock < 0 ? "Shutting down" : "Running"; 2886 return super.toString() + 2887 "[" + level + 2888 ", parallelism = " + pc + 2889 ", size = " + tc + 2890 ", active = " + ac + 2891 ", running = " + rc + 2892 ", steals = " + st + 2893 ", tasks = " + qt + 2894 ", submissions = " + qs + 2895 "]"; 2896 } 2897 2898 /** 2899 * Possibly initiates an orderly shutdown in which previously 2900 * submitted tasks are executed, but no new tasks will be 2901 * accepted. Invocation has no effect on execution state if this 2902 * is the {@code commonPool()}, and no additional effect if 2903 * already shut down. Tasks that are in the process of being 2904 * submitted concurrently during the course of this method may or 2905 * may not be rejected. 2906 */ 2907 public void shutdown() { 2908 checkPermission(); 2909 tryTerminate(false, true); 2910 } 2911 2912 /** 2913 * Possibly attempts to cancel and/or stop all tasks, and reject 2914 * all subsequently submitted tasks. 
Invocation has no effect on 2915 * execution state if this is the {@code commonPool()}, and no 2916 * additional effect if already shut down. Otherwise, tasks that 2917 * are in the process of being submitted or executed concurrently 2918 * during the course of this method may or may not be 2919 * rejected. This method cancels both existing and unexecuted 2920 * tasks, in order to permit termination in the presence of task 2921 * dependencies. So the method always returns an empty list 2922 * (unlike the case for some other Executors). 2923 * 2924 * @return an empty list 2925 */ 2926 public List<Runnable> shutdownNow() { 2927 checkPermission(); 2928 tryTerminate(true, true); 2929 return Collections.emptyList(); 2930 } 2931 2932 /** 2933 * Returns {@code true} if all tasks have completed following shut down. 2934 * 2935 * @return {@code true} if all tasks have completed following shut down 2936 */ 2937 public boolean isTerminated() { 2938 long c = ctl; 2939 return ((c & STOP_BIT) != 0L && 2940 (short)(c >>> TC_SHIFT) + parallelism <= 0); 2941 } 2942 2943 /** 2944 * Returns {@code true} if the process of termination has 2945 * commenced but not yet completed. This method may be useful for 2946 * debugging. A return of {@code true} reported a sufficient 2947 * period after shutdown may indicate that submitted tasks have 2948 * ignored or suppressed interruption, or are waiting for I/O, 2949 * causing this executor not to properly terminate. (See the 2950 * advisory notes for class {@link ForkJoinTask} stating that 2951 * tasks should not normally entail blocking operations. But if 2952 * they do, they must abort them on interrupt.) 2953 * 2954 * @return {@code true} if terminating but not yet terminated 2955 */ isTerminating()2956 public boolean isTerminating() { 2957 long c = ctl; 2958 return ((c & STOP_BIT) != 0L && 2959 (short)(c >>> TC_SHIFT) + parallelism > 0); 2960 } 2961 2962 /** 2963 * Returns {@code true} if this pool has been shut down. 
2964 * 2965 * @return {@code true} if this pool has been shut down 2966 */ isShutdown()2967 public boolean isShutdown() { 2968 return plock < 0; 2969 } 2970 2971 /** 2972 * Blocks until all tasks have completed execution after a 2973 * shutdown request, or the timeout occurs, or the current thread 2974 * is interrupted, whichever happens first. Because the {@code 2975 * commonPool()} never terminates until program shutdown, when 2976 * applied to the common pool, this method is equivalent to {@link 2977 * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}. 2978 * 2979 * @param timeout the maximum time to wait 2980 * @param unit the time unit of the timeout argument 2981 * @return {@code true} if this executor terminated and 2982 * {@code false} if the timeout elapsed before termination 2983 * @throws InterruptedException if interrupted while waiting 2984 */ awaitTermination(long timeout, TimeUnit unit)2985 public boolean awaitTermination(long timeout, TimeUnit unit) 2986 throws InterruptedException { 2987 if (Thread.interrupted()) 2988 throw new InterruptedException(); 2989 if (this == common) { 2990 awaitQuiescence(timeout, unit); 2991 return false; 2992 } 2993 long nanos = unit.toNanos(timeout); 2994 if (isTerminated()) 2995 return true; 2996 if (nanos <= 0L) 2997 return false; 2998 long deadline = System.nanoTime() + nanos; 2999 synchronized (this) { 3000 for (;;) { 3001 if (isTerminated()) 3002 return true; 3003 if (nanos <= 0L) 3004 return false; 3005 long millis = TimeUnit.NANOSECONDS.toMillis(nanos); 3006 wait(millis > 0L ? millis : 1L); 3007 nanos = deadline - System.nanoTime(); 3008 } 3009 } 3010 } 3011 3012 /** 3013 * If called by a ForkJoinTask operating in this pool, equivalent 3014 * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise, 3015 * waits and/or attempts to assist performing tasks until this 3016 * pool {@link #isQuiescent} or the indicated timeout elapses. 
3017 * 3018 * @param timeout the maximum time to wait 3019 * @param unit the time unit of the timeout argument 3020 * @return {@code true} if quiescent; {@code false} if the 3021 * timeout elapsed. 3022 */ awaitQuiescence(long timeout, TimeUnit unit)3023 public boolean awaitQuiescence(long timeout, TimeUnit unit) { 3024 long nanos = unit.toNanos(timeout); 3025 ForkJoinWorkerThread wt; 3026 Thread thread = Thread.currentThread(); 3027 if ((thread instanceof ForkJoinWorkerThread) && 3028 (wt = (ForkJoinWorkerThread)thread).pool == this) { 3029 helpQuiescePool(wt.workQueue); 3030 return true; 3031 } 3032 long startTime = System.nanoTime(); 3033 WorkQueue[] ws; 3034 int r = 0, m; 3035 boolean found = true; 3036 while (!isQuiescent() && (ws = workQueues) != null && 3037 (m = ws.length - 1) >= 0) { 3038 if (!found) { 3039 if ((System.nanoTime() - startTime) > nanos) 3040 return false; 3041 Thread.yield(); // cannot block 3042 } 3043 found = false; 3044 for (int j = (m + 1) << 2; j >= 0; --j) { 3045 ForkJoinTask<?> t; WorkQueue q; int b; 3046 if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { 3047 found = true; 3048 if ((t = q.pollAt(b)) != null) 3049 t.doExec(); 3050 break; 3051 } 3052 } 3053 } 3054 return true; 3055 } 3056 3057 /** 3058 * Waits and/or attempts to assist performing tasks indefinitely 3059 * until the {@code commonPool()} {@link #isQuiescent}. 3060 */ quiesceCommonPool()3061 static void quiesceCommonPool() { 3062 common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 3063 } 3064 3065 /** 3066 * Interface for extending managed parallelism for tasks running 3067 * in {@link ForkJoinPool}s. 3068 * 3069 * <p>A {@code ManagedBlocker} provides two methods. Method 3070 * {@code isReleasable} must return {@code true} if blocking is 3071 * not necessary. Method {@code block} blocks the current thread 3072 * if necessary (perhaps internally invoking {@code isReleasable} 3073 * before actually blocking). 
These actions are performed by any 3074 * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}. 3075 * The unusual methods in this API accommodate synchronizers that 3076 * may, but don't usually, block for long periods. Similarly, they 3077 * allow more efficient internal handling of cases in which 3078 * additional workers may be, but usually are not, needed to 3079 * ensure sufficient parallelism. Toward this end, 3080 * implementations of method {@code isReleasable} must be amenable 3081 * to repeated invocation. 3082 * 3083 * <p>For example, here is a ManagedBlocker based on a 3084 * ReentrantLock: 3085 * <pre> {@code 3086 * class ManagedLocker implements ManagedBlocker { 3087 * final ReentrantLock lock; 3088 * boolean hasLock = false; 3089 * ManagedLocker(ReentrantLock lock) { this.lock = lock; } 3090 * public boolean block() { 3091 * if (!hasLock) 3092 * lock.lock(); 3093 * return true; 3094 * } 3095 * public boolean isReleasable() { 3096 * return hasLock || (hasLock = lock.tryLock()); 3097 * } 3098 * }}</pre> 3099 * 3100 * <p>Here is a class that possibly blocks waiting for an 3101 * item on a given queue: 3102 * <pre> {@code 3103 * class QueueTaker<E> implements ManagedBlocker { 3104 * final BlockingQueue<E> queue; 3105 * volatile E item = null; 3106 * QueueTaker(BlockingQueue<E> q) { this.queue = q; } 3107 * public boolean block() throws InterruptedException { 3108 * if (item == null) 3109 * item = queue.take(); 3110 * return true; 3111 * } 3112 * public boolean isReleasable() { 3113 * return item != null || (item = queue.poll()) != null; 3114 * } 3115 * public E getItem() { // call after pool.managedBlock completes 3116 * return item; 3117 * } 3118 * }}</pre> 3119 */ 3120 public static interface ManagedBlocker { 3121 /** 3122 * Possibly blocks the current thread, for example waiting for 3123 * a lock or condition. 
3124 * 3125 * @return {@code true} if no additional blocking is necessary 3126 * (i.e., if isReleasable would return true) 3127 * @throws InterruptedException if interrupted while waiting 3128 * (the method is not required to do so, but is allowed to) 3129 */ block()3130 boolean block() throws InterruptedException; 3131 3132 /** 3133 * Returns {@code true} if blocking is unnecessary. 3134 * @return {@code true} if blocking is unnecessary 3135 */ isReleasable()3136 boolean isReleasable(); 3137 } 3138 3139 /** 3140 * Blocks in accord with the given blocker. If the current thread 3141 * is a {@link ForkJoinWorkerThread}, this method possibly 3142 * arranges for a spare thread to be activated if necessary to 3143 * ensure sufficient parallelism while the current thread is blocked. 3144 * 3145 * <p>If the caller is not a {@link ForkJoinTask}, this method is 3146 * behaviorally equivalent to 3147 * <pre> {@code 3148 * while (!blocker.isReleasable()) 3149 * if (blocker.block()) 3150 * return; 3151 * }</pre> 3152 * 3153 * If the caller is a {@code ForkJoinTask}, then the pool may 3154 * first be expanded to ensure parallelism, and later adjusted. 3155 * 3156 * @param blocker the blocker 3157 * @throws InterruptedException if blocker.block did so 3158 */ managedBlock(ManagedBlocker blocker)3159 public static void managedBlock(ManagedBlocker blocker) 3160 throws InterruptedException { 3161 Thread t = Thread.currentThread(); 3162 if (t instanceof ForkJoinWorkerThread) { 3163 ForkJoinPool p = ((ForkJoinWorkerThread)t).pool; 3164 while (!blocker.isReleasable()) { 3165 if (p.tryCompensate(p.ctl)) { 3166 try { 3167 do {} while (!blocker.isReleasable() && 3168 !blocker.block()); 3169 } finally { 3170 p.incrementActiveCount(); 3171 } 3172 break; 3173 } 3174 } 3175 } 3176 else { 3177 do {} while (!blocker.isReleasable() && 3178 !blocker.block()); 3179 } 3180 } 3181 3182 // AbstractExecutorService overrides. 
These rely on undocumented 3183 // fact that ForkJoinTask.adapt returns ForkJoinTasks that also 3184 // implement RunnableFuture. 3185 newTaskFor(Runnable runnable, T value)3186 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 3187 return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); 3188 } 3189 newTaskFor(Callable<T> callable)3190 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 3191 return new ForkJoinTask.AdaptedCallable<T>(callable); 3192 } 3193 3194 // Unsafe mechanics 3195 private static final sun.misc.Unsafe U; 3196 private static final long CTL; 3197 private static final long PARKBLOCKER; 3198 private static final int ABASE; 3199 private static final int ASHIFT; 3200 private static final long STEALCOUNT; 3201 private static final long PLOCK; 3202 private static final long INDEXSEED; 3203 private static final long QBASE; 3204 private static final long QLOCK; 3205 3206 static { 3207 // initialize field offsets for CAS etc 3208 try { 3209 U = sun.misc.Unsafe.getUnsafe(); 3210 Class<?> k = ForkJoinPool.class; 3211 CTL = U.objectFieldOffset 3212 (k.getDeclaredField("ctl")); 3213 STEALCOUNT = U.objectFieldOffset 3214 (k.getDeclaredField("stealCount")); 3215 PLOCK = U.objectFieldOffset 3216 (k.getDeclaredField("plock")); 3217 INDEXSEED = U.objectFieldOffset 3218 (k.getDeclaredField("indexSeed")); 3219 Class<?> tk = Thread.class; 3220 PARKBLOCKER = U.objectFieldOffset 3221 (tk.getDeclaredField("parkBlocker")); 3222 Class<?> wk = WorkQueue.class; 3223 QBASE = U.objectFieldOffset 3224 (wk.getDeclaredField("base")); 3225 QLOCK = U.objectFieldOffset 3226 (wk.getDeclaredField("qlock")); 3227 Class<?> ak = ForkJoinTask[].class; 3228 ABASE = U.arrayBaseOffset(ak); 3229 int scale = U.arrayIndexScale(ak); 3230 if ((scale & (scale - 1)) != 0) 3231 throw new Error("data type scale not a power of two"); 3232 ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); 3233 } catch (Exception e) { 3234 throw new Error(e); 3235 } 3236 3237 
submitters = new ThreadLocal<Submitter>(); 3238 defaultForkJoinWorkerThreadFactory = 3239 new DefaultForkJoinWorkerThreadFactory(); 3240 modifyThreadPermission = new RuntimePermission("modifyThread"); 3241 3242 common = java.security.AccessController.doPrivileged 3243 (new java.security.PrivilegedAction<ForkJoinPool>() { 3244 public ForkJoinPool run() { return makeCommonPool(); }}); 3245 int par = common.parallelism; // report 1 even if threads disabled 3246 commonParallelism = par > 0 ? par : 1; 3247 } 3248 3249 /** 3250 * Creates and returns the common pool, respecting user settings 3251 * specified via system properties. 3252 */ makeCommonPool()3253 private static ForkJoinPool makeCommonPool() { 3254 int parallelism = -1; 3255 ForkJoinWorkerThreadFactory factory 3256 = defaultForkJoinWorkerThreadFactory; 3257 UncaughtExceptionHandler handler = null; 3258 try { // ignore exceptions in accessing/parsing properties 3259 String pp = System.getProperty 3260 ("java.util.concurrent.ForkJoinPool.common.parallelism"); 3261 String fp = System.getProperty 3262 ("java.util.concurrent.ForkJoinPool.common.threadFactory"); 3263 String hp = System.getProperty 3264 ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); 3265 if (pp != null) 3266 parallelism = Integer.parseInt(pp); 3267 if (fp != null) 3268 factory = ((ForkJoinWorkerThreadFactory)ClassLoader. 3269 getSystemClassLoader().loadClass(fp).newInstance()); 3270 if (hp != null) 3271 handler = ((UncaughtExceptionHandler)ClassLoader. 3272 getSystemClassLoader().loadClass(hp).newInstance()); 3273 } catch (Exception ignore) { 3274 } 3275 3276 if (parallelism < 0 && // default 1 less than #cores 3277 (parallelism = Runtime.getRuntime().availableProcessors() - 1) < 0) 3278 parallelism = 0; 3279 if (parallelism > MAX_CAP) 3280 parallelism = MAX_CAP; 3281 return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, 3282 "ForkJoinPool.commonPool-worker-"); 3283 } 3284 3285 } 3286