/*
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs. Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return. An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre> {@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {

    /*
     * Overview: The core algorithm is, for an exchange "slot",
     * and a participant (caller) with an item:
     *
     * for (;;) {
     *   if (slot is empty) {                       // offer
     *     place item in a Node;
     *     if (can CAS slot from empty to node) {
     *       wait for release;
     *       return matching item in node;
     *     }
     *   }
     *   else if (can CAS slot from node to empty) { // release
     *     get the item in node;
     *     set matching item in node;
     *     release waiting thread;
     *   }
     *   // else retry on CAS failure
     * }
     *
     * This is among the simplest forms of a "dual data structure" --
     * see Scott and Scherer's DISC 04 paper and
     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
     *
     * This works great in principle. But in practice, like many
     * algorithms centered on atomic updates to a single location, it
     * scales horribly when there are more than a few participants
     * using the same Exchanger. So the implementation instead uses a
     * form of elimination arena that spreads out this contention by
     * arranging that some threads typically use different slots,
     * while still ensuring that eventually, any two parties will be
     * able to exchange items. That is, we cannot completely partition
     * across threads, but instead give threads arena indices that
     * will on average grow under contention and shrink under lack of
     * contention. We approach this by defining the Nodes that we need
     * anyway as ThreadLocals, and including in them the per-thread
     * index and related bookkeeping state. (We can safely reuse
     * per-thread nodes rather than creating them fresh each time
     * because slots alternate between pointing to a node vs null, so
     * cannot encounter ABA problems. However, we do need some care in
     * resetting them between uses.)
     *
     * Implementing an effective arena requires allocating a bunch of
     * space, so we only do so upon detecting contention (except on
     * uniprocessors, where they wouldn't help, so aren't used).
     * Otherwise, exchanges use the single-slot slotExchange method.
     * On contention, not only must the slots be in different
     * locations, but the locations must not encounter memory
     * contention due to being on the same cache line (or more
     * generally, the same coherence unit). Because, as of this
     * writing, there is no way to determine cacheline size, we define
     * a value that is enough for common platforms. Additionally,
     * extra care elsewhere is taken to avoid other false/unintended
     * sharing and to enhance locality, including adding padding (via
     * @Contended) to Nodes, embedding "bound" as an Exchanger field,
     * and reworking some park/unpark mechanics compared to
     * LockSupport versions.
     *
     * The arena starts out with only one used slot. We expand the
     * effective arena size by tracking collisions; i.e., failed CASes
     * while trying to exchange. By nature of the above algorithm, the
     * only kinds of collision that reliably indicate contention are
     * when two attempted releases collide -- one of two attempted
     * offers can legitimately fail to CAS without indicating
     * contention by more than one other thread. (Note: it is possible
     * but not worthwhile to more precisely detect contention by
     * reading slot values after CAS failures.) When a thread has
     * collided at each slot within the current arena bound, it tries
     * to expand the arena size by one. We track collisions within
     * bounds by using a version (sequence) number on the "bound"
     * field, and conservatively reset collision counts when a
     * participant notices that bound has been updated (in either
     * direction).
     *
     * The effective arena size is reduced (when there is more than
     * one slot) by giving up on waiting after a while and trying to
     * decrement the arena size on expiration. The value of "a while"
     * is an empirical matter. We implement this by piggybacking on the
     * use of spin->yield->block that is essential for reasonable
     * waiting performance anyway -- in a busy exchanger, offers are
     * usually almost immediately released, in which case context
     * switching on multiprocessors is extremely slow/wasteful. Arena
     * waits just omit the blocking part, and instead cancel. The spin
     * count is empirically chosen to be a value that avoids blocking
     * 99% of the time under maximum sustained exchange rates on a
     * range of test machines. Spins and yields entail some limited
     * randomness (using a cheap xorshift) to avoid regular patterns
     * that can induce unproductive grow/shrink cycles. (Using a
     * pseudorandom number also helps regularize spin cycle duration
     * by making branches unpredictable.) Also, during an offer, a
     * waiter can "know" that it will be released when its slot has
     * changed, but cannot yet proceed until match is set. In the
     * meantime it cannot cancel the offer, so instead spins/yields.
     * Note: It is possible to avoid this secondary check by changing
     * the linearization point to be a CAS of the match field (as done
     * in one case in the Scott & Scherer DISC paper), which also
     * increases asynchrony a bit, at the expense of poorer collision
     * detection and inability to always reuse per-thread nodes. So
     * the current scheme is typically a better tradeoff.
     *
     * On collisions, indices traverse the arena cyclically in reverse
     * order, restarting at the maximum index (which will tend to be
     * sparsest) when bounds change. (On expirations, indices instead
     * are halved until reaching 0.) It is possible (and has been
     * tried) to use randomized, prime-value-stepped, or double-hash
     * style traversal instead of simple cyclic traversal to reduce
     * bunching. But empirically, whatever benefits these may have
     * don't overcome their added overhead: We are managing operations
     * that occur very quickly unless there is sustained contention,
     * so simpler/faster control policies work better than more
     * accurate but slower ones.
     *
     * Because we use expiration for arena size control, we cannot
     * throw TimeoutExceptions in the timed version of the public
     * exchange method until the arena size has shrunk to zero (or
     * the arena isn't enabled). This may delay response to timeout
     * but is still within spec.
     *
     * Essentially all of the implementation is in methods
     * slotExchange and arenaExchange. These have similar overall
     * structure, but differ in too many details to combine. The
     * slotExchange method uses the single Exchanger field "slot"
     * rather than arena array elements. However, it still needs
     * minimal collision detection to trigger arena construction.
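The single-slot protocol in the overview pseudocode can be illustrated with public APIs. An offer CASes the slot from empty to a node, and a release CASes it from a node back to empty. This is a minimal sketch under stated assumptions, not this class's implementation. `SlotExchanger` is a hypothetical name. It uses `AtomicReference` and `LockSupport` in place of Unsafe, allocates a fresh node per call, and requires non-null items. It has no arena, timeout, or interrupt handling.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical, simplified single-slot exchanger (illustration only).
// Items must be non-null because a null match means "not yet released".
final class SlotExchanger<V> {
    private static final class Node<T> {
        final T item;                                  // the offering thread's item
        final Thread waiter = Thread.currentThread();  // thread to unpark on release
        volatile T match;                              // set by the releasing thread
        Node(T item) { this.item = item; }
    }

    private final AtomicReference<Node<V>> slot = new AtomicReference<>();

    V exchange(V item) {
        Objects.requireNonNull(item);
        Node<V> mine = null;
        for (;;) {
            Node<V> q = slot.get();
            if (q == null) {                            // offer
                if (mine == null)
                    mine = new Node<>(item);
                if (slot.compareAndSet(null, mine)) {
                    V v;
                    while ((v = mine.match) == null)    // wait for release
                        LockSupport.park(this);         // loop absorbs spurious wakeups
                    return v;
                }
            } else if (slot.compareAndSet(q, null)) {   // release
                q.match = item;
                LockSupport.unpark(q.waiter);
                return q.item;
            }
            // otherwise another thread won a CAS race: retry
        }
    }
}
```

Two threads calling `exchange` on the same instance each receive the other's item. The real class avoids the per-call allocation by reusing a per-thread node. As explained above, that reuse is safe because the slot only alternates between a node and null.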
     * (The messiest part is making sure interrupt status and
     * InterruptedExceptions come out right during transitions when
     * both methods may be called. This is done by using null return
     * as a sentinel to recheck interrupt status.)
     *
     * As is too common in this sort of code, methods are monolithic
     * because most of the logic relies on reads of fields that are
     * maintained as local variables so can't be nicely factored
     * (mainly, here, bulky spin->yield->block/cancel code), and
     * heavily dependent on intrinsics (Unsafe) to use inlined
     * embedded CAS and related memory access operations (that tend
     * not to be as readily inlined by dynamic compilers when they are
     * hidden behind other methods that would more nicely name and
     * encapsulate the intended effects). This includes the use of
     * putOrderedX to clear fields of the per-thread Nodes between
     * uses. Note that field Node.item is not declared as volatile
     * even though it is read by releasing threads, because they only
     * do so after CAS operations that must precede access, and all
     * uses by the owning thread are otherwise acceptably ordered by
     * other operations. (Because the actual points of atomicity are
     * slot CASes, it would also be legal for the write to Node.match
     * in a release to be weaker than a full volatile write. However,
     * this is not done because it could allow further postponement of
     * the write, delaying progress.)
     */

    /**
     * The byte distance (as a shift value) between any two used slots
     * in the arena. 1 << ASHIFT should be at least the cache line size.
     */
    private static final int ASHIFT = 7;

    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one, less
     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
     * for the expected scaling limits of the main algorithms.
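Slot i of the arena lives at raw byte offset `(i << ASHIFT) + ABASE`. ABASE skips one padding unit in front of element 0, so neighboring slots are 128 bytes apart. The hypothetical `ArenaLayout` class below reproduces that arithmetic and checks the MMASK constraints. The array base offset is passed in as a parameter because the real value comes from `Unsafe.arrayBaseOffset` and differs between JVMs. Assuming a 16-byte base, slot 0 sits at byte 144 and slot 1 at byte 272.

```java
// Hypothetical illustration of the arena layout arithmetic; constants copied
// from Exchanger, array base offset supplied by the caller (an assumption).
final class ArenaLayout {
    static final int ASHIFT = 7;     // slots are 1 << 7 = 128 bytes apart
    static final int MMASK = 0xff;   // largest supported arena index

    // MMASK must be 2^k - 1, and small enough that MMASK << ASHIFT fits in an int.
    static boolean maskIsValid() {
        return ((MMASK + 1) & MMASK) == 0 && MMASK < (1 << (31 - ASHIFT));
    }

    // Raw byte offset of slot i; one padding unit precedes slot 0, as with ABASE.
    static long slotOffset(int i, int arrayBaseOffset) {
        int abase = arrayBaseOffset + (1 << ASHIFT);
        return (i << ASHIFT) + abase;  // int arithmetic widened to long, as in Exchanger
    }

    // Element count allocated for an arena whose largest usable index is full,
    // mirroring new Node[(FULL + 2) << ASHIFT].
    static int arenaLength(int full) {
        return (full + 2) << ASHIFT;
    }
}
```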
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /** The number of CPUs, for sizing and spin control */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * The maximum slot index of the arena: The number of slots that
     * can in principle hold all threads without contention, or at
     * most the maximum indexable value.
     */
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    /**
     * The bound for spins while waiting for a match. The actual
     * number of iterations will on average be about twice this value
     * due to randomization. Note: Spinning is disabled when NCPU==1.
     */
    private static final int SPINS = 1 << 10;

    /**
     * Value representing null arguments/returns from public
     * methods. Needed because the API originally didn't disallow null
     * arguments, which it should have.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Sentinel value returned by internal exchange methods upon
     * timeout, to avoid the need for separate timed versions of these
     * methods.
     */
    private static final Object TIMED_OUT = new Object();

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @Contended to reduce memory contention.
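The SPINS comment says a wait runs about twice SPINS iterations. In the wait loops, the spin budget is only decremented when the xorshift state `h` (kept in Node.hash between waits) is negative, which happens roughly half the time. `Thread.yield()` fires when the remaining budget reaches SPINS/2 and when it reaches 0. The hypothetical `XorshiftSpin` class below replays that loop for a waiter that is never matched, and counts yields instead of performing them. The shift triple (1, 3, 10) and SPINS are taken from this file. The `reseed` argument is an assumption that stands in for the thread id.

```java
// Hypothetical replay of the randomized spin phase; Thread.yield() is replaced
// by a counter so the behaviour can be observed deterministically.
final class XorshiftSpin {
    static final int SPINS = 1 << 10;

    // Returns {loop iterations, yields} for a waiter that never sees a match.
    static int[] spinPhase(int seed, int reseed) {
        int h = seed, spins = SPINS, iterations = 0, yields = 0;
        while (spins > 0) {
            iterations++;
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;  // xorshift step
            if (h == 0)
                h = SPINS | reseed;                   // Exchanger uses SPINS | thread id
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                yields++;                             // Exchanger calls Thread.yield()
        }
        return new int[] { iterations, yields };
    }
}
```

Because the budget only shrinks on negative states, the iteration count is at least SPINS and typically near twice that. The yield count is always exactly two.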
     */
    //@jdk.internal.vm.annotation.Contended // android-removed
    static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    /**
     * Per-thread state.
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update. The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;

    /**
     * Exchange function when arenas enabled. See above for explanation.
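The `bound` field packs two things into one int. The low bits (masked by MMASK) hold the largest valid arena index, and the bits above hold a version count in units of SEQ. Growing adds SEQ + 1 and shrinking adds SEQ - 1, so every change bumps the version. As a result, a thread holding a stale copy fails its CAS even when the index has returned to an earlier value. The hypothetical `VersionedBound` class below models this with `AtomicInteger` rather than Unsafe. The shrink guard mirrors the `m != 0` check in arenaExchange.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of Exchanger's versioned "bound" word (illustration only).
final class VersionedBound {
    static final int MMASK = 0xff;
    static final int SEQ = MMASK + 1;

    // Starts as if the arena was just enabled: version 1, index 0.
    final AtomicInteger bound = new AtomicInteger(SEQ);

    static int index(int b)   { return b & MMASK; }
    static int version(int b) { return b >>> 8; }  // SEQ == 1 << 8

    // Grow by one slot, if bound is still exactly b.
    boolean tryGrow(int b) {
        return bound.compareAndSet(b, b + SEQ + 1);
    }

    // Shrink by one slot, if the index is above 0 and bound is still exactly b.
    boolean tryShrink(int b) {
        return index(b) != 0 && bound.compareAndSet(b, b + SEQ - 1);
    }
}
```

After a grow followed by a shrink, the index is back at 0 but the version has advanced from 1 to 3. A CAS using the original value therefore fails.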
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;    // minimal spinning on uniprocessors
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                if (h == 0)                    // initialize hash
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();            // yield periodically
            }
            else if (slot != p)
                spins = SPINS;                 // releaser hasn't set match yet
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this); // emulate LockSupport
                p.parked = t;                  // minimize window
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                // give up: interrupted, timed out, or arena enabled
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread. The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
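This is a short usage sketch of both public forms, using only the documented API. `ExchangeDemo` and its methods are hypothetical names. `swap` pairs the calling thread with a helper thread. Passing `null` works because null arguments are translated to NULL_ITEM internally. `timesOutAlone` calls the timed form when no partner ever arrives, so it ends in `TimeoutException`.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class using only Exchanger's public API.
final class ExchangeDemo {
    // Swaps one value with a helper thread; returns {caller received, helper received}.
    static String[] swap(String mine, String theirs) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] helperGot = new String[1];
        Thread helper = new Thread(() -> {
            try {
                helperGot[0] = exchanger.exchange(theirs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore status and give up
            }
        });
        helper.start();
        String callerGot = exchanger.exchange(mine);
        helper.join(); // join also makes helperGot[0] visible to this thread
        return new String[] { callerGot, helperGot[0] };
    }

    // With no partner, the timed form gives up with TimeoutException.
    static boolean timesOutAlone(long millis) throws InterruptedException {
        Exchanger<Integer> exchanger = new Exchanger<>();
        try {
            exchanger.exchange(42, millis, TimeUnit.MILLISECONDS);
            return false; // not expected: no other thread calls exchange
        } catch (TimeoutException expected) {
            return true;
        }
    }
}
```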
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown. If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    static {
        try {
            BOUND = U.objectFieldOffset
                (Exchanger.class.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (Exchanger.class.getDeclaredField("slot"));

            MATCH = U.objectFieldOffset
                (Node.class.getDeclaredField("match"));

            BLOCKER = U.objectFieldOffset
                (Thread.class.getDeclaredField("parkBlocker"));

            int scale = U.arrayIndexScale(Node[].class);
            if ((scale & (scale - 1)) != 0 || scale > (1 << ASHIFT))
                throw new Error("Unsupported array scale");
            // ABASE absorbs padding in front of element 0
            ABASE = U.arrayBaseOffset(Node[].class) + (1 << ASHIFT);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

}