/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent.locks;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/// OPENJDK-9 import jdk.internal.vm.annotation.ReservedStackAccess;

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 *
 * <p>Subclasses should be defined as non-public internal helper
 * classes that are used to implement the synchronization properties
 * of their enclosing class.  Class
 * {@code AbstractQueuedSynchronizer} does not implement any
 * synchronization interface.  Instead it defines methods such as
 * {@link #acquireInterruptibly} that can be invoked as
 * appropriate by concrete locks and related synchronizers to
 * implement their public methods.
 *
 * <p>This class supports either or both a default <em>exclusive</em>
 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed.
 * Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue. Usually, implementation subclasses support only
 * one of these modes, but both can come into play for example in a
 * {@link ReadWriteLock}. Subclasses that support only exclusive or
 * only shared modes need not define the methods supporting the unused mode.
 *
 * <p>This class defines a nested {@link ConditionObject} class that
 * can be used as a {@link Condition} implementation by subclasses
 * supporting exclusive mode for which method {@link
 * #isHeldExclusively} reports whether synchronization is exclusively
 * held with respect to the current thread, method {@link #release}
 * invoked with the current {@link #getState} value fully releases
 * this object, and {@link #acquire}, given this saved state value,
 * eventually restores this object to its previous acquired state.  No
 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
 * condition, so if this constraint cannot be met, do not use it.  The
 * behavior of {@link ConditionObject} depends of course on the
 * semantics of its synchronizer implementation.
 *
 * <p>This class provides inspection, instrumentation, and monitoring
 * methods for the internal queue, as well as similar methods for
 * condition objects. These can be exported as desired into classes
 * using an {@code AbstractQueuedSynchronizer} for their
 * synchronization mechanics.
 *
 * <p>Serialization of this class stores only the underlying atomic
 * integer maintaining state, so deserialized objects have empty
 * thread queues.
 * Typical subclasses requiring serializability will
 * define a {@code readObject} method that restores this to a known
 * initial state upon deserialization.
 *
 * <h3>Usage</h3>
 *
 * <p>To use this class as the basis of a synchronizer, redefine the
 * following methods, as applicable, by inspecting and/or modifying
 * the synchronization state using {@link #getState}, {@link
 * #setState} and/or {@link #compareAndSetState}:
 *
 * <ul>
 * <li>{@link #tryAcquire}
 * <li>{@link #tryRelease}
 * <li>{@link #tryAcquireShared}
 * <li>{@link #tryReleaseShared}
 * <li>{@link #isHeldExclusively}
 * </ul>
 *
 * Each of these methods by default throws {@link
 * UnsupportedOperationException}.  Implementations of these methods
 * must be internally thread-safe, and should in general be short and
 * not block. Defining these methods is the <em>only</em> supported
 * means of using this class. All other methods are declared
 * {@code final} because they cannot be independently varied.
 *
 * <p>You may also find the inherited methods from {@link
 * AbstractOwnableSynchronizer} useful to keep track of the thread
 * owning an exclusive synchronizer.  You are encouraged to use them
 * -- this enables monitoring and diagnostic tools to assist users in
 * determining which threads hold locks.
 *
 * <p>Even though this class is based on an internal FIFO queue, it
 * does not automatically enforce FIFO acquisition policies.  The core
 * of exclusive synchronization takes the form:
 *
 * <pre>
 * Acquire:
 *     while (!tryAcquire(arg)) {
 *        <em>enqueue thread if it is not already queued</em>;
 *        <em>possibly block current thread</em>;
 *     }
 *
 * Release:
 *     if (tryRelease(arg))
 *        <em>unblock the first queued thread</em>;
 * </pre>
 *
 * (Shared mode is similar but may involve cascading signals.)
 *
 * <p id="barging">Because checks in acquire are invoked before
 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
 * others that are blocked and queued.  However, you can, if desired,
 * define {@code tryAcquire} and/or {@code tryAcquireShared} to
 * disable barging by internally invoking one or more of the inspection
 * methods, thereby providing a <em>fair</em> FIFO acquisition order.
 * In particular, most fair synchronizers can define {@code tryAcquire}
 * to return {@code false} if {@link #hasQueuedPredecessors} (a method
 * specifically designed to be used by fair synchronizers) returns
 * {@code true}.  Other variations are possible.
 *
 * <p>Throughput and scalability are generally highest for the
 * default barging (also known as <em>greedy</em>,
 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
 * While this is not guaranteed to be fair or starvation-free, earlier
 * queued threads are allowed to recontend before later queued
 * threads, and each recontention has an unbiased chance to succeed
 * against incoming threads.  Also, while acquires do not
 * "spin" in the usual sense, they may perform multiple
 * invocations of {@code tryAcquire} interspersed with other
 * computations before blocking.  This gives most of the benefits of
 * spins when exclusive synchronization is only briefly held, without
 * most of the liabilities when it isn't. If so desired, you can
 * augment this by preceding calls to acquire methods with
 * "fast-path" checks, possibly prechecking {@link #hasContended}
 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
 * is likely not to be contended.
 *
 * <p>This class provides an efficient and scalable basis for
 * synchronization in part by specializing its range of use to
 * synchronizers that can rely on {@code int} state, acquire, and
 * release parameters, and an internal FIFO wait queue. When this does
 * not suffice, you can build synchronizers from a lower level using
 * {@link java.util.concurrent.atomic atomic} classes, your own custom
 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
 * support.
 *
 * <h3>Usage Examples</h3>
 *
 * <p>Here is a non-reentrant mutual exclusion lock class that uses
 * the value zero to represent the unlocked state, and one to
 * represent the locked state. While a non-reentrant lock
 * does not strictly require recording of the current owner
 * thread, this class does so anyway to make usage easier to monitor.
 * It also supports conditions and exposes
 * one of the instrumentation methods:
 *
 * <pre> {@code
 * class Mutex implements Lock, java.io.Serializable {
 *
 *   // Our internal helper class
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     // Reports whether in locked state
 *     protected boolean isHeldExclusively() {
 *       return getState() == 1;
 *     }
 *
 *     // Acquires the lock if state is zero
 *     public boolean tryAcquire(int acquires) {
 *       assert acquires == 1; // Otherwise unused
 *       if (compareAndSetState(0, 1)) {
 *         setExclusiveOwnerThread(Thread.currentThread());
 *         return true;
 *       }
 *       return false;
 *     }
 *
 *     // Releases the lock by setting state to zero
 *     protected boolean tryRelease(int releases) {
 *       assert releases == 1; // Otherwise unused
 *       if (getState() == 0) throw new IllegalMonitorStateException();
 *       setExclusiveOwnerThread(null);
 *       setState(0);
 *       return true;
 *     }
 *
 *     // Provides a Condition
 *     Condition newCondition() { return new ConditionObject(); }
 *
 *     // Deserializes properly
 *     private void readObject(ObjectInputStream s)
 *         throws IOException, ClassNotFoundException {
 *       s.defaultReadObject();
 *       setState(0); // reset to unlocked state
 *     }
 *   }
 *
 *   // The sync object does all the hard work. We just forward to it.
 *   private final Sync sync = new Sync();
 *
 *   public void lock()                { sync.acquire(1); }
 *   public boolean tryLock()          { return sync.tryAcquire(1); }
 *   public void unlock()              { sync.release(1); }
 *   public Condition newCondition()   { return sync.newCondition(); }
 *   public boolean isLocked()         { return sync.isHeldExclusively(); }
 *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
 *   public void lockInterruptibly() throws InterruptedException {
 *     sync.acquireInterruptibly(1);
 *   }
 *   public boolean tryLock(long timeout, TimeUnit unit)
 *       throws InterruptedException {
 *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
 *   }
 * }}</pre>
 *
 * <p>Here is a latch class that is like a
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * except that it only requires a single {@code signal} to
 * fire. Because a latch is non-exclusive, it uses the {@code shared}
 * acquire and release methods.
 *
 * <pre> {@code
 * class BooleanLatch {
 *
 *   private static class Sync extends AbstractQueuedSynchronizer {
 *     boolean isSignalled() { return getState() != 0; }
 *
 *     protected int tryAcquireShared(int ignore) {
 *       return isSignalled() ?
 *         1 : -1;
 *     }
 *
 *     protected boolean tryReleaseShared(int ignore) {
 *       setState(1);
 *       return true;
 *     }
 *   }
 *
 *   private final Sync sync = new Sync();
 *   public boolean isSignalled() { return sync.isSignalled(); }
 *   public void signal()         { sync.releaseShared(1); }
 *   public void await() throws InterruptedException {
 *     sync.acquireSharedInterruptibly(1);
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    /**
     * Creates a new {@code AbstractQueuedSynchronizer} instance
     * with initial synchronization state of zero.
     */
    protected AbstractQueuedSynchronizer() { }

    /**
     * Wait queue node class.
     *
     * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
     * Hagersten) lock queue. CLH locks are normally used for
     * spinlocks.  We instead use them for blocking synchronizers, but
     * use the same basic tactic of holding some of the control
     * information about a thread in the predecessor of its node.  A
     * "status" field in each node keeps track of whether a thread
     * should block.  A node is signalled when its predecessor
     * releases.  Each node of the queue otherwise serves as a
     * specific-notification-style monitor holding a single waiting
     * thread. The status field does NOT control whether threads are
     * granted locks etc though.  A thread may try to acquire if it is
     * first in the queue. But being first does not guarantee success;
     * it only gives the right to contend.  So the currently released
     * contender thread may need to rewait.
     *
     * <p>To enqueue into a CLH lock, you atomically splice it in as new
     * tail. To dequeue, you just set the head field.
     * <pre>
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     * </pre>
     *
     * <p>Insertion into a CLH queue requires only a single atomic
     * operation on "tail", so there is a simple atomic point of
     * demarcation from unqueued to queued. Similarly, dequeuing
     * involves only updating the "head". However, it takes a bit
     * more work for nodes to determine who their successors are,
     * in part to deal with possible cancellation due to timeouts
     * and interrupts.
     *
     * <p>The "prev" links (not used in original CLH locks), are mainly
     * needed to handle cancellation. If a node is cancelled, its
     * successor is (normally) relinked to a non-cancelled
     * predecessor. For explanation of similar mechanics in the case
     * of spin locks, see the papers by Scott and Scherer at
     * http://www.cs.rochester.edu/u/scott/synchronization/
     *
     * <p>We also use "next" links to implement blocking mechanics.
     * The thread id for each node is kept in its own node, so a
     * predecessor signals the next node to wake up by traversing
     * next link to determine which thread it is.  Determination of
     * successor must avoid races with newly queued nodes to set
     * the "next" fields of their predecessors.  This is solved
     * when necessary by checking backwards from the atomically
     * updated "tail" when a node's successor appears to be null.
     * (Or, said differently, the next-links are an optimization
     * so that we don't usually need a backward scan.)
     *
     * <p>Cancellation introduces some conservatism to the basic
     * algorithms.  Since we must poll for cancellation of other
     * nodes, we can miss noticing whether a cancelled node is
     * ahead or behind us.
     * This is dealt with by always unparking
     * successors upon cancellation, allowing them to stabilize on
     * a new predecessor, unless we can identify an uncancelled
     * predecessor who will carry this responsibility.
     *
     * <p>CLH queues need a dummy header node to get started. But
     * we don't create them on construction, because it would be wasted
     * effort if there is never contention. Instead, the node
     * is constructed and head and tail pointers are set upon first
     * contention.
     *
     * <p>Threads waiting on Conditions use the same nodes, but
     * use an additional link. Conditions only need to link nodes
     * in simple (non-concurrent) linked queues because they are
     * only accessed when exclusively held.  Upon await, a node is
     * inserted into a condition queue.  Upon signal, the node is
     * transferred to the main queue.  A special value of status
     * field is used to mark which queue a node is on.
     *
     * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
     * Scherer and Michael Scott, along with members of JSR-166
     * expert group, for helpful ideas, discussions, and critiques
     * on the design of this class.
     */
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled. */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking. */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition. */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate.
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.
         * Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.
         * The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        /** Establishes initial head or SHARED marker. */
        Node() {}

        /** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            U.putObject(this, THREAD, Thread.currentThread());
        }

        /** Constructor used by addConditionWaiter. */
        Node(int waitStatus) {
            U.putInt(this, WAITSTATUS, waitStatus);
            U.putObject(this, THREAD, Thread.currentThread());
        }

        /** CASes waitStatus field. */
        final boolean compareAndSetWaitStatus(int expect, int update) {
            return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
        }

        /** CASes next field. */
        final boolean compareAndSetNext(Node expect, Node update) {
            return U.compareAndSwapObject(this, NEXT, expect, update);
        }

        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long NEXT;
        static final long PREV;
        private static final long THREAD;
        private static final long WAITSTATUS;
        static {
            try {
                NEXT = U.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
                PREV = U.objectFieldOffset
                    (Node.class.getDeclaredField("prev"));
                THREAD = U.objectFieldOffset
                    (Node.class.getDeclaredField("thread"));
                WAITSTATUS = U.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.
     * Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSwapInt(this, STATE, expect, update);
    }

    // Queuing utilities

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices
     * to improve responsiveness with very short timeouts.
     */
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(Node node) {
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                U.putObject(node, Node.PREV, oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                U.putObject(node, Node.PREV, oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    // Utilities for various versions of acquire

    /**
     * Cancels an ongoing attempt to acquire.
     *
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            pred.compareAndSetNext(predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * Convenience method to park and then check if interrupted.
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes. Each is mostly the same, but annoyingly
     * different. Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    /// OPENJDK-9 @ReservedStackAccess
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

    // Main exported methods

    /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread. This can be used
     * to implement method {@link Lock#tryLock()}.
     *
     * <p>The default
     * implementation throws {@link UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait.
     *        The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return {@code true} if successful. Upon success, this object has
     *         been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * <p>This method is always invoked by the thread performing
     * acquire. If this method reports failure, the acquire method
     * may queue the thread, if it is not already queued, until it is
     * signalled by a release from some other thread.
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}.
     *
     * @param arg the acquire argument. This value is always the one
     *        passed to an acquire method, or is the value saved on entry
     *        to a condition wait. The value is otherwise uninterpreted
     *        and can represent anything you like.
     * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.) Upon
     *         success, this object has been acquired.
     * @throws IllegalMonitorStateException if acquiring would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Attempts to set the state to reflect a release in shared mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument.
     *        This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait. The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if shared mode is not supported
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns {@code true} if synchronization is held exclusively with
     * respect to the current (calling) thread. This method is invoked
     * upon each call to a non-waiting {@link ConditionObject} method.
     * (Waiting methods instead invoke {@link #release}.)
     *
     * <p>The default implementation throws {@link
     * UnsupportedOperationException}. This method is invoked
     * internally only within {@link ConditionObject} methods, so need
     * not be defined if conditions are not used.
     *
     * @return {@code true} if synchronization is held exclusively;
     *         {@code false} otherwise
     * @throws UnsupportedOperationException if conditions are not supported
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * Acquires in exclusive mode, ignoring interrupts. Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.
     * This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    /// OPENJDK-9 @ReservedStackAccess
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success. Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted. This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success. Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses. This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * Releases in exclusive mode. Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    /// OPENJDK-9 @ReservedStackAccess
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * Acquires in shared mode, ignoring interrupts. Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success. Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * Acquires in shared mode, aborting if interrupted. Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success. Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * Attempts to acquire in shared mode, aborting if interrupted, and
     * failing if the given timeout elapses. Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquireShared}, returning on success. Otherwise, the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted or the timeout elapses.
     *
     * @param arg the acquire argument. This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * Releases in shared mode. Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument. This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    /// OPENJDK-9 @ReservedStackAccess
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Queue inspection methods

    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations due to interrupts and timeouts may occur
     * at any time, a {@code true} return does not guarantee that any
     * other thread will ever acquire.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there may be other threads waiting to acquire
     */
    public final boolean hasQueuedThreads() {
        return head != tail;
    }

    /**
     * Queries whether any threads have ever contended to acquire this
     * synchronizer; that is, if an acquire method has ever blocked.
     *
     * <p>In this implementation, this operation returns in
     * constant time.
     *
     * @return {@code true} if there has ever been contention
     */
    public final boolean hasContended() {
        return head != null;
    }

    /**
     * Returns the first (longest-waiting) thread in the queue, or
     * {@code null} if no threads are currently queued.
     *
     * <p>In this implementation, this operation normally returns in
     * constant time, but may iterate upon contention if other threads are
     * concurrently modifying the queue.
     *
     * @return the first (longest-waiting) thread in the queue, or
     *         {@code null} if no threads are currently queued
     */
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

    /**
     * Version of getFirstQueuedThread called when fastpath fails.
     */
    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally head.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
         */
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
         */

        Thread firstThread = null;
        for (Node p = tail; p != null && p != head; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                firstThread = t;
        }
        return firstThread;
    }

    /**
     * Returns true if the given thread is currently queued.
     *
     * <p>This implementation traverses the queue to determine
     * presence of the given thread.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is on the queue
     * @throws NullPointerException if the thread is null
     */
    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

    /**
     * Returns {@code true} if the apparent first queued thread, if one
     * exists, is waiting in exclusive mode. If this method returns
     * {@code true}, and the current thread is attempting to acquire in
     * shared mode (that is, this method is invoked from {@link
     * #tryAcquireShared}) then it is guaranteed that the current thread
     * is not the first queued thread. Used only as a heuristic in
     * ReentrantReadWriteLock.
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next) != null &&
            !s.isShared() &&
            s.thread != null;
    }

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread()
     *   && hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread. Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire). For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }


    // Instrumentation and monitoring methods

    /**
     * Returns an estimate of the number of threads waiting to
     * acquire. The value is only an estimate because the number of
     * threads may change dynamically while this method traverses
     * internal data structures. This method is designed for use in
     * monitoring system state, not for synchronization control.
     *
     * @return the estimated number of threads waiting to acquire
     */
    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire. Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is
     * designed to facilitate construction of subclasses that provide
     * more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in exclusive mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to an exclusive acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire in shared mode. This has the same properties
     * as {@link #getQueuedThreads} except that it only returns
     * those threads waiting due to a shared acquire.
     *
     * @return the collection of threads
     */
    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

    /**
     * Returns a string identifying this synchronizer, as well as its state.
     * The state, in brackets, includes the String {@code "State ="}
     * followed by the current value of {@link #getState}, and either
     * {@code "nonempty"} or {@code "empty"} depending on whether the
     * queue is empty.
     *
     * @return a string identifying this synchronizer, as well as its state
     */
    public String toString() {
        return super.toString()
            + "[State = " + getState() + ", "
            + (hasQueuedThreads() ? "non" : "") + "empty queue]";
    }


    // Internal support methods for Conditions

    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        // We check for node first, since it's likely to be at or near tail.
        // tail is known to be non-null, so we could re-order to "save"
        // one null check, but we leave it this way to help the VM.
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
    }

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting.
         * If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    /**
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if thread was cancelled before being signalled.
     *
     * @param node the node
     * @return true if cancelled before the node was signalled
     */
    final boolean transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

    // Instrumentation methods for conditions

    /**
     * Queries whether the given ConditionObject
     * uses this synchronizer as its lock.
     *
     * @param condition the condition
     * @return {@code true} if owned
     * @throws NullPointerException if the condition is null
     */
    public final boolean owns(ConditionObject condition) {
        return condition.isOwnedBy(this);
    }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this synchronizer. Note that because timeouts
     * and interrupts may occur at any time, a {@code true} return
     * does not guarantee that a future {@code signal} will awaken
     * any threads.  This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @param condition the condition
     * @return {@code true} if there are any waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final boolean hasWaiters(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.hasWaiters();
    }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this synchronizer. Note that
     * because timeouts and interrupts may occur at any time, the
     * estimate serves only as an upper bound on the actual number of
     * waiters.  This method is designed for use in monitoring system
     * state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final int getWaitQueueLength(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitQueueLength();
    }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this
     * synchronizer.  Because the actual set of threads may change
     * dynamically while constructing this result, the returned
     * collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if exclusive synchronization
     *         is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this synchronizer
     * @throws NullPointerException if the condition is null
     */
    public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
        if (!owns(condition))
            throw new IllegalArgumentException("Not owner");
        return condition.getWaitingThreads();
    }

    /**
     * Condition implementation for a {@link
     * AbstractQueuedSynchronizer} serving as the basis of a {@link
     * Lock} implementation.
     *
     * <p>Method documentation for this class describes mechanics,
     * not behavioral specifications from the point of view of Lock
     * and Condition users.
     * Exported versions of this class will in
     * general need to be accompanied by documentation describing
     * condition semantics that rely on those of the associated
     * {@code AbstractQueuedSynchronizer}.
     *
     * <p>This class is Serializable, but all fields are transient,
     * so deserialized conditions have no waiters.
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

            Node node = new Node(Node.CONDITION);

            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled or interrupted.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled, interrupted, or timed out.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // We don't check for nanosTimeout <= 0L here, to allow
            // awaitNanos(0) as a way to "yield the lock".
            final long deadline = System.nanoTime() + nanosTimeout;
            long initialNanos = nanosTimeout;
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            long remaining = deadline - System.nanoTime(); // avoid overflow
            return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled, interrupted, or timed out.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * <li>If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() >= abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled, interrupted, or timed out.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * <li>If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            // We don't check for nanosTimeout <= 0L here, to allow
            // await(0, unit) as a way to "yield the lock".
            final long deadline = System.nanoTime() + nanosTimeout;
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

    /**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long HEAD;
    private static final long TAIL;

    static {
        try {
            STATE = U.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            HEAD = U.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            TAIL = U.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

    /**
     * Initializes head and tail fields on first contention.
     */
    private final void initializeSyncQueue() {
        Node h;
        if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
            tail = h;
    }

    /**
     * CASes tail field.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return U.compareAndSwapObject(this, TAIL, expect, update);
    }
}
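
/*
 * Usage sketch, not part of the original source. It illustrates, under
 * stated assumptions, how a subclass might satisfy the ConditionObject
 * contract described above: isHeldExclusively() reports ownership by the
 * current thread, and release(getState()) fully releases the lock. It
 * also exercises the instrumentation methods owns() and hasWaiters().
 * The names ConditionUsageSketch, Sync, cond, and demo are hypothetical
 * and chosen only for illustration; fully qualified names are used so
 * the sketch does not depend on this file's imports.
 */
final class ConditionUsageSketch {

    /** Hypothetical non-reentrant mutex: state 0 = unlocked, 1 = locked. */
    static final class Sync
            extends java.util.concurrent.locks.AbstractQueuedSynchronizer {

        /** A condition bound to this synchronizer. */
        final ConditionObject cond = new ConditionObject();

        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            // Only the owner may release; fullyRelease relies on this check.
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    /**
     * Starts a waiter, polls hasWaiters() under the lock until the waiter
     * is on the condition queue, then signals it. Returns true if the
     * waiter woke up and the synchronizer owns the condition.
     */
    static boolean demo() {
        final Sync sync = new Sync();
        final boolean[] ready = { false };   // guarded by sync
        final boolean[] woke = { false };    // read only after join()

        Thread waiter = new Thread(() -> {
            sync.acquire(1);
            try {
                while (!ready[0])            // guard against spurious wakeups
                    sync.cond.awaitUninterruptibly();
                woke[0] = true;
            } finally {
                sync.release(1);
            }
        });
        waiter.start();

        for (;;) {
            sync.acquire(1);
            try {
                // hasWaiters throws unless the lock is held exclusively.
                if (sync.hasWaiters(sync.cond)) {
                    ready[0] = true;
                    sync.cond.signal();      // moves the waiter to the sync queue
                    break;
                }
            } finally {
                sync.release(1);
            }
            Thread.yield();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return woke[0] && sync.owns(sync.cond);
    }
}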