1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.concurrent.locks.Condition;
39 import java.util.concurrent.locks.ReentrantLock;
40 
41 /**
42  * A synchronization aid that allows a set of threads to all wait for
43  * each other to reach a common barrier point.  CyclicBarriers are
44  * useful in programs involving a fixed sized party of threads that
45  * must occasionally wait for each other. The barrier is called
46  * <em>cyclic</em> because it can be re-used after the waiting threads
47  * are released.
48  *
49  * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
50  * that is run once per barrier point, after the last thread in the party
51  * arrives, but before any threads are released.
52  * This <em>barrier action</em> is useful
53  * for updating shared-state before any of the parties continue.
54  *
55  * <p><b>Sample usage:</b> Here is an example of using a barrier in a
56  * parallel decomposition design:
57  *
58  * <pre> {@code
59  * class Solver {
60  *   final int N;
61  *   final float[][] data;
62  *   final CyclicBarrier barrier;
63  *
64  *   class Worker implements Runnable {
65  *     int myRow;
66  *     Worker(int row) { myRow = row; }
67  *     public void run() {
68  *       while (!done()) {
69  *         processRow(myRow);
70  *
71  *         try {
72  *           barrier.await();
73  *         } catch (InterruptedException ex) {
74  *           return;
75  *         } catch (BrokenBarrierException ex) {
76  *           return;
77  *         }
78  *       }
79  *     }
80  *   }
81  *
82  *   public Solver(float[][] matrix) {
83  *     data = matrix;
84  *     N = matrix.length;
85  *     Runnable barrierAction = () -> mergeRows(...);
86  *     barrier = new CyclicBarrier(N, barrierAction);
87  *
88  *     List<Thread> threads = new ArrayList<>(N);
89  *     for (int i = 0; i < N; i++) {
90  *       Thread thread = new Thread(new Worker(i));
91  *       threads.add(thread);
92  *       thread.start();
93  *     }
94  *
95  *     // wait until done
96  *     for (Thread thread : threads)
97  *       try {
98  *         thread.join();
99  *       } catch (InterruptedException ex) { }
100  *   }
101  * }}</pre>
102  *
103  * Here, each worker thread processes a row of the matrix, then waits at the
104  * barrier until all rows have been processed. When all rows are processed the
105  * supplied {@link Runnable} barrier action is executed and merges the rows.
106  * If the merger determines that a solution has been found then {@code done()}
107  * will return {@code true} and each worker will terminate.
108  *
109  * <p>If the barrier action does not rely on the parties being suspended when
110  * it is executed, then any of the threads in the party could execute that
111  * action when it is released. To facilitate this, each invocation of
112  * {@link #await} returns the arrival index of that thread at the barrier.
113  * You can then choose which thread should execute the barrier action, for
114  * example:
115  * <pre> {@code
116  * if (barrier.await() == 0) {
117  *   // log the completion of this iteration
118  * }}</pre>
119  *
120  * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
121  * for failed synchronization attempts: If a thread leaves a barrier
122  * point prematurely because of interruption, failure, or timeout, all
123  * other threads waiting at that barrier point will also leave
124  * abnormally via {@link BrokenBarrierException} (or
125  * {@link InterruptedException} if they too were interrupted at about
126  * the same time).
127  *
128  * <p>Memory consistency effects: Actions in a thread prior to calling
129  * {@code await()}
130  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
131  * actions that are part of the barrier action, which in turn
132  * <i>happen-before</i> actions following a successful return from the
133  * corresponding {@code await()} in other threads.
134  *
135  * @see CountDownLatch
136  * @see Phaser
137  *
138  * @author Doug Lea
139  * @since 1.5
140  */
public class CyclicBarrier {
    /*
     * Implementation overview: all mutable state (generation, count and
     * Generation.broken) is guarded by a single ReentrantLock.  Arriving
     * threads decrement count under the lock; every thread except the
     * last one parks on the "trip" condition.  The last arrival runs the
     * optional barrier command, then installs a fresh Generation and
     * signals everyone.  Waiters decide how to leave by comparing the
     * Generation they captured on entry with the current one.
     */

    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static final class Generation {
        // Explicit package-private constructor so the enclosing class can
        // instantiate this private nested class without the compiler
        // having to synthesize an access constructor.
        Generation() {}

        /** Set once by breakBarrier(); a fresh instance starts as false. */
        boolean broken;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped; may be null */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties that have yet to arrive in the current
     * generation. Counts down from parties to 0 on each generation.
     * It is reset to parties on each new generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     *
     * @param timed {@code true} if the wait is bounded by {@code nanos};
     *        {@code false} to wait indefinitely
     * @param nanos the maximum time to wait, in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @return the arrival index of the calling thread; zero for the
     *         thread that tripped the barrier
     * @throws InterruptedException if the calling thread's interrupt
     *         status was set on entry, or it was interrupted while
     *         waiting on a generation that was still current and unbroken
     * @throws BrokenBarrierException if the captured generation was
     *         broken on entry or is found broken after waking up
     * @throws TimeoutException if {@code timed} is {@code true} and the
     *         remaining time reached zero before the barrier tripped
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Capture the generation this arrival belongs to; every later
            // decision compares against this snapshot.
            final Generation g = generation;

            if (g.broken) {
                throw new BrokenBarrierException();
            }

            // Thread.interrupted() clears the status, matching the
            // documented contract of throwing with the status cleared.
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                Runnable command = barrierCommand;
                if (command != null) {
                    try {
                        // Runs while the lock is held, i.e. before any
                        // waiter can reacquire it and return.
                        command.run();
                    } catch (Throwable ex) {
                        // A failed action breaks the barrier for everyone;
                        // only unchecked throwables can reach here, so the
                        // rethrow needs no wrapping.
                        breakBarrier();
                        throw ex;
                    }
                }
                nextGeneration();
                return 0;
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed) {
                        trip.await();
                    } else if (nanos > 0L) {
                        // awaitNanos returns the remaining time, so
                        // repeated wakeups never extend the deadline.
                        nanos = trip.awaitNanos(nanos);
                    }
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken) {
                    throw new BrokenBarrierException();
                }

                // A different current generation means ours was tripped.
                if (g != generation) {
                    return index;
                }

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) {
            throw new IllegalArgumentException(
                "parties must be positive: " + parties);
        }
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            // An untimed wait never takes the timeout branch in dowait.
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses.
     *         In this case the barrier will be broken.
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     * @throws NullPointerException if {@code unit} is {@code null}
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        // The conversion happens before the lock is taken, so a null unit
        // fails fast without touching barrier state.
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}
494