1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11 
12 /**
13  * A synchronization aid that allows a set of threads to all wait for
14  * each other to reach a common barrier point.  CyclicBarriers are
15  * useful in programs involving a fixed sized party of threads that
16  * must occasionally wait for each other. The barrier is called
17  * <em>cyclic</em> because it can be re-used after the waiting threads
18  * are released.
19  *
20  * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
21  * that is run once per barrier point, after the last thread in the party
22  * arrives, but before any threads are released.
23  * This <em>barrier action</em> is useful
24  * for updating shared-state before any of the parties continue.
25  *
26  * <p><b>Sample usage:</b> Here is an example of using a barrier in a
27  * parallel decomposition design:
28  *
29  * <pre> {@code
30  * class Solver {
31  *   final int N;
32  *   final float[][] data;
33  *   final CyclicBarrier barrier;
34  *
35  *   class Worker implements Runnable {
36  *     int myRow;
37  *     Worker(int row) { myRow = row; }
38  *     public void run() {
39  *       while (!done()) {
40  *         processRow(myRow);
41  *
42  *         try {
43  *           barrier.await();
44  *         } catch (InterruptedException ex) {
45  *           return;
46  *         } catch (BrokenBarrierException ex) {
47  *           return;
48  *         }
49  *       }
50  *     }
51  *   }
52  *
53  *   public Solver(float[][] matrix) {
54  *     data = matrix;
55  *     N = matrix.length;
56  *     Runnable barrierAction =
57  *       new Runnable() { public void run() { mergeRows(...); }};
58  *     barrier = new CyclicBarrier(N, barrierAction);
59  *
60  *     List<Thread> threads = new ArrayList<>(N);
61  *     for (int i = 0; i < N; i++) {
62  *       Thread thread = new Thread(new Worker(i));
63  *       threads.add(thread);
64  *       thread.start();
65  *     }
66  *
67  *     // wait until done
68  *     for (Thread thread : threads)
69  *       thread.join();
70  *   }
71  * }}</pre>
72  *
73  * Here, each worker thread processes a row of the matrix then waits at the
74  * barrier until all rows have been processed. When all rows are processed
75  * the supplied {@link Runnable} barrier action is executed and merges the
76  * rows. If the merger
77  * determines that a solution has been found then {@code done()} will return
78  * {@code true} and each worker will terminate.
79  *
80  * <p>If the barrier action does not rely on the parties being suspended when
81  * it is executed, then any of the threads in the party could execute that
82  * action when it is released. To facilitate this, each invocation of
83  * {@link #await} returns the arrival index of that thread at the barrier.
84  * You can then choose which thread should execute the barrier action, for
85  * example:
86  * <pre> {@code
87  * if (barrier.await() == 0) {
88  *   // log the completion of this iteration
89  * }}</pre>
90  *
91  * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
92  * for failed synchronization attempts: If a thread leaves a barrier
93  * point prematurely because of interruption, failure, or timeout, all
94  * other threads waiting at that barrier point will also leave
95  * abnormally via {@link BrokenBarrierException} (or
96  * {@link InterruptedException} if they too were interrupted at about
97  * the same time).
98  *
99  * <p>Memory consistency effects: Actions in a thread prior to calling
100  * {@code await()}
101  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
102  * actions that are part of the barrier action, which in turn
103  * <i>happen-before</i> actions following a successful return from the
104  * corresponding {@code await()} in other threads.
105  *
106  * @since 1.5
107  * @see CountDownLatch
108  *
109  * @author Doug Lea
110  */
111 public class CyclicBarrier {
112     /**
113      * Each use of the barrier is represented as a generation instance.
114      * The generation changes whenever the barrier is tripped, or
115      * is reset. There can be many generations associated with threads
116      * using the barrier - due to the non-deterministic way the lock
117      * may be allocated to waiting threads - but only one of these
118      * can be active at a time (the one to which {@code count} applies)
119      * and all the rest are either broken or tripped.
120      * There need not be an active generation if there has been a break
121      * but no subsequent reset.
122      */
123     private static class Generation {
124         boolean broken;         // initially false
125     }
126 
127     /** The lock for guarding barrier entry */
128     private final ReentrantLock lock = new ReentrantLock();
129     /** Condition to wait on until tripped */
130     private final Condition trip = lock.newCondition();
131     /** The number of parties */
132     private final int parties;
133     /** The command to run when tripped */
134     private final Runnable barrierCommand;
135     /** The current generation */
136     private Generation generation = new Generation();
137 
138     /**
139      * Number of parties still waiting. Counts down from parties to 0
140      * on each generation.  It is reset to parties on each new
141      * generation or when broken.
142      */
143     private int count;
144 
145     /**
146      * Updates state on barrier trip and wakes up everyone.
147      * Called only while holding lock.
148      */
nextGeneration()149     private void nextGeneration() {
150         // signal completion of last generation
151         trip.signalAll();
152         // set up next generation
153         count = parties;
154         generation = new Generation();
155     }
156 
157     /**
158      * Sets current barrier generation as broken and wakes up everyone.
159      * Called only while holding lock.
160      */
breakBarrier()161     private void breakBarrier() {
162         generation.broken = true;
163         count = parties;
164         trip.signalAll();
165     }
166 
167     /**
168      * Main barrier code, covering the various policies.
169      */
dowait(boolean timed, long nanos)170     private int dowait(boolean timed, long nanos)
171         throws InterruptedException, BrokenBarrierException,
172                TimeoutException {
173         final ReentrantLock lock = this.lock;
174         lock.lock();
175         try {
176             final Generation g = generation;
177 
178             if (g.broken)
179                 throw new BrokenBarrierException();
180 
181             if (Thread.interrupted()) {
182                 breakBarrier();
183                 throw new InterruptedException();
184             }
185 
186             int index = --count;
187             if (index == 0) {  // tripped
188                 boolean ranAction = false;
189                 try {
190                     final Runnable command = barrierCommand;
191                     if (command != null)
192                         command.run();
193                     ranAction = true;
194                     nextGeneration();
195                     return 0;
196                 } finally {
197                     if (!ranAction)
198                         breakBarrier();
199                 }
200             }
201 
202             // loop until tripped, broken, interrupted, or timed out
203             for (;;) {
204                 try {
205                     if (!timed)
206                         trip.await();
207                     else if (nanos > 0L)
208                         nanos = trip.awaitNanos(nanos);
209                 } catch (InterruptedException ie) {
210                     if (g == generation && ! g.broken) {
211                         breakBarrier();
212                         throw ie;
213                     } else {
214                         // We're about to finish waiting even if we had not
215                         // been interrupted, so this interrupt is deemed to
216                         // "belong" to subsequent execution.
217                         Thread.currentThread().interrupt();
218                     }
219                 }
220 
221                 if (g.broken)
222                     throw new BrokenBarrierException();
223 
224                 if (g != generation)
225                     return index;
226 
227                 if (timed && nanos <= 0L) {
228                     breakBarrier();
229                     throw new TimeoutException();
230                 }
231             }
232         } finally {
233             lock.unlock();
234         }
235     }
236 
237     /**
238      * Creates a new {@code CyclicBarrier} that will trip when the
239      * given number of parties (threads) are waiting upon it, and which
240      * will execute the given barrier action when the barrier is tripped,
241      * performed by the last thread entering the barrier.
242      *
243      * @param parties the number of threads that must invoke {@link #await}
244      *        before the barrier is tripped
245      * @param barrierAction the command to execute when the barrier is
246      *        tripped, or {@code null} if there is no action
247      * @throws IllegalArgumentException if {@code parties} is less than 1
248      */
CyclicBarrier(int parties, Runnable barrierAction)249     public CyclicBarrier(int parties, Runnable barrierAction) {
250         if (parties <= 0) throw new IllegalArgumentException();
251         this.parties = parties;
252         this.count = parties;
253         this.barrierCommand = barrierAction;
254     }
255 
256     /**
257      * Creates a new {@code CyclicBarrier} that will trip when the
258      * given number of parties (threads) are waiting upon it, and
259      * does not perform a predefined action when the barrier is tripped.
260      *
261      * @param parties the number of threads that must invoke {@link #await}
262      *        before the barrier is tripped
263      * @throws IllegalArgumentException if {@code parties} is less than 1
264      */
CyclicBarrier(int parties)265     public CyclicBarrier(int parties) {
266         this(parties, null);
267     }
268 
269     /**
270      * Returns the number of parties required to trip this barrier.
271      *
272      * @return the number of parties required to trip this barrier
273      */
getParties()274     public int getParties() {
275         return parties;
276     }
277 
278     /**
279      * Waits until all {@linkplain #getParties parties} have invoked
280      * {@code await} on this barrier.
281      *
282      * <p>If the current thread is not the last to arrive then it is
283      * disabled for thread scheduling purposes and lies dormant until
284      * one of the following things happens:
285      * <ul>
286      * <li>The last thread arrives; or
287      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
288      * the current thread; or
289      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
290      * one of the other waiting threads; or
291      * <li>Some other thread times out while waiting for barrier; or
292      * <li>Some other thread invokes {@link #reset} on this barrier.
293      * </ul>
294      *
295      * <p>If the current thread:
296      * <ul>
297      * <li>has its interrupted status set on entry to this method; or
298      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
299      * </ul>
300      * then {@link InterruptedException} is thrown and the current thread's
301      * interrupted status is cleared.
302      *
303      * <p>If the barrier is {@link #reset} while any thread is waiting,
304      * or if the barrier {@linkplain #isBroken is broken} when
305      * {@code await} is invoked, or while any thread is waiting, then
306      * {@link BrokenBarrierException} is thrown.
307      *
308      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
309      * then all other waiting threads will throw
310      * {@link BrokenBarrierException} and the barrier is placed in the broken
311      * state.
312      *
313      * <p>If the current thread is the last thread to arrive, and a
314      * non-null barrier action was supplied in the constructor, then the
315      * current thread runs the action before allowing the other threads to
316      * continue.
317      * If an exception occurs during the barrier action then that exception
318      * will be propagated in the current thread and the barrier is placed in
319      * the broken state.
320      *
321      * @return the arrival index of the current thread, where index
322      *         {@code getParties() - 1} indicates the first
323      *         to arrive and zero indicates the last to arrive
324      * @throws InterruptedException if the current thread was interrupted
325      *         while waiting
326      * @throws BrokenBarrierException if <em>another</em> thread was
327      *         interrupted or timed out while the current thread was
328      *         waiting, or the barrier was reset, or the barrier was
329      *         broken when {@code await} was called, or the barrier
330      *         action (if present) failed due to an exception
331      */
await()332     public int await() throws InterruptedException, BrokenBarrierException {
333         try {
334             return dowait(false, 0L);
335         } catch (TimeoutException toe) {
336             throw new Error(toe); // cannot happen
337         }
338     }
339 
340     /**
341      * Waits until all {@linkplain #getParties parties} have invoked
342      * {@code await} on this barrier, or the specified waiting time elapses.
343      *
344      * <p>If the current thread is not the last to arrive then it is
345      * disabled for thread scheduling purposes and lies dormant until
346      * one of the following things happens:
347      * <ul>
348      * <li>The last thread arrives; or
349      * <li>The specified timeout elapses; or
350      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
351      * the current thread; or
352      * <li>Some other thread {@linkplain Thread#interrupt interrupts}
353      * one of the other waiting threads; or
354      * <li>Some other thread times out while waiting for barrier; or
355      * <li>Some other thread invokes {@link #reset} on this barrier.
356      * </ul>
357      *
358      * <p>If the current thread:
359      * <ul>
360      * <li>has its interrupted status set on entry to this method; or
361      * <li>is {@linkplain Thread#interrupt interrupted} while waiting
362      * </ul>
363      * then {@link InterruptedException} is thrown and the current thread's
364      * interrupted status is cleared.
365      *
366      * <p>If the specified waiting time elapses then {@link TimeoutException}
367      * is thrown. If the time is less than or equal to zero, the
368      * method will not wait at all.
369      *
370      * <p>If the barrier is {@link #reset} while any thread is waiting,
371      * or if the barrier {@linkplain #isBroken is broken} when
372      * {@code await} is invoked, or while any thread is waiting, then
373      * {@link BrokenBarrierException} is thrown.
374      *
375      * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
376      * waiting, then all other waiting threads will throw {@link
377      * BrokenBarrierException} and the barrier is placed in the broken
378      * state.
379      *
380      * <p>If the current thread is the last thread to arrive, and a
381      * non-null barrier action was supplied in the constructor, then the
382      * current thread runs the action before allowing the other threads to
383      * continue.
384      * If an exception occurs during the barrier action then that exception
385      * will be propagated in the current thread and the barrier is placed in
386      * the broken state.
387      *
388      * @param timeout the time to wait for the barrier
389      * @param unit the time unit of the timeout parameter
390      * @return the arrival index of the current thread, where index
391      *         {@code getParties() - 1} indicates the first
392      *         to arrive and zero indicates the last to arrive
393      * @throws InterruptedException if the current thread was interrupted
394      *         while waiting
395      * @throws TimeoutException if the specified timeout elapses.
396      *         In this case the barrier will be broken.
397      * @throws BrokenBarrierException if <em>another</em> thread was
398      *         interrupted or timed out while the current thread was
399      *         waiting, or the barrier was reset, or the barrier was broken
400      *         when {@code await} was called, or the barrier action (if
401      *         present) failed due to an exception
402      */
await(long timeout, TimeUnit unit)403     public int await(long timeout, TimeUnit unit)
404         throws InterruptedException,
405                BrokenBarrierException,
406                TimeoutException {
407         return dowait(true, unit.toNanos(timeout));
408     }
409 
410     /**
411      * Queries if this barrier is in a broken state.
412      *
413      * @return {@code true} if one or more parties broke out of this
414      *         barrier due to interruption or timeout since
415      *         construction or the last reset, or a barrier action
416      *         failed due to an exception; {@code false} otherwise.
417      */
isBroken()418     public boolean isBroken() {
419         final ReentrantLock lock = this.lock;
420         lock.lock();
421         try {
422             return generation.broken;
423         } finally {
424             lock.unlock();
425         }
426     }
427 
428     /**
429      * Resets the barrier to its initial state.  If any parties are
430      * currently waiting at the barrier, they will return with a
431      * {@link BrokenBarrierException}. Note that resets <em>after</em>
432      * a breakage has occurred for other reasons can be complicated to
433      * carry out; threads need to re-synchronize in some other way,
434      * and choose one to perform the reset.  It may be preferable to
435      * instead create a new barrier for subsequent use.
436      */
reset()437     public void reset() {
438         final ReentrantLock lock = this.lock;
439         lock.lock();
440         try {
441             breakBarrier();   // break the current generation
442             nextGeneration(); // start a new generation
443         } finally {
444             lock.unlock();
445         }
446     }
447 
448     /**
449      * Returns the number of parties currently waiting at the barrier.
450      * This method is primarily useful for debugging and assertions.
451      *
452      * @return the number of parties currently blocked in {@link #await}
453      */
getNumberWaiting()454     public int getNumberWaiting() {
455         final ReentrantLock lock = this.lock;
456         lock.lock();
457         try {
458             return parties - count;
459         } finally {
460             lock.unlock();
461         }
462     }
463 }
464