1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 import java.util.concurrent.locks.Condition; 9 import java.util.concurrent.locks.ReentrantLock; 10 11 /** 12 * A synchronization aid that allows a set of threads to all wait for 13 * each other to reach a common barrier point. CyclicBarriers are 14 * useful in programs involving a fixed sized party of threads that 15 * must occasionally wait for each other. The barrier is called 16 * <em>cyclic</em> because it can be re-used after the waiting threads 17 * are released. 18 * 19 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command 20 * that is run once per barrier point, after the last thread in the party 21 * arrives, but before any threads are released. 22 * This <em>barrier action</em> is useful 23 * for updating shared-state before any of the parties continue. 24 * 25 * <p><b>Sample usage:</b> Here is an example of 26 * using a barrier in a parallel decomposition design: 27 * 28 * <pre> {@code 29 * class Solver { 30 * final int N; 31 * final float[][] data; 32 * final CyclicBarrier barrier; 33 * 34 * class Worker implements Runnable { 35 * int myRow; 36 * Worker(int row) { myRow = row; } 37 * public void run() { 38 * while (!done()) { 39 * processRow(myRow); 40 * 41 * try { 42 * barrier.await(); 43 * } catch (InterruptedException ex) { 44 * return; 45 * } catch (BrokenBarrierException ex) { 46 * return; 47 * } 48 * } 49 * } 50 * } 51 * 52 * public Solver(float[][] matrix) { 53 * data = matrix; 54 * N = matrix.length; 55 * barrier = new CyclicBarrier(N, 56 * new Runnable() { 57 * public void run() { 58 * mergeRows(...); 59 * } 60 * }); 61 * for (int i = 0; i < N; ++i) 62 * new Thread(new Worker(i)).start(); 63 * 64 * waitUntilDone(); 65 * } 66 * }}</pre> 67 * 68 * Here, each worker thread processes a row of the matrix then waits at the 69 * barrier until all rows have been processed. When all rows are processed 70 * the supplied {@link Runnable} barrier action is executed and merges the 71 * rows. If the merger 72 * determines that a solution has been found then {@code done()} will return 73 * {@code true} and each worker will terminate. 74 * 75 * <p>If the barrier action does not rely on the parties being suspended when 76 * it is executed, then any of the threads in the party could execute that 77 * action when it is released. To facilitate this, each invocation of 78 * {@link #await} returns the arrival index of that thread at the barrier. 79 * You can then choose which thread should execute the barrier action, for 80 * example: 81 * <pre> {@code 82 * if (barrier.await() == 0) { 83 * // log the completion of this iteration 84 * }}</pre> 85 * 86 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model 87 * for failed synchronization attempts: If a thread leaves a barrier 88 * point prematurely because of interruption, failure, or timeout, all 89 * other threads waiting at that barrier point will also leave 90 * abnormally via {@link BrokenBarrierException} (or 91 * {@link InterruptedException} if they too were interrupted at about 92 * the same time). 93 * 94 * <p>Memory consistency effects: Actions in a thread prior to calling 95 * {@code await()} 96 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 97 * actions that are part of the barrier action, which in turn 98 * <i>happen-before</i> actions following a successful return from the 99 * corresponding {@code await()} in other threads. 100 * 101 * @since 1.5 102 * @see CountDownLatch 103 * 104 * @author Doug Lea 105 */ 106 public class CyclicBarrier { 107 /** 108 * Each use of the barrier is represented as a generation instance. 109 * The generation changes whenever the barrier is tripped, or 110 * is reset. There can be many generations associated with threads 111 * using the barrier - due to the non-deterministic way the lock 112 * may be allocated to waiting threads - but only one of these 113 * can be active at a time (the one to which {@code count} applies) 114 * and all the rest are either broken or tripped. 115 * There need not be an active generation if there has been a break 116 * but no subsequent reset. 117 */ 118 private static class Generation { 119 boolean broken = false; 120 } 121 122 /** The lock for guarding barrier entry */ 123 private final ReentrantLock lock = new ReentrantLock(); 124 /** Condition to wait on until tripped */ 125 private final Condition trip = lock.newCondition(); 126 /** The number of parties */ 127 private final int parties; 128 /* The command to run when tripped */ 129 private final Runnable barrierCommand; 130 /** The current generation */ 131 private Generation generation = new Generation(); 132 133 /** 134 * Number of parties still waiting. Counts down from parties to 0 135 * on each generation. It is reset to parties on each new 136 * generation or when broken. 137 */ 138 private int count; 139 140 /** 141 * Updates state on barrier trip and wakes up everyone. 142 * Called only while holding lock. 143 */ nextGeneration()144 private void nextGeneration() { 145 // signal completion of last generation 146 trip.signalAll(); 147 // set up next generation 148 count = parties; 149 generation = new Generation(); 150 } 151 152 /** 153 * Sets current barrier generation as broken and wakes up everyone. 154 * Called only while holding lock. 155 */ breakBarrier()156 private void breakBarrier() { 157 generation.broken = true; 158 count = parties; 159 trip.signalAll(); 160 } 161 162 /** 163 * Main barrier code, covering the various policies. 164 */ dowait(boolean timed, long nanos)165 private int dowait(boolean timed, long nanos) 166 throws InterruptedException, BrokenBarrierException, 167 TimeoutException { 168 final ReentrantLock lock = this.lock; 169 lock.lock(); 170 try { 171 final Generation g = generation; 172 173 if (g.broken) 174 throw new BrokenBarrierException(); 175 176 if (Thread.interrupted()) { 177 breakBarrier(); 178 throw new InterruptedException(); 179 } 180 181 int index = --count; 182 if (index == 0) { // tripped 183 boolean ranAction = false; 184 try { 185 final Runnable command = barrierCommand; 186 if (command != null) 187 command.run(); 188 ranAction = true; 189 nextGeneration(); 190 return 0; 191 } finally { 192 if (!ranAction) 193 breakBarrier(); 194 } 195 } 196 197 // loop until tripped, broken, interrupted, or timed out 198 for (;;) { 199 try { 200 if (!timed) 201 trip.await(); 202 else if (nanos > 0L) 203 nanos = trip.awaitNanos(nanos); 204 } catch (InterruptedException ie) { 205 if (g == generation && ! g.broken) { 206 breakBarrier(); 207 throw ie; 208 } else { 209 // We're about to finish waiting even if we had not 210 // been interrupted, so this interrupt is deemed to 211 // "belong" to subsequent execution. 212 Thread.currentThread().interrupt(); 213 } 214 } 215 216 if (g.broken) 217 throw new BrokenBarrierException(); 218 219 if (g != generation) 220 return index; 221 222 if (timed && nanos <= 0L) { 223 breakBarrier(); 224 throw new TimeoutException(); 225 } 226 } 227 } finally { 228 lock.unlock(); 229 } 230 } 231 232 /** 233 * Creates a new {@code CyclicBarrier} that will trip when the 234 * given number of parties (threads) are waiting upon it, and which 235 * will execute the given barrier action when the barrier is tripped, 236 * performed by the last thread entering the barrier. 237 * 238 * @param parties the number of threads that must invoke {@link #await} 239 * before the barrier is tripped 240 * @param barrierAction the command to execute when the barrier is 241 * tripped, or {@code null} if there is no action 242 * @throws IllegalArgumentException if {@code parties} is less than 1 243 */ CyclicBarrier(int parties, Runnable barrierAction)244 public CyclicBarrier(int parties, Runnable barrierAction) { 245 if (parties <= 0) throw new IllegalArgumentException(); 246 this.parties = parties; 247 this.count = parties; 248 this.barrierCommand = barrierAction; 249 } 250 251 /** 252 * Creates a new {@code CyclicBarrier} that will trip when the 253 * given number of parties (threads) are waiting upon it, and 254 * does not perform a predefined action when the barrier is tripped. 255 * 256 * @param parties the number of threads that must invoke {@link #await} 257 * before the barrier is tripped 258 * @throws IllegalArgumentException if {@code parties} is less than 1 259 */ CyclicBarrier(int parties)260 public CyclicBarrier(int parties) { 261 this(parties, null); 262 } 263 264 /** 265 * Returns the number of parties required to trip this barrier. 266 * 267 * @return the number of parties required to trip this barrier 268 */ getParties()269 public int getParties() { 270 return parties; 271 } 272 273 /** 274 * Waits until all {@linkplain #getParties parties} have invoked 275 * {@code await} on this barrier. 276 * 277 * <p>If the current thread is not the last to arrive then it is 278 * disabled for thread scheduling purposes and lies dormant until 279 * one of the following things happens: 280 * <ul> 281 * <li>The last thread arrives; or 282 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 283 * the current thread; or 284 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 285 * one of the other waiting threads; or 286 * <li>Some other thread times out while waiting for barrier; or 287 * <li>Some other thread invokes {@link #reset} on this barrier. 288 * </ul> 289 * 290 * <p>If the current thread: 291 * <ul> 292 * <li>has its interrupted status set on entry to this method; or 293 * <li>is {@linkplain Thread#interrupt interrupted} while waiting 294 * </ul> 295 * then {@link InterruptedException} is thrown and the current thread's 296 * interrupted status is cleared. 297 * 298 * <p>If the barrier is {@link #reset} while any thread is waiting, 299 * or if the barrier {@linkplain #isBroken is broken} when 300 * {@code await} is invoked, or while any thread is waiting, then 301 * {@link BrokenBarrierException} is thrown. 302 * 303 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting, 304 * then all other waiting threads will throw 305 * {@link BrokenBarrierException} and the barrier is placed in the broken 306 * state. 307 * 308 * <p>If the current thread is the last thread to arrive, and a 309 * non-null barrier action was supplied in the constructor, then the 310 * current thread runs the action before allowing the other threads to 311 * continue. 312 * If an exception occurs during the barrier action then that exception 313 * will be propagated in the current thread and the barrier is placed in 314 * the broken state. 315 * 316 * @return the arrival index of the current thread, where index 317 * {@code getParties() - 1} indicates the first 318 * to arrive and zero indicates the last to arrive 319 * @throws InterruptedException if the current thread was interrupted 320 * while waiting 321 * @throws BrokenBarrierException if <em>another</em> thread was 322 * interrupted or timed out while the current thread was 323 * waiting, or the barrier was reset, or the barrier was 324 * broken when {@code await} was called, or the barrier 325 * action (if present) failed due to an exception 326 */ await()327 public int await() throws InterruptedException, BrokenBarrierException { 328 try { 329 return dowait(false, 0L); 330 } catch (TimeoutException toe) { 331 throw new Error(toe); // cannot happen 332 } 333 } 334 335 /** 336 * Waits until all {@linkplain #getParties parties} have invoked 337 * {@code await} on this barrier, or the specified waiting time elapses. 338 * 339 * <p>If the current thread is not the last to arrive then it is 340 * disabled for thread scheduling purposes and lies dormant until 341 * one of the following things happens: 342 * <ul> 343 * <li>The last thread arrives; or 344 * <li>The specified timeout elapses; or 345 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 346 * the current thread; or 347 * <li>Some other thread {@linkplain Thread#interrupt interrupts} 348 * one of the other waiting threads; or 349 * <li>Some other thread times out while waiting for barrier; or 350 * <li>Some other thread invokes {@link #reset} on this barrier. 351 * </ul> 352 * 353 * <p>If the current thread: 354 * <ul> 355 * <li>has its interrupted status set on entry to this method; or 356 * <li>is {@linkplain Thread#interrupt interrupted} while waiting 357 * </ul> 358 * then {@link InterruptedException} is thrown and the current thread's 359 * interrupted status is cleared. 360 * 361 * <p>If the specified waiting time elapses then {@link TimeoutException} 362 * is thrown. If the time is less than or equal to zero, the 363 * method will not wait at all. 364 * 365 * <p>If the barrier is {@link #reset} while any thread is waiting, 366 * or if the barrier {@linkplain #isBroken is broken} when 367 * {@code await} is invoked, or while any thread is waiting, then 368 * {@link BrokenBarrierException} is thrown. 369 * 370 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while 371 * waiting, then all other waiting threads will throw {@link 372 * BrokenBarrierException} and the barrier is placed in the broken 373 * state. 374 * 375 * <p>If the current thread is the last thread to arrive, and a 376 * non-null barrier action was supplied in the constructor, then the 377 * current thread runs the action before allowing the other threads to 378 * continue. 379 * If an exception occurs during the barrier action then that exception 380 * will be propagated in the current thread and the barrier is placed in 381 * the broken state. 382 * 383 * @param timeout the time to wait for the barrier 384 * @param unit the time unit of the timeout parameter 385 * @return the arrival index of the current thread, where index 386 * {@code getParties() - 1} indicates the first 387 * to arrive and zero indicates the last to arrive 388 * @throws InterruptedException if the current thread was interrupted 389 * while waiting 390 * @throws TimeoutException if the specified timeout elapses 391 * @throws BrokenBarrierException if <em>another</em> thread was 392 * interrupted or timed out while the current thread was 393 * waiting, or the barrier was reset, or the barrier was broken 394 * when {@code await} was called, or the barrier action (if 395 * present) failed due to an exception 396 */ await(long timeout, TimeUnit unit)397 public int await(long timeout, TimeUnit unit) 398 throws InterruptedException, 399 BrokenBarrierException, 400 TimeoutException { 401 return dowait(true, unit.toNanos(timeout)); 402 } 403 404 /** 405 * Queries if this barrier is in a broken state. 406 * 407 * @return {@code true} if one or more parties broke out of this 408 * barrier due to interruption or timeout since 409 * construction or the last reset, or a barrier action 410 * failed due to an exception; {@code false} otherwise. 411 */ isBroken()412 public boolean isBroken() { 413 final ReentrantLock lock = this.lock; 414 lock.lock(); 415 try { 416 return generation.broken; 417 } finally { 418 lock.unlock(); 419 } 420 } 421 422 /** 423 * Resets the barrier to its initial state. If any parties are 424 * currently waiting at the barrier, they will return with a 425 * {@link BrokenBarrierException}. Note that resets <em>after</em> 426 * a breakage has occurred for other reasons can be complicated to 427 * carry out; threads need to re-synchronize in some other way, 428 * and choose one to perform the reset. It may be preferable to 429 * instead create a new barrier for subsequent use. 430 */ reset()431 public void reset() { 432 final ReentrantLock lock = this.lock; 433 lock.lock(); 434 try { 435 breakBarrier(); // break the current generation 436 nextGeneration(); // start a new generation 437 } finally { 438 lock.unlock(); 439 } 440 } 441 442 /** 443 * Returns the number of parties currently waiting at the barrier. 444 * This method is primarily useful for debugging and assertions. 445 * 446 * @return the number of parties currently blocked in {@link #await} 447 */ getNumberWaiting()448 public int getNumberWaiting() { 449 final ReentrantLock lock = this.lock; 450 lock.lock(); 451 try { 452 return parties - count; 453 } finally { 454 lock.unlock(); 455 } 456 } 457 } 458