1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.concurrent.atomic.AtomicReference;
10 import java.util.concurrent.locks.LockSupport;
11 
12 /**
13  * A reusable synchronization barrier, similar in functionality to
14  * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
15  * {@link java.util.concurrent.CountDownLatch CountDownLatch}
16  * but supporting more flexible usage.
17  *
18  * <p><b>Registration.</b> Unlike the case for other barriers, the
19  * number of parties <em>registered</em> to synchronize on a phaser
20  * may vary over time.  Tasks may be registered at any time (using
21  * methods {@link #register}, {@link #bulkRegister}, or forms of
22  * constructors establishing initial numbers of parties), and
23  * optionally deregistered upon any arrival (using {@link
24  * #arriveAndDeregister}).  As is the case with most basic
25  * synchronization constructs, registration and deregistration affect
26  * only internal counts; they do not establish any further internal
27  * bookkeeping, so tasks cannot query whether they are registered.
28  * (However, you can introduce such bookkeeping by subclassing this
29  * class.)
30  *
31  * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
32  * Phaser} may be repeatedly awaited.  Method {@link
33  * #arriveAndAwaitAdvance} has effect analogous to {@link
34  * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
35  * generation of a phaser has an associated phase number. The phase
36  * number starts at zero, and advances when all parties arrive at the
37  * phaser, wrapping around to zero after reaching {@code
38  * Integer.MAX_VALUE}. The use of phase numbers enables independent
39  * control of actions upon arrival at a phaser and upon awaiting
40  * others, via two kinds of methods that may be invoked by any
41  * registered party:
42  *
43  * <ul>
44  *
45  *   <li><b>Arrival.</b> Methods {@link #arrive} and
46  *       {@link #arriveAndDeregister} record arrival.  These methods
47  *       do not block, but return an associated <em>arrival phase
48  *       number</em>; that is, the phase number of the phaser to which
49  *       the arrival applied. When the final party for a given phase
50  *       arrives, an optional action is performed and the phase
51  *       advances.  These actions are performed by the party
52  *       triggering a phase advance, and are arranged by overriding
53  *       method {@link #onAdvance(int, int)}, which also controls
54  *       termination. Overriding this method is similar to, but more
55  *       flexible than, providing a barrier action to a {@code
56  *       CyclicBarrier}.
57  *
58  *   <li><b>Waiting.</b> Method {@link #awaitAdvance} requires an
59  *       argument indicating an arrival phase number, and returns when
60  *       the phaser advances to (or is already at) a different phase.
61  *       Unlike similar constructions using {@code CyclicBarrier},
62  *       method {@code awaitAdvance} continues to wait even if the
63  *       waiting thread is interrupted. Interruptible and timeout
64  *       versions are also available, but exceptions encountered while
65  *       tasks wait interruptibly or with timeout do not change the
66  *       state of the phaser. If necessary, you can perform any
67  *       associated recovery within handlers of those exceptions,
68  *       often after invoking {@code forceTermination}.  Phasers may
69  *       also be used by tasks executing in a {@link ForkJoinPool}.
 *       Progress is ensured if the pool's parallelism level can
71  *       accommodate the maximum number of simultaneously blocked
72  *       parties.
73  *
74  * </ul>
75  *
76  * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
 * state, which may be checked using method {@link #isTerminated}. Upon
78  * termination, all synchronization methods immediately return without
79  * waiting for advance, as indicated by a negative return value.
80  * Similarly, attempts to register upon termination have no effect.
81  * Termination is triggered when an invocation of {@code onAdvance}
82  * returns {@code true}. The default implementation returns {@code
83  * true} if a deregistration has caused the number of registered
84  * parties to become zero.  As illustrated below, when phasers control
85  * actions with a fixed number of iterations, it is often convenient
86  * to override this method to cause termination when the current phase
87  * number reaches a threshold. Method {@link #forceTermination} is
88  * also available to abruptly release waiting threads and allow them
89  * to terminate.
90  *
91  * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
92  * constructed in tree structures) to reduce contention. Phasers with
93  * large numbers of parties that would otherwise experience heavy
94  * synchronization contention costs may instead be set up so that
95  * groups of sub-phasers share a common parent.  This may greatly
96  * increase throughput even though it incurs greater per-operation
97  * overhead.
98  *
99  * <p>In a tree of tiered phasers, registration and deregistration of
100  * child phasers with their parent are managed automatically.
101  * Whenever the number of registered parties of a child phaser becomes
102  * non-zero (as established in the {@link #Phaser(Phaser,int)}
103  * constructor, {@link #register}, or {@link #bulkRegister}), the
104  * child phaser is registered with its parent.  Whenever the number of
105  * registered parties becomes zero as the result of an invocation of
106  * {@link #arriveAndDeregister}, the child phaser is deregistered
107  * from its parent.
108  *
109  * <p><b>Monitoring.</b> While synchronization methods may be invoked
110  * only by registered parties, the current state of a phaser may be
111  * monitored by any caller.  At any given moment there are {@link
112  * #getRegisteredParties} parties in total, of which {@link
113  * #getArrivedParties} have arrived at the current phase ({@link
114  * #getPhase}).  When the remaining ({@link #getUnarrivedParties})
115  * parties arrive, the phase advances.  The values returned by these
116  * methods may reflect transient states and so are not in general
117  * useful for synchronization control.  Method {@link #toString}
118  * returns snapshots of these state queries in a form convenient for
119  * informal monitoring.
120  *
121  * <p><b>Sample usages:</b>
122  *
123  * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
124  * to control a one-shot action serving a variable number of parties.
125  * The typical idiom is for the method setting this up to first
126  * register, then start the actions, then deregister, as in:
127  *
128  * <pre> {@code
129  * void runTasks(List<Runnable> tasks) {
130  *   final Phaser phaser = new Phaser(1); // "1" to register self
131  *   // create and start threads
132  *   for (final Runnable task : tasks) {
133  *     phaser.register();
134  *     new Thread() {
135  *       public void run() {
136  *         phaser.arriveAndAwaitAdvance(); // await all creation
137  *         task.run();
138  *       }
139  *     }.start();
140  *   }
141  *
142  *   // allow threads to start and deregister self
143  *   phaser.arriveAndDeregister();
144  * }}</pre>
145  *
146  * <p>One way to cause a set of threads to repeatedly perform actions
147  * for a given number of iterations is to override {@code onAdvance}:
148  *
149  * <pre> {@code
150  * void startTasks(List<Runnable> tasks, final int iterations) {
151  *   final Phaser phaser = new Phaser() {
152  *     protected boolean onAdvance(int phase, int registeredParties) {
153  *       return phase >= iterations || registeredParties == 0;
154  *     }
155  *   };
156  *   phaser.register();
157  *   for (final Runnable task : tasks) {
158  *     phaser.register();
159  *     new Thread() {
160  *       public void run() {
161  *         do {
162  *           task.run();
163  *           phaser.arriveAndAwaitAdvance();
164  *         } while (!phaser.isTerminated());
165  *       }
166  *     }.start();
167  *   }
168  *   phaser.arriveAndDeregister(); // deregister self, don't wait
169  * }}</pre>
170  *
171  * If the main task must later await termination, it
172  * may re-register and then execute a similar loop:
173  * <pre> {@code
174  *   // ...
175  *   phaser.register();
176  *   while (!phaser.isTerminated())
177  *     phaser.arriveAndAwaitAdvance();}</pre>
178  *
179  * <p>Related constructions may be used to await particular phase numbers
180  * in contexts where you are sure that the phase will never wrap around
181  * {@code Integer.MAX_VALUE}. For example:
182  *
183  * <pre> {@code
184  * void awaitPhase(Phaser phaser, int phase) {
185  *   int p = phaser.register(); // assumes caller not already registered
186  *   while (p < phase) {
187  *     if (phaser.isTerminated())
188  *       // ... deal with unexpected termination
189  *     else
190  *       p = phaser.arriveAndAwaitAdvance();
191  *   }
192  *   phaser.arriveAndDeregister();
193  * }}</pre>
194  *
195  *
196  * <p>To create a set of {@code n} tasks using a tree of phasers, you
197  * could use code of the following form, assuming a Task class with a
198  * constructor accepting a {@code Phaser} that it registers with upon
199  * construction. After invocation of {@code build(new Task[n], 0, n,
200  * new Phaser())}, these tasks could then be started, for example by
201  * submitting to a pool:
202  *
203  * <pre> {@code
204  * void build(Task[] tasks, int lo, int hi, Phaser ph) {
205  *   if (hi - lo > TASKS_PER_PHASER) {
206  *     for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
207  *       int j = Math.min(i + TASKS_PER_PHASER, hi);
208  *       build(tasks, i, j, new Phaser(ph));
209  *     }
210  *   } else {
211  *     for (int i = lo; i < hi; ++i)
212  *       tasks[i] = new Task(ph);
213  *       // assumes new Task(ph) performs ph.register()
214  *   }
215  * }}</pre>
216  *
217  * The best value of {@code TASKS_PER_PHASER} depends mainly on
218  * expected synchronization rates. A value as low as four may
219  * be appropriate for extremely small per-phase task bodies (thus
220  * high rates), or up to hundreds for extremely large ones.
221  *
222  * <p><b>Implementation notes</b>: This implementation restricts the
223  * maximum number of parties to 65535. Attempts to register additional
224  * parties result in {@code IllegalStateException}. However, you can and
225  * should create tiered phasers to accommodate arbitrarily large sets
226  * of participants.
227  *
228  * @since 1.7
229  * @author Doug Lea
230  */
231 public class Phaser {
232     /*
233      * This class implements an extension of X10 "clocks".  Thanks to
234      * Vijay Saraswat for the idea, and to Vivek Sarkar for
235      * enhancements to extend functionality.
236      */
237 
238     /**
239      * Primary state representation, holding four bit-fields:
240      *
241      * unarrived  -- the number of parties yet to hit barrier (bits  0-15)
242      * parties    -- the number of parties to wait            (bits 16-31)
243      * phase      -- the generation of the barrier            (bits 32-62)
244      * terminated -- set if barrier is terminated             (bit  63 / sign)
245      *
246      * Except that a phaser with no registered parties is
247      * distinguished by the otherwise illegal state of having zero
     * parties and one unarrived party (encoded as EMPTY below).
249      *
250      * To efficiently maintain atomicity, these values are packed into
251      * a single (atomic) long. Good performance relies on keeping
252      * state decoding and encoding simple, and keeping race windows
253      * short.
254      *
255      * All state updates are performed via CAS except initial
256      * registration of a sub-phaser (i.e., one with a non-null
257      * parent).  In this (relatively rare) case, we use built-in
258      * synchronization to lock while first registering with its
259      * parent.
260      *
261      * The phase of a subphaser is allowed to lag that of its
262      * ancestors until it is actually accessed -- see method
263      * reconcileState.
264      */
    private volatile long state; // assigned directly in the constructor; updated elsewhere via U.compareAndSwapLong(this, STATE, ...)
266 
    // Field geometry of the packed state word.
    // MAX_PARTIES is the largest value the 16-bit parties field can hold and
    // is the limit enforced on registration. MAX_PHASE doubles as the mask
    // that wraps (phase + 1) back to zero after Integer.MAX_VALUE.
    private static final int  MAX_PARTIES     = 0xffff;
    private static final int  MAX_PHASE       = Integer.MAX_VALUE;
    // Shift distances: parties start at bit 16, phase at bit 32.
    private static final int  PARTIES_SHIFT   = 16;
    private static final int  PHASE_SHIFT     = 32;
    // UNARRIVED_MASK selects bits 0-15 of the int-truncated state,
    // PARTIES_MASK selects bits 16-31 of the long state, and
    // COUNTS_MASK selects both count fields (the low 32 bits).
    private static final int  UNARRIVED_MASK  = 0xffff;      // to mask ints
    private static final long PARTIES_MASK    = 0xffff0000L; // to mask longs
    private static final long COUNTS_MASK     = 0xffffffffL;
    // Sign bit of the state; once set, the extracted phase is negative.
    private static final long TERMINATION_BIT = 1L << 63;

    // some special values
    // ONE_ARRIVAL decrements only the unarrived field when subtracted from
    // the state; ONE_PARTY is one unit of the parties field; ONE_DEREGISTER
    // combines both so a single subtraction drops an arrival and a party.
    private static final int  ONE_ARRIVAL     = 1;
    private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
    private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;
    // Counts encoding for "no registered parties": zero parties, one unarrived.
    private static final int  EMPTY           = 1;
281 
282     // The following unpacking methods are usually manually inlined
283 
unarrivedOf(long s)284     private static int unarrivedOf(long s) {
285         int counts = (int)s;
286         return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
287     }
288 
partiesOf(long s)289     private static int partiesOf(long s) {
290         return (int)s >>> PARTIES_SHIFT;
291     }
292 
phaseOf(long s)293     private static int phaseOf(long s) {
294         return (int)(s >>> PHASE_SHIFT);
295     }
296 
arrivedOf(long s)297     private static int arrivedOf(long s) {
298         int counts = (int)s;
299         return (counts == EMPTY) ? 0 :
300             (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
301     }
302 
303     /**
304      * The parent of this phaser, or null if none.
305      */
    private final Phaser parent; // assigned once in the constructor

    /**
     * The root of the phaser tree. Equals this if not in a tree.
     */
    private final Phaser root; // the constructor copies parent.root when a parent is given
312 
313     /**
314      * Heads of Treiber stacks for waiting threads. To eliminate
315      * contention when releasing some threads while adding others, we
316      * use two of them, alternating across even and odd phases.
317      * Subphasers share queues with root to speed up releases.
318      */
    private final AtomicReference<QNode> evenQ; // selected by queueFor for even phase numbers
    private final AtomicReference<QNode> oddQ;  // selected by queueFor for odd phase numbers
321 
queueFor(int phase)322     private AtomicReference<QNode> queueFor(int phase) {
323         return ((phase & 1) == 0) ? evenQ : oddQ;
324     }
325 
326     /**
327      * Returns message string for bounds exceptions on arrival.
328      */
badArrive(long s)329     private String badArrive(long s) {
330         return "Attempted arrival of unregistered party for " +
331             stateToString(s);
332     }
333 
334     /**
335      * Returns message string for bounds exceptions on registration.
336      */
badRegister(long s)337     private String badRegister(long s) {
338         return "Attempt to register more than " +
339             MAX_PARTIES + " parties for " + stateToString(s);
340     }
341 
342     /**
343      * Main implementation for methods arrive and arriveAndDeregister.
344      * Manually tuned to speed up and minimize race windows for the
345      * common case of just decrementing unarrived field.
346      *
347      * @param adjust value to subtract from state;
348      *               ONE_ARRIVAL for arrive,
349      *               ONE_DEREGISTER for arriveAndDeregister
350      */
    private int doArrive(int adjust) {
        final Phaser root = this.root;
        // Retry loop: each pass re-reads the state and attempts one CAS.
        for (;;) {
            // The root reads its own state directly; a subphaser first
            // brings its phase in line with the root via reconcileState().
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            // A negative phase means the termination bit is set; return it
            // without recording an arrival.
            if (phase < 0)
                return phase;
            int counts = (int)s;
            // The EMPTY encoding (no registered parties) counts as zero unarrived.
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // Arguments are evaluated left to right: the expected value is the
            // old s, and "s-=adjust" both supplies the new value and leaves the
            // local s holding the post-arrival state for the code below.
            if (U.compareAndSwapLong(this, STATE, s, s-=adjust)) {
                // Only the arrival that took unarrived from 1 to 0 advances.
                if (unarrived == 1) {
                    // Keep only the parties field; every other field is rebuilt.
                    long n = s & PARTIES_MASK;  // base of next state
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    if (root == this) {
                        // Root: run the advance action and build the next state.
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        // Masking with MAX_PHASE wraps the phase to zero
                        // after Integer.MAX_VALUE.
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase << PHASE_SHIFT;
                        // The result of this CAS is intentionally not inspected here.
                        U.compareAndSwapLong(this, STATE, s, n);
                        // presumably wakes threads queued on the completed
                        // phase -- releaseWaiters is defined outside this view
                        releaseWaiters(phase);
                    }
                    else if (nextUnarrived == 0) { // propagate deregistration
                        // Last party of a subphaser left: deregister this
                        // phaser from its parent, then mark it EMPTY.
                        phase = parent.doArrive(ONE_DEREGISTER);
                        U.compareAndSwapLong(this, STATE, s, s | EMPTY);
                    }
                    else
                        // Subphaser completed its phase with parties left:
                        // count as a single arrival at the parent.
                        phase = parent.doArrive(ONE_ARRIVAL);
                }
                return phase;
            }
        }
    }
389 
390     /**
391      * Implementation of register, bulkRegister.
392      *
393      * @param registrations number to add to both parties and
394      * unarrived fields. Must be greater than zero.
395      */
    private int doRegister(int registrations) {
        // adjustment to state
        // (the same count is placed in both the parties and unarrived fields)
        long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        // Retry loop: exits via break once registration succeeded or the
        // phaser was found terminated.
        for (;;) {
            long s = (parent == null) ? state : reconcileState();
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            // The limit check precedes the termination check below.
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            phase = (int)(s >>> PHASE_SHIFT);
            // Negative phase: terminated; return it without registering.
            if (phase < 0)
                break;
            if (counts != EMPTY) {                  // not 1st registration
                // For a subphaser, proceed only if a fresh reconcile still
                // yields the same state; otherwise loop and re-read.
                if (parent == null || reconcileState() == s) {
                    // Zero unarrived with parties present means an advance is
                    // under way; wait on the root, then retry the loop.
                    if (unarrived == 0)             // wait out advance
                        root.internalAwaitAdvance(phase, null);
                    else if (U.compareAndSwapLong(this, STATE, s, s + adjust))
                        break;
                }
            }
            else if (parent == null) {              // 1st root registration
                // Replace the EMPTY encoding outright instead of adding to it.
                long next = ((long)phase << PHASE_SHIFT) | adjust;
                if (U.compareAndSwapLong(this, STATE, s, next))
                    break;
            }
            else {
                // The lock serializes first registrations of this subphaser so
                // that it registers with its parent only once.
                synchronized (this) {               // 1st sub registration
                    if (state == s) {               // recheck under lock
                        // This phaser counts as a single party of its parent.
                        phase = parent.doRegister(1);
                        if (phase < 0)
                            break;
                        // finish registration whenever parent registration
                        // succeeded, even when racing with termination,
                        // since these are part of the same "transaction".
                        while (!U.compareAndSwapLong
                               (this, STATE, s,
                                ((long)phase << PHASE_SHIFT) | adjust)) {
                            // CAS lost: refresh the expected state and take
                            // the phase from the root before retrying.
                            s = state;
                            phase = (int)(root.state >>> PHASE_SHIFT);
                            // assert (int)s == EMPTY;
                        }
                        break;
                    }
                }
            }
        }
        return phase;
    }
447 
448     /**
449      * Resolves lagged phase propagation from root if necessary.
450      * Reconciliation normally occurs when root has advanced but
451      * subphasers have not yet done so, in which case they must finish
452      * their own advance by setting unarrived to parties (or if
453      * parties is zero, resetting to unregistered EMPTY state).
454      *
455      * @return reconciled state
456      */
    private long reconcileState() {
        final Phaser root = this.root;
        long s = state;
        // The root has nothing to reconcile with; it returns its state as read.
        if (root != this) {
            int phase, p;
            // CAS to root phase with current parties, tripping unarrived
            //
            // Loop condition, in evaluation order:
            //  1. read the root's phase; stop if it equals this phaser's phase;
            //  2. otherwise try to CAS from the old s to a new state carrying
            //     the root's phase. The expected-value argument is evaluated
            //     before the assignment "s = ...", so the CAS compares against
            //     the old s while the local s ends up holding the new state.
            // New state contents below the phase bits:
            //  - root terminated (phase < 0): counts are kept unchanged;
            //  - no parties: the EMPTY encoding;
            //  - otherwise: parties kept, unarrived reset to parties.
            // If the CAS fails, s is re-read from state and the loop repeats.
            while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
                   (int)(s >>> PHASE_SHIFT) &&
                   !U.compareAndSwapLong
                   (this, STATE, s,
                    s = (((long)phase << PHASE_SHIFT) |
                         ((phase < 0) ? (s & COUNTS_MASK) :
                          (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                           ((s & PARTIES_MASK) | p))))))
                s = state;
        }
        return s;
    }
475 
476     /**
477      * Creates a new phaser with no initially registered parties, no
478      * parent, and initial phase number 0. Any thread using this
479      * phaser will need to first register for it.
480      */
    public Phaser() {
        // Delegates to the general constructor: no parent, zero parties.
        this(null, 0);
    }
484 
485     /**
486      * Creates a new phaser with the given number of registered
487      * unarrived parties, no parent, and initial phase number 0.
488      *
489      * @param parties the number of parties required to advance to the
490      * next phase
491      * @throws IllegalArgumentException if parties less than zero
492      * or greater than the maximum number of parties supported
493      */
    public Phaser(int parties) {
        // Delegates to the general constructor with no parent; the argument
        // is validated there.
        this(null, parties);
    }
497 
498     /**
499      * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
500      *
501      * @param parent the parent phaser
502      */
    public Phaser(Phaser parent) {
        // Delegates to the general constructor with zero parties, so no
        // registration with the parent happens at construction time.
        this(parent, 0);
    }
506 
507     /**
508      * Creates a new phaser with the given parent and number of
509      * registered unarrived parties.  When the given parent is non-null
510      * and the given number of parties is greater than zero, this
511      * child phaser is registered with its parent.
512      *
513      * @param parent the parent phaser
514      * @param parties the number of parties required to advance to the
515      * next phase
516      * @throws IllegalArgumentException if parties less than zero
517      * or greater than the maximum number of parties supported
518      */
Phaser(Phaser parent, int parties)519     public Phaser(Phaser parent, int parties) {
520         if (parties >>> PARTIES_SHIFT != 0)
521             throw new IllegalArgumentException("Illegal number of parties");
522         int phase = 0;
523         this.parent = parent;
524         if (parent != null) {
525             final Phaser root = parent.root;
526             this.root = root;
527             this.evenQ = root.evenQ;
528             this.oddQ = root.oddQ;
529             if (parties != 0)
530                 phase = parent.doRegister(1);
531         }
532         else {
533             this.root = this;
534             this.evenQ = new AtomicReference<QNode>();
535             this.oddQ = new AtomicReference<QNode>();
536         }
537         this.state = (parties == 0) ? (long)EMPTY :
538             ((long)phase << PHASE_SHIFT) |
539             ((long)parties << PARTIES_SHIFT) |
540             ((long)parties);
541     }
542 
543     /**
544      * Adds a new unarrived party to this phaser.  If an ongoing
545      * invocation of {@link #onAdvance} is in progress, this method
546      * may await its completion before returning.  If this phaser has
547      * a parent, and this phaser previously had no registered parties,
548      * this child phaser is also registered with its parent. If
549      * this phaser is terminated, the attempt to register has
550      * no effect, and a negative value is returned.
551      *
552      * @return the arrival phase number to which this registration
553      * applied.  If this value is negative, then this phaser has
554      * terminated, in which case registration has no effect.
555      * @throws IllegalStateException if attempting to register more
556      * than the maximum supported number of parties
557      */
    public int register() {
        // A single registration; doRegister adds it to both the parties
        // and the unarrived counts.
        return doRegister(1);
    }
561 
562     /**
563      * Adds the given number of new unarrived parties to this phaser.
564      * If an ongoing invocation of {@link #onAdvance} is in progress,
565      * this method may await its completion before returning.  If this
566      * phaser has a parent, and the given number of parties is greater
567      * than zero, and this phaser previously had no registered
568      * parties, this child phaser is also registered with its parent.
569      * If this phaser is terminated, the attempt to register has no
570      * effect, and a negative value is returned.
571      *
572      * @param parties the number of additional parties required to
573      * advance to the next phase
574      * @return the arrival phase number to which this registration
575      * applied.  If this value is negative, then this phaser has
576      * terminated, in which case registration has no effect.
577      * @throws IllegalStateException if attempting to register more
578      * than the maximum supported number of parties
579      * @throws IllegalArgumentException if {@code parties < 0}
580      */
bulkRegister(int parties)581     public int bulkRegister(int parties) {
582         if (parties < 0)
583             throw new IllegalArgumentException();
584         if (parties == 0)
585             return getPhase();
586         return doRegister(parties);
587     }
588 
589     /**
590      * Arrives at this phaser, without waiting for others to arrive.
591      *
592      * <p>It is a usage error for an unregistered party to invoke this
593      * method.  However, this error may result in an {@code
594      * IllegalStateException} only upon some subsequent operation on
595      * this phaser, if ever.
596      *
597      * @return the arrival phase number, or a negative value if terminated
598      * @throws IllegalStateException if not terminated and the number
599      * of unarrived parties would become negative
600      */
    public int arrive() {
        // ONE_ARRIVAL lowers only the unarrived count; the party stays registered.
        return doArrive(ONE_ARRIVAL);
    }
604 
605     /**
606      * Arrives at this phaser and deregisters from it without waiting
607      * for others to arrive. Deregistration reduces the number of
608      * parties required to advance in future phases.  If this phaser
609      * has a parent, and deregistration causes this phaser to have
610      * zero parties, this phaser is also deregistered from its parent.
611      *
612      * <p>It is a usage error for an unregistered party to invoke this
613      * method.  However, this error may result in an {@code
614      * IllegalStateException} only upon some subsequent operation on
615      * this phaser, if ever.
616      *
617      * @return the arrival phase number, or a negative value if terminated
618      * @throws IllegalStateException if not terminated and the number
619      * of registered or unarrived parties would become negative
620      */
    public int arriveAndDeregister() {
        // ONE_DEREGISTER lowers both the unarrived and the parties counts
        // in a single state update.
        return doArrive(ONE_DEREGISTER);
    }
624 
625     /**
626      * Arrives at this phaser and awaits others. Equivalent in effect
627      * to {@code awaitAdvance(arrive())}.  If you need to await with
628      * interruption or timeout, you can arrange this with an analogous
629      * construction using one of the other forms of the {@code
630      * awaitAdvance} method.  If instead you need to deregister upon
631      * arrival, use {@code awaitAdvance(arriveAndDeregister())}.
632      *
633      * <p>It is a usage error for an unregistered party to invoke this
634      * method.  However, this error may result in an {@code
635      * IllegalStateException} only upon some subsequent operation on
636      * this phaser, if ever.
637      *
638      * @return the arrival phase number, or the (negative)
639      * {@linkplain #getPhase() current phase} if terminated
640      * @throws IllegalStateException if not terminated and the number
641      * of unarrived parties would become negative
642      */
    public int arriveAndAwaitAdvance() {
        // Specialization of doArrive+awaitAdvance eliminating some reads/paths
        final Phaser root = this.root;
        // Retry loop: each pass re-reads the state and attempts one CAS.
        for (;;) {
            // The root reads its own state; a subphaser reconciles with root first.
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            // Negative phase: terminated; return it without arriving or waiting.
            if (phase < 0)
                return phase;
            int counts = (int)s;
            // The EMPTY encoding (no registered parties) counts as zero unarrived.
            int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
            if (unarrived <= 0)
                throw new IllegalStateException(badArrive(s));
            // "s -= ONE_ARRIVAL" supplies the new value and leaves the local s
            // holding the post-arrival state; the expected value is the old s.
            if (U.compareAndSwapLong(this, STATE, s, s -= ONE_ARRIVAL)) {
                // Other parties are still outstanding: wait on the root for
                // the phase to advance.
                if (unarrived > 1)
                    return root.internalAwaitAdvance(phase, null);
                // Last arrival at a subphaser: arrive at the parent and wait there.
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                // Last arrival at the root: this thread performs the advance.
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                // Masking with MAX_PHASE wraps the phase to zero after
                // Integer.MAX_VALUE.
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                // If the state changed underneath, report whatever phase is
                // current now.
                if (!U.compareAndSwapLong(this, STATE, s, n))
                    return (int)(state >>> PHASE_SHIFT); // terminated
                // presumably wakes threads queued on the completed phase --
                // releaseWaiters is defined outside this view
                releaseWaiters(phase);
                // NOTE(review): nextPhase is masked by MAX_PHASE and so is
                // non-negative even when onAdvance returned true and the
                // termination bit was just stored.
                return nextPhase;
            }
        }
    }
677 
678     /**
679      * Awaits the phase of this phaser to advance from the given phase
680      * value, returning immediately if the current phase is not equal
681      * to the given phase value or this phaser is terminated.
682      *
683      * @param phase an arrival phase number, or negative value if
684      * terminated; this argument is normally the value returned by a
685      * previous call to {@code arrive} or {@code arriveAndDeregister}.
686      * @return the next arrival phase number, or the argument if it is
687      * negative, or the (negative) {@linkplain #getPhase() current phase}
688      * if terminated
689      */
awaitAdvance(int phase)690     public int awaitAdvance(int phase) {
691         final Phaser root = this.root;
692         long s = (root == this) ? state : reconcileState();
693         int p = (int)(s >>> PHASE_SHIFT);
694         if (phase < 0)
695             return phase;
696         if (p == phase)
697             return root.internalAwaitAdvance(phase, null);
698         return p;
699     }
700 
701     /**
702      * Awaits the phase of this phaser to advance from the given phase
703      * value, throwing {@code InterruptedException} if interrupted
704      * while waiting, or returning immediately if the current phase is
705      * not equal to the given phase value or this phaser is
706      * terminated.
707      *
708      * @param phase an arrival phase number, or negative value if
709      * terminated; this argument is normally the value returned by a
710      * previous call to {@code arrive} or {@code arriveAndDeregister}.
711      * @return the next arrival phase number, or the argument if it is
712      * negative, or the (negative) {@linkplain #getPhase() current phase}
713      * if terminated
714      * @throws InterruptedException if thread interrupted while waiting
715      */
awaitAdvanceInterruptibly(int phase)716     public int awaitAdvanceInterruptibly(int phase)
717         throws InterruptedException {
718         final Phaser root = this.root;
719         long s = (root == this) ? state : reconcileState();
720         int p = (int)(s >>> PHASE_SHIFT);
721         if (phase < 0)
722             return phase;
723         if (p == phase) {
724             QNode node = new QNode(this, phase, true, false, 0L);
725             p = root.internalAwaitAdvance(phase, node);
726             if (node.wasInterrupted)
727                 throw new InterruptedException();
728         }
729         return p;
730     }
731 
732     /**
733      * Awaits the phase of this phaser to advance from the given phase
734      * value or the given timeout to elapse, throwing {@code
735      * InterruptedException} if interrupted while waiting, or
736      * returning immediately if the current phase is not equal to the
737      * given phase value or this phaser is terminated.
738      *
739      * @param phase an arrival phase number, or negative value if
740      * terminated; this argument is normally the value returned by a
741      * previous call to {@code arrive} or {@code arriveAndDeregister}.
742      * @param timeout how long to wait before giving up, in units of
743      *        {@code unit}
744      * @param unit a {@code TimeUnit} determining how to interpret the
745      *        {@code timeout} parameter
746      * @return the next arrival phase number, or the argument if it is
747      * negative, or the (negative) {@linkplain #getPhase() current phase}
748      * if terminated
749      * @throws InterruptedException if thread interrupted while waiting
750      * @throws TimeoutException if timed out while waiting
751      */
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)752     public int awaitAdvanceInterruptibly(int phase,
753                                          long timeout, TimeUnit unit)
754         throws InterruptedException, TimeoutException {
755         long nanos = unit.toNanos(timeout);
756         final Phaser root = this.root;
757         long s = (root == this) ? state : reconcileState();
758         int p = (int)(s >>> PHASE_SHIFT);
759         if (phase < 0)
760             return phase;
761         if (p == phase) {
762             QNode node = new QNode(this, phase, true, true, nanos);
763             p = root.internalAwaitAdvance(phase, node);
764             if (node.wasInterrupted)
765                 throw new InterruptedException();
766             else if (p == phase)
767                 throw new TimeoutException();
768         }
769         return p;
770     }
771 
772     /**
773      * Forces this phaser to enter termination state.  Counts of
774      * registered parties are unaffected.  If this phaser is a member
775      * of a tiered set of phasers, then all of the phasers in the set
776      * are terminated.  If this phaser is already terminated, this
777      * method has no effect.  This method may be useful for
778      * coordinating recovery after one or more tasks encounter
779      * unexpected exceptions.
780      */
forceTermination()781     public void forceTermination() {
782         // Only need to change root state
783         final Phaser root = this.root;
784         long s;
785         while ((s = root.state) >= 0) {
786             if (U.compareAndSwapLong(root, STATE, s, s | TERMINATION_BIT)) {
787                 // signal all threads
788                 releaseWaiters(0); // Waiters on evenQ
789                 releaseWaiters(1); // Waiters on oddQ
790                 return;
791             }
792         }
793     }
794 
795     /**
796      * Returns the current phase number. The maximum phase number is
797      * {@code Integer.MAX_VALUE}, after which it restarts at
798      * zero. Upon termination, the phase number is negative,
799      * in which case the prevailing phase prior to termination
800      * may be obtained via {@code getPhase() + Integer.MIN_VALUE}.
801      *
802      * @return the phase number, or a negative value if terminated
803      */
getPhase()804     public final int getPhase() {
805         return (int)(root.state >>> PHASE_SHIFT);
806     }
807 
808     /**
809      * Returns the number of parties registered at this phaser.
810      *
811      * @return the number of parties
812      */
getRegisteredParties()813     public int getRegisteredParties() {
814         return partiesOf(state);
815     }
816 
817     /**
818      * Returns the number of registered parties that have arrived at
819      * the current phase of this phaser. If this phaser has terminated,
820      * the returned value is meaningless and arbitrary.
821      *
822      * @return the number of arrived parties
823      */
getArrivedParties()824     public int getArrivedParties() {
825         return arrivedOf(reconcileState());
826     }
827 
828     /**
829      * Returns the number of registered parties that have not yet
830      * arrived at the current phase of this phaser. If this phaser has
831      * terminated, the returned value is meaningless and arbitrary.
832      *
833      * @return the number of unarrived parties
834      */
getUnarrivedParties()835     public int getUnarrivedParties() {
836         return unarrivedOf(reconcileState());
837     }
838 
839     /**
840      * Returns the parent of this phaser, or {@code null} if none.
841      *
842      * @return the parent of this phaser, or {@code null} if none
843      */
getParent()844     public Phaser getParent() {
845         return parent;
846     }
847 
848     /**
849      * Returns the root ancestor of this phaser, which is the same as
850      * this phaser if it has no parent.
851      *
852      * @return the root ancestor of this phaser
853      */
getRoot()854     public Phaser getRoot() {
855         return root;
856     }
857 
858     /**
859      * Returns {@code true} if this phaser has been terminated.
860      *
861      * @return {@code true} if this phaser has been terminated
862      */
isTerminated()863     public boolean isTerminated() {
864         return root.state < 0L;
865     }
866 
867     /**
868      * Overridable method to perform an action upon impending phase
869      * advance, and to control termination. This method is invoked
870      * upon arrival of the party advancing this phaser (when all other
871      * waiting parties are dormant).  If this method returns {@code
872      * true}, this phaser will be set to a final termination state
873      * upon advance, and subsequent calls to {@link #isTerminated}
874      * will return true. Any (unchecked) Exception or Error thrown by
875      * an invocation of this method is propagated to the party
876      * attempting to advance this phaser, in which case no advance
877      * occurs.
878      *
879      * <p>The arguments to this method provide the state of the phaser
880      * prevailing for the current transition.  The effects of invoking
881      * arrival, registration, and waiting methods on this phaser from
882      * within {@code onAdvance} are unspecified and should not be
883      * relied on.
884      *
885      * <p>If this phaser is a member of a tiered set of phasers, then
886      * {@code onAdvance} is invoked only for its root phaser on each
887      * advance.
888      *
889      * <p>To support the most common use cases, the default
890      * implementation of this method returns {@code true} when the
891      * number of registered parties has become zero as the result of a
892      * party invoking {@code arriveAndDeregister}.  You can disable
893      * this behavior, thus enabling continuation upon future
894      * registrations, by overriding this method to always return
895      * {@code false}:
896      *
897      * <pre> {@code
898      * Phaser phaser = new Phaser() {
899      *   protected boolean onAdvance(int phase, int parties) { return false; }
900      * }}</pre>
901      *
902      * @param phase the current phase number on entry to this method,
903      * before this phaser is advanced
904      * @param registeredParties the current number of registered parties
905      * @return {@code true} if this phaser should terminate
906      */
onAdvance(int phase, int registeredParties)907     protected boolean onAdvance(int phase, int registeredParties) {
908         return registeredParties == 0;
909     }
910 
911     /**
912      * Returns a string identifying this phaser, as well as its
913      * state.  The state, in brackets, includes the String {@code
914      * "phase = "} followed by the phase number, {@code "parties = "}
915      * followed by the number of registered parties, and {@code
916      * "arrived = "} followed by the number of arrived parties.
917      *
918      * @return a string identifying this phaser, as well as its state
919      */
toString()920     public String toString() {
921         return stateToString(reconcileState());
922     }
923 
924     /**
925      * Implementation of toString and string-based error messages.
926      */
stateToString(long s)927     private String stateToString(long s) {
928         return super.toString() +
929             "[phase = " + phaseOf(s) +
930             " parties = " + partiesOf(s) +
931             " arrived = " + arrivedOf(s) + "]";
932     }
933 
934     // Waiting mechanics
935 
936     /**
937      * Removes and signals threads from queue for phase.
938      */
releaseWaiters(int phase)939     private void releaseWaiters(int phase) {
940         QNode q;   // first element of queue
941         Thread t;  // its thread
942         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
943         while ((q = head.get()) != null &&
944                q.phase != (int)(root.state >>> PHASE_SHIFT)) {
945             if (head.compareAndSet(q, q.next) &&
946                 (t = q.thread) != null) {
947                 q.thread = null;
948                 LockSupport.unpark(t);
949             }
950         }
951     }
952 
953     /**
954      * Variant of releaseWaiters that additionally tries to remove any
955      * nodes no longer waiting for advance due to timeout or
956      * interrupt. Currently, nodes are removed only if they are at
957      * head of queue, which suffices to reduce memory footprint in
958      * most usages.
959      *
960      * @return current phase on exit
961      */
abortWait(int phase)962     private int abortWait(int phase) {
963         AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
964         for (;;) {
965             Thread t;
966             QNode q = head.get();
967             int p = (int)(root.state >>> PHASE_SHIFT);
968             if (q == null || ((t = q.thread) != null && q.phase == p))
969                 return p;
970             if (head.compareAndSet(q, q.next) && t != null) {
971                 q.thread = null;
972                 LockSupport.unpark(t);
973             }
974         }
975     }
976 
977     /** The number of CPUs, for spin control */
978     private static final int NCPU = Runtime.getRuntime().availableProcessors();
979 
980     /**
981      * The number of times to spin before blocking while waiting for
982      * advance, per arrival while waiting. On multiprocessors, fully
983      * blocking and waking up a large number of threads all at once is
984      * usually a very slow process, so we use rechargeable spins to
985      * avoid it when threads regularly arrive: When a thread in
986      * internalAwaitAdvance notices another arrival before blocking,
987      * and there appear to be enough CPUs available, it spins
988      * SPINS_PER_ARRIVAL more times before blocking. The value trades
989      * off good-citizenship vs big unnecessary slowdowns.
990      */
991     static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
992 
    /**
     * Possibly blocks and waits for phase to advance unless aborted.
     * Call only on root phaser.
     *
     * <p>The wait has three stages, all re-checked against the state
     * word on every loop iteration: spinning (only while {@code node}
     * is null), pushing the node onto the even or odd wait queue, and
     * blocking through {@code ForkJoinPool.managedBlock}. The loop
     * ends when the phase read from {@code state} differs from
     * {@code phase} or when the node reports itself releasable.
     *
     * @param phase current phase
     * @param node if non-null, the wait node to track interrupt and timeout;
     * if null, denotes noninterruptible wait
     * @return current phase
     */
    private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        // Re-read state on each pass; exit as soon as the phase moves on.
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                // Recharge the spin budget when the unarrived count has
                // changed since the last look and is below NCPU.
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) < NCPU)
                    spins += SPINS_PER_ARRIVAL;
                // Thread.interrupted() clears the interrupt status, so the
                // result is stored on the node and re-asserted after the loop.
                boolean interrupted = Thread.interrupted();
                if (interrupted || --spins < 0) { // need node to record intr
                    node = new QNode(this, phase, false, false, 0L);
                    node.wasInterrupted = interrupted;
                }
            }
            else if (node.isReleasable()) // done or aborted
                break;
            else if (!queued) {           // push onto queue
                AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
                QNode q = node.next = head.get();
                // Enqueue only behind nodes of the same phase, and only if
                // the phase has not changed since the loop test.
                if ((q == null || q.phase == phase) &&
                    (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException cantHappen) {
                    // Not expected; if it does occur, record the interrupt
                    // on the node instead of propagating it.
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            // Noninterruptible waits restore the interrupt status that was
            // consumed while waiting.
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            // Left the loop via break and the phase still has not advanced:
            // the wait was aborted rather than completed.
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        releaseWaiters(phase);
        return p;
    }
1051 
1052     /**
1053      * Wait nodes for Treiber stack representing wait queue.
1054      */
1055     static final class QNode implements ForkJoinPool.ManagedBlocker {
1056         final Phaser phaser;
1057         final int phase;
1058         final boolean interruptible;
1059         final boolean timed;
1060         boolean wasInterrupted;
1061         long nanos;
1062         final long deadline;
1063         volatile Thread thread; // nulled to cancel wait
1064         QNode next;
1065 
QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos)1066         QNode(Phaser phaser, int phase, boolean interruptible,
1067               boolean timed, long nanos) {
1068             this.phaser = phaser;
1069             this.phase = phase;
1070             this.interruptible = interruptible;
1071             this.nanos = nanos;
1072             this.timed = timed;
1073             this.deadline = timed ? System.nanoTime() + nanos : 0L;
1074             thread = Thread.currentThread();
1075         }
1076 
isReleasable()1077         public boolean isReleasable() {
1078             if (thread == null)
1079                 return true;
1080             if (phaser.getPhase() != phase) {
1081                 thread = null;
1082                 return true;
1083             }
1084             if (Thread.interrupted())
1085                 wasInterrupted = true;
1086             if (wasInterrupted && interruptible) {
1087                 thread = null;
1088                 return true;
1089             }
1090             if (timed &&
1091                 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1092                 thread = null;
1093                 return true;
1094             }
1095             return false;
1096         }
1097 
block()1098         public boolean block() {
1099             while (!isReleasable()) {
1100                 if (timed)
1101                     LockSupport.parkNanos(this, nanos);
1102                 else
1103                     LockSupport.park(this);
1104             }
1105             return true;
1106         }
1107     }
1108 
1109     // Unsafe mechanics
1110 
1111     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1112     private static final long STATE;
1113     static {
1114         try {
1115             STATE = U.objectFieldOffset
1116                 (Phaser.class.getDeclaredField("state"));
1117         } catch (ReflectiveOperationException e) {
1118             throw new Error(e);
1119         }
1120 
1121         // Reduce the risk of rare disastrous classloading in first call to
1122         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1123         Class<?> ensureLoaded = LockSupport.class;
1124     }
1125 }
1126