1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.concurrent.locks.Condition;
39 import java.util.concurrent.locks.ReentrantLock;
40 
/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of using a barrier in a
 * parallel decomposition design:
 *
 * <pre> {@code
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     Runnable barrierAction =
 *       new Runnable() { public void run() { mergeRows(...); }};
 *     barrier = new CyclicBarrier(N, barrierAction);
 *
 *     List<Thread> threads = new ArrayList<>(N);
 *     for (int i = 0; i < N; i++) {
 *       Thread thread = new Thread(new Worker(i));
 *       threads.add(thread);
 *       thread.start();
 *     }
 *
 *     // wait until done
 *     for (Thread thread : threads) {
 *       try {
 *         thread.join();
 *       } catch (InterruptedException ex) {
 *         // stop waiting, but keep the interrupt visible to the caller
 *         Thread.currentThread().interrupt();
 *         break;
 *       }
 *     }
 *   }
 * }}</pre>
 *
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then {@code done()} will return
 * {@code true} and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre> {@code
 * if (barrier.await() == 0) {
 *   // log the completion of this iteration
 * }}</pre>
 *
 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static final class Generation {
        boolean broken;         // initially false
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped; {@code null} if there is none */
    private final Runnable barrierCommand;
    /** The current generation; read and replaced only while holding lock */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     *
     * @param timed {@code true} if the wait is bounded by {@code nanos}
     * @param nanos the remaining wait time in nanoseconds; consulted only
     *        when {@code timed} is {@code true}
     * @return the arrival index of the current thread; zero for the
     *         thread that trips the barrier
     * @throws InterruptedException if the current thread had its
     *         interrupted status set on entry, or was interrupted while
     *         waiting on a generation that was still active and unbroken
     * @throws BrokenBarrierException if the generation this thread joined
     *         was already broken on entry or became broken while waiting
     * @throws TimeoutException if {@code timed} is {@code true} and the
     *         wait time ran out before the barrier tripped or broke
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Remember the generation we joined; later checks compare
            // against it to tell "tripped" apart from "still waiting".
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                // The flag stays false if command.run() throws, so the
                // finally clause breaks the barrier for the other parties
                // while the exception propagates in this thread.
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                // A replaced generation that was not broken means the
                // barrier tripped normally for the generation we joined.
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) {
            throw new IllegalArgumentException(
                "parties must be positive: " + parties);
        }
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            // dowait only times out when called with timed == true
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses.
     *         In this case the barrier will be broken.
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}
493