1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.ForkJoinPool; 43 import java.util.concurrent.RejectedExecutionException; 44 import jdk.internal.misc.Unsafe; 45 46 /** 47 * Provides a framework for implementing blocking locks and related 48 * synchronizers (semaphores, events, etc) that rely on 49 * first-in-first-out (FIFO) wait queues. This class is designed to 50 * be a useful basis for most kinds of synchronizers that rely on a 51 * single atomic {@code int} value to represent state. Subclasses 52 * must define the protected methods that change this state, and which 53 * define what that state means in terms of this object being acquired 54 * or released. Given these, the other methods in this class carry 55 * out all queuing and blocking mechanics. Subclasses can maintain 56 * other state fields, but only the atomically updated {@code int} 57 * value manipulated using methods {@link #getState}, {@link 58 * #setState} and {@link #compareAndSetState} is tracked with respect 59 * to synchronization. 60 * 61 * <p>Subclasses should be defined as non-public internal helper 62 * classes that are used to implement the synchronization properties 63 * of their enclosing class. Class 64 * {@code AbstractQueuedSynchronizer} does not implement any 65 * synchronization interface. Instead it defines methods such as 66 * {@link #acquireInterruptibly} that can be invoked as 67 * appropriate by concrete locks and related synchronizers to 68 * implement their public methods. 69 * 70 * <p>This class supports either or both a default <em>exclusive</em> 71 * mode and a <em>shared</em> mode. When acquired in exclusive mode, 72 * attempted acquires by other threads cannot succeed. Shared mode 73 * acquires by multiple threads may (but need not) succeed. This class 74 * does not "understand" these differences except in the 75 * mechanical sense that when a shared mode acquire succeeds, the next 76 * waiting thread (if one exists) must also determine whether it can 77 * acquire as well. Threads waiting in the different modes share the 78 * same FIFO queue. Usually, implementation subclasses support only 79 * one of these modes, but both can come into play for example in a 80 * {@link ReadWriteLock}. Subclasses that support only exclusive or 81 * only shared modes need not define the methods supporting the unused mode. 82 * 83 * <p>This class defines a nested {@link ConditionObject} class that 84 * can be used as a {@link Condition} implementation by subclasses 85 * supporting exclusive mode for which method {@link 86 * #isHeldExclusively} reports whether synchronization is exclusively 87 * held with respect to the current thread, method {@link #release} 88 * invoked with the current {@link #getState} value fully releases 89 * this object, and {@link #acquire}, given this saved state value, 90 * eventually restores this object to its previous acquired state. No 91 * {@code AbstractQueuedSynchronizer} method otherwise creates such a 92 * condition, so if this constraint cannot be met, do not use it. The 93 * behavior of {@link ConditionObject} depends of course on the 94 * semantics of its synchronizer implementation. 95 * 96 * <p>This class provides inspection, instrumentation, and monitoring 97 * methods for the internal queue, as well as similar methods for 98 * condition objects. These can be exported as desired into classes 99 * using an {@code AbstractQueuedSynchronizer} for their 100 * synchronization mechanics. 101 * 102 * <p>Serialization of this class stores only the underlying atomic 103 * integer maintaining state, so deserialized objects have empty 104 * thread queues. Typical subclasses requiring serializability will 105 * define a {@code readObject} method that restores this to a known 106 * initial state upon deserialization. 107 * 108 * <h2>Usage</h2> 109 * 110 * <p>To use this class as the basis of a synchronizer, redefine the 111 * following methods, as applicable, by inspecting and/or modifying 112 * the synchronization state using {@link #getState}, {@link 113 * #setState} and/or {@link #compareAndSetState}: 114 * 115 * <ul> 116 * <li>{@link #tryAcquire} 117 * <li>{@link #tryRelease} 118 * <li>{@link #tryAcquireShared} 119 * <li>{@link #tryReleaseShared} 120 * <li>{@link #isHeldExclusively} 121 * </ul> 122 * 123 * Each of these methods by default throws {@link 124 * UnsupportedOperationException}. Implementations of these methods 125 * must be internally thread-safe, and should in general be short and 126 * not block. Defining these methods is the <em>only</em> supported 127 * means of using this class. All other methods are declared 128 * {@code final} because they cannot be independently varied. 129 * 130 * <p>You may also find the inherited methods from {@link 131 * AbstractOwnableSynchronizer} useful to keep track of the thread 132 * owning an exclusive synchronizer. You are encouraged to use them 133 * -- this enables monitoring and diagnostic tools to assist users in 134 * determining which threads hold locks. 135 * 136 * <p>Even though this class is based on an internal FIFO queue, it 137 * does not automatically enforce FIFO acquisition policies. The core 138 * of exclusive synchronization takes the form: 139 * 140 * <pre> 141 * <em>Acquire:</em> 142 * while (!tryAcquire(arg)) { 143 * <em>enqueue thread if it is not already queued</em>; 144 * <em>possibly block current thread</em>; 145 * } 146 * 147 * <em>Release:</em> 148 * if (tryRelease(arg)) 149 * <em>unblock the first queued thread</em>; 150 * </pre> 151 * 152 * (Shared mode is similar but may involve cascading signals.) 153 * 154 * <p id="barging">Because checks in acquire are invoked before 155 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of 156 * others that are blocked and queued. However, you can, if desired, 157 * define {@code tryAcquire} and/or {@code tryAcquireShared} to 158 * disable barging by internally invoking one or more of the inspection 159 * methods, thereby providing a <em>fair</em> FIFO acquisition order. 160 * In particular, most fair synchronizers can define {@code tryAcquire} 161 * to return {@code false} if {@link #hasQueuedPredecessors} (a method 162 * specifically designed to be used by fair synchronizers) returns 163 * {@code true}. Other variations are possible. 164 * 165 * <p>Throughput and scalability are generally highest for the 166 * default barging (also known as <em>greedy</em>, 167 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy. 168 * While this is not guaranteed to be fair or starvation-free, earlier 169 * queued threads are allowed to recontend before later queued 170 * threads, and each recontention has an unbiased chance to succeed 171 * against incoming threads. Also, while acquires do not 172 * "spin" in the usual sense, they may perform multiple 173 * invocations of {@code tryAcquire} interspersed with other 174 * computations before blocking. This gives most of the benefits of 175 * spins when exclusive synchronization is only briefly held, without 176 * most of the liabilities when it isn't. If so desired, you can 177 * augment this by preceding calls to acquire methods with 178 * "fast-path" checks, possibly prechecking {@link #hasContended} 179 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer 180 * is likely not to be contended. 181 * 182 * <p>This class provides an efficient and scalable basis for 183 * synchronization in part by specializing its range of use to 184 * synchronizers that can rely on {@code int} state, acquire, and 185 * release parameters, and an internal FIFO wait queue. When this does 186 * not suffice, you can build synchronizers from a lower level using 187 * {@link java.util.concurrent.atomic atomic} classes, your own custom 188 * {@link java.util.Queue} classes, and {@link LockSupport} blocking 189 * support. 190 * 191 * <h2>Usage Examples</h2> 192 * 193 * <p>Here is a non-reentrant mutual exclusion lock class that uses 194 * the value zero to represent the unlocked state, and one to 195 * represent the locked state. While a non-reentrant lock 196 * does not strictly require recording of the current owner 197 * thread, this class does so anyway to make usage easier to monitor. 198 * It also supports conditions and exposes some instrumentation methods: 199 * 200 * <pre> {@code 201 * class Mutex implements Lock, java.io.Serializable { 202 * 203 * // Our internal helper class 204 * private static class Sync extends AbstractQueuedSynchronizer { 205 * // Acquires the lock if state is zero 206 * public boolean tryAcquire(int acquires) { 207 * assert acquires == 1; // Otherwise unused 208 * if (compareAndSetState(0, 1)) { 209 * setExclusiveOwnerThread(Thread.currentThread()); 210 * return true; 211 * } 212 * return false; 213 * } 214 * 215 * // Releases the lock by setting state to zero 216 * protected boolean tryRelease(int releases) { 217 * assert releases == 1; // Otherwise unused 218 * if (!isHeldExclusively()) 219 * throw new IllegalMonitorStateException(); 220 * setExclusiveOwnerThread(null); 221 * setState(0); 222 * return true; 223 * } 224 * 225 * // Reports whether in locked state 226 * public boolean isLocked() { 227 * return getState() != 0; 228 * } 229 * 230 * public boolean isHeldExclusively() { 231 * // a data race, but safe due to out-of-thin-air guarantees 232 * return getExclusiveOwnerThread() == Thread.currentThread(); 233 * } 234 * 235 * // Provides a Condition 236 * public Condition newCondition() { 237 * return new ConditionObject(); 238 * } 239 * 240 * // Deserializes properly 241 * private void readObject(ObjectInputStream s) 242 * throws IOException, ClassNotFoundException { 243 * s.defaultReadObject(); 244 * setState(0); // reset to unlocked state 245 * } 246 * } 247 * 248 * // The sync object does all the hard work. We just forward to it. 249 * private final Sync sync = new Sync(); 250 * 251 * public void lock() { sync.acquire(1); } 252 * public boolean tryLock() { return sync.tryAcquire(1); } 253 * public void unlock() { sync.release(1); } 254 * public Condition newCondition() { return sync.newCondition(); } 255 * public boolean isLocked() { return sync.isLocked(); } 256 * public boolean isHeldByCurrentThread() { 257 * return sync.isHeldExclusively(); 258 * } 259 * public boolean hasQueuedThreads() { 260 * return sync.hasQueuedThreads(); 261 * } 262 * public void lockInterruptibly() throws InterruptedException { 263 * sync.acquireInterruptibly(1); 264 * } 265 * public boolean tryLock(long timeout, TimeUnit unit) 266 * throws InterruptedException { 267 * return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 268 * } 269 * }}</pre> 270 * 271 * <p>Here is a latch class that is like a 272 * {@link java.util.concurrent.CountDownLatch CountDownLatch} 273 * except that it only requires a single {@code signal} to 274 * fire. Because a latch is non-exclusive, it uses the {@code shared} 275 * acquire and release methods. 276 * 277 * <pre> {@code 278 * class BooleanLatch { 279 * 280 * private static class Sync extends AbstractQueuedSynchronizer { 281 * boolean isSignalled() { return getState() != 0; } 282 * 283 * protected int tryAcquireShared(int ignore) { 284 * return isSignalled() ? 1 : -1; 285 * } 286 * 287 * protected boolean tryReleaseShared(int ignore) { 288 * setState(1); 289 * return true; 290 * } 291 * } 292 * 293 * private final Sync sync = new Sync(); 294 * public boolean isSignalled() { return sync.isSignalled(); } 295 * public void signal() { sync.releaseShared(1); } 296 * public void await() throws InterruptedException { 297 * sync.acquireSharedInterruptibly(1); 298 * } 299 * }}</pre> 300 * 301 * @since 1.5 302 * @author Doug Lea 303 */ 304 public abstract class AbstractQueuedSynchronizer 305 extends AbstractOwnableSynchronizer 306 implements java.io.Serializable { 307 308 private static final long serialVersionUID = 7373984972572414691L; 309 310 /** 311 * Creates a new {@code AbstractQueuedSynchronizer} instance 312 * with initial synchronization state of zero. 313 */ AbstractQueuedSynchronizer()314 protected AbstractQueuedSynchronizer() { } 315 316 /* 317 * Overview. 318 * 319 * The wait queue is a variant of a "CLH" (Craig, Landin, and 320 * Hagersten) lock queue. CLH locks are normally used for 321 * spinlocks. We instead use them for blocking synchronizers by 322 * including explicit ("prev" and "next") links plus a "status" 323 * field that allow nodes to signal successors when releasing 324 * locks, and handle cancellation due to interrupts and timeouts. 325 * The status field includes bits that track whether a thread 326 * needs a signal (using LockSupport.unpark). Despite these 327 * additions, we maintain most CLH locality properties. 328 * 329 * To enqueue into a CLH lock, you atomically splice it in as new 330 * tail. To dequeue, you set the head field, so the next eligible 331 * waiter becomes first. 332 * 333 * +------+ prev +-------+ +------+ 334 * | head | <---- | first | <---- | tail | 335 * +------+ +-------+ +------+ 336 * 337 * Insertion into a CLH queue requires only a single atomic 338 * operation on "tail", so there is a simple point of demarcation 339 * from unqueued to queued. The "next" link of the predecessor is 340 * set by the enqueuing thread after successful CAS. Even though 341 * non-atomic, this suffices to ensure that any blocked thread is 342 * signalled by a predecessor when eligible (although in the case 343 * of cancellation, possibly with the assistance of a signal in 344 * method cleanQueue). Signalling is based in part on a 345 * Dekker-like scheme in which the to-be waiting thread indicates 346 * WAITING status, then retries acquiring, and then rechecks 347 * status before blocking. The signaller atomically clears WAITING 348 * status when unparking. 349 * 350 * Dequeuing on acquire involves detaching (nulling) a node's 351 * "prev" node and then updating the "head". Other threads check 352 * if a node is or was dequeued by checking "prev" rather than 353 * head. We enforce the nulling then setting order by spin-waiting 354 * if necessary. Because of this, the lock algorithm is not itself 355 * strictly "lock-free" because an acquiring thread may need to 356 * wait for a previous acquire to make progress. When used with 357 * exclusive locks, such progress is required anyway. However 358 * Shared mode may (uncommonly) require a spin-wait before 359 * setting head field to ensure proper propagation. (Historical 360 * note: This allows some simplifications and efficiencies 361 * compared to previous versions of this class.) 362 * 363 * A node's predecessor can change due to cancellation while it is 364 * waiting, until the node is first in queue, at which point it 365 * cannot change. The acquire methods cope with this by rechecking 366 * "prev" before waiting. The prev and next fields are modified 367 * only via CAS by cancelled nodes in method cleanQueue. The 368 * unsplice strategy is reminiscent of Michael-Scott queues in 369 * that after a successful CAS to prev field, other threads help 370 * fix next fields. Because cancellation often occurs in bunches 371 * that complicate decisions about necessary signals, each call to 372 * cleanQueue traverses the queue until a clean sweep. Nodes that 373 * become relinked as first are unconditionally unparked 374 * (sometimes unnecessarily, but those cases are not worth 375 * avoiding). 376 * 377 * A thread may try to acquire if it is first (frontmost) in the 378 * queue, and sometimes before. Being first does not guarantee 379 * success; it only gives the right to contend. We balance 380 * throughput, overhead, and fairness by allowing incoming threads 381 * to "barge" and acquire the synchronizer while in the process of 382 * enqueuing, in which case an awakened first thread may need to 383 * rewait. To counteract possible repeated unlucky rewaits, we 384 * exponentially increase retries (up to 256) to acquire each time 385 * a thread is unparked. Except in this case, AQS locks do not 386 * spin; they instead interleave attempts to acquire with 387 * bookkeeping steps. (Users who want spinlocks can use 388 * tryAcquire.) 389 * 390 * To improve garbage collectibility, fields of nodes not yet on 391 * list are null. (It is not rare to create and then throw away a 392 * node without using it.) Fields of nodes coming off the list are 393 * nulled out as soon as possible. This accentuates the challenge 394 * of externally determining the first waiting thread (as in 395 * method getFirstQueuedThread). This sometimes requires the 396 * fallback of traversing backwards from the atomically updated 397 * "tail" when fields appear null. (This is never needed in the 398 * process of signalling though.) 399 * 400 * CLH queues need a dummy header node to get started. But 401 * we don't create them on construction, because it would be wasted 402 * effort if there is never contention. Instead, the node 403 * is constructed and head and tail pointers are set upon first 404 * contention. 405 * 406 * Shared mode operations differ from Exclusive in that an acquire 407 * signals the next waiter to try to acquire if it is also 408 * Shared. The tryAcquireShared API allows users to indicate the 409 * degree of propagation, but in most applications, it is more 410 * efficient to ignore this, allowing the successor to try 411 * acquiring in any case. 412 * 413 * Threads waiting on Conditions use nodes with an additional 414 * link to maintain the (FIFO) list of conditions. Conditions only 415 * need to link nodes in simple (non-concurrent) linked queues 416 * because they are only accessed when exclusively held. Upon 417 * await, a node is inserted into a condition queue. Upon signal, 418 * the node is enqueued on the main queue. A special status field 419 * value is used to track and atomically trigger this. 420 * 421 * Accesses to fields head, tail, and state use full Volatile 422 * mode, along with CAS. Node fields status, prev and next also do 423 * so while threads may be signallable, but sometimes use weaker 424 * modes otherwise. Accesses to field "waiter" (the thread to be 425 * signalled) are always sandwiched between other atomic accesses 426 * so are used in Plain mode. We use jdk.internal Unsafe versions 427 * of atomic access methods rather than VarHandles to avoid 428 * potential VM bootstrap issues. 429 * 430 * Most of the above is performed by primary internal method 431 * acquire, that is invoked in some way by all exported acquire 432 * methods. (It is usually easy for compilers to optimize 433 * call-site specializations when heavily used.) 434 * 435 * There are several arbitrary decisions about when and how to 436 * check interrupts in both acquire and await before and/or after 437 * blocking. The decisions are less arbitrary in implementation 438 * updates because some users appear to rely on original behaviors 439 * in ways that are racy and so (rarely) wrong in general but hard 440 * to justify changing. 441 * 442 * Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill 443 * Scherer and Michael Scott, along with members of JSR-166 444 * expert group, for helpful ideas, discussions, and critiques 445 * on the design of this class. 446 */ 447 448 // Node status bits, also used as argument and return values 449 static final int WAITING = 1; // must be 1 450 static final int CANCELLED = 0x80000000; // must be negative 451 static final int COND = 2; // in a condition wait 452 453 /** CLH Nodes */ 454 abstract static class Node { 455 volatile Node prev; // initially attached via casTail 456 volatile Node next; // visibly nonnull when signallable 457 Thread waiter; // visibly nonnull when enqueued 458 volatile int status; // written by owner, atomic bit ops by others 459 460 // methods for atomic operations casPrev(Node c, Node v)461 final boolean casPrev(Node c, Node v) { // for cleanQueue 462 return U.weakCompareAndSetReference(this, PREV, c, v); 463 } casNext(Node c, Node v)464 final boolean casNext(Node c, Node v) { // for cleanQueue 465 return U.weakCompareAndSetReference(this, NEXT, c, v); 466 } getAndUnsetStatus(int v)467 final int getAndUnsetStatus(int v) { // for signalling 468 return U.getAndBitwiseAndInt(this, STATUS, ~v); 469 } setPrevRelaxed(Node p)470 final void setPrevRelaxed(Node p) { // for off-queue assignment 471 U.putReference(this, PREV, p); 472 } setStatusRelaxed(int s)473 final void setStatusRelaxed(int s) { // for off-queue assignment 474 U.putInt(this, STATUS, s); 475 } clearStatus()476 final void clearStatus() { // for reducing unneeded signals 477 U.putIntOpaque(this, STATUS, 0); 478 } 479 480 private static final long STATUS 481 = U.objectFieldOffset(Node.class, "status"); 482 private static final long NEXT 483 = U.objectFieldOffset(Node.class, "next"); 484 private static final long PREV 485 = U.objectFieldOffset(Node.class, "prev"); 486 } 487 488 // Concrete classes tagged by type 489 static final class ExclusiveNode extends Node { } 490 static final class SharedNode extends Node { } 491 492 static final class ConditionNode extends Node 493 implements ForkJoinPool.ManagedBlocker { 494 ConditionNode nextWaiter; // link to next waiting node 495 496 /** 497 * Allows Conditions to be used in ForkJoinPools without 498 * risking fixed pool exhaustion. This is usable only for 499 * untimed Condition waits, not timed versions. 500 */ isReleasable()501 public final boolean isReleasable() { 502 return status <= 1 || Thread.currentThread().isInterrupted(); 503 } 504 block()505 public final boolean block() { 506 while (!isReleasable()) LockSupport.park(); 507 return true; 508 } 509 } 510 511 /** 512 * Head of the wait queue, lazily initialized. 513 */ 514 private transient volatile Node head; 515 516 /** 517 * Tail of the wait queue. After initialization, modified only via casTail. 518 */ 519 private transient volatile Node tail; 520 521 /** 522 * The synchronization state. 523 */ 524 private volatile int state; 525 526 /** 527 * Returns the current value of synchronization state. 528 * This operation has memory semantics of a {@code volatile} read. 529 * @return current state value 530 */ getState()531 protected final int getState() { 532 return state; 533 } 534 535 /** 536 * Sets the value of synchronization state. 537 * This operation has memory semantics of a {@code volatile} write. 538 * @param newState the new state value 539 */ setState(int newState)540 protected final void setState(int newState) { 541 state = newState; 542 } 543 544 /** 545 * Atomically sets synchronization state to the given updated 546 * value if the current state value equals the expected value. 547 * This operation has memory semantics of a {@code volatile} read 548 * and write. 549 * 550 * @param expect the expected value 551 * @param update the new value 552 * @return {@code true} if successful. False return indicates that the actual 553 * value was not equal to the expected value. 554 */ compareAndSetState(int expect, int update)555 protected final boolean compareAndSetState(int expect, int update) { 556 return U.compareAndSetInt(this, STATE, expect, update); 557 } 558 559 // Queuing utilities 560 casTail(Node c, Node v)561 private boolean casTail(Node c, Node v) { 562 return U.compareAndSetReference(this, TAIL, c, v); 563 } 564 565 /** tries once to CAS a new dummy node for head */ tryInitializeHead()566 private void tryInitializeHead() { 567 Node h = new ExclusiveNode(); 568 if (U.compareAndSetReference(this, HEAD, null, h)) 569 tail = h; 570 } 571 572 /** 573 * Enqueues the node unless null. (Currently used only for 574 * ConditionNodes; other cases are interleaved with acquires.) 575 */ enqueue(Node node)576 final void enqueue(Node node) { 577 if (node != null) { 578 for (;;) { 579 Node t = tail; 580 node.setPrevRelaxed(t); // avoid unnecessary fence 581 if (t == null) // initialize 582 tryInitializeHead(); 583 else if (casTail(t, node)) { 584 t.next = node; 585 if (t.status < 0) // wake up to clean link 586 LockSupport.unpark(node.waiter); 587 break; 588 } 589 } 590 } 591 } 592 593 /** Returns true if node is found in traversal from tail */ isEnqueued(Node node)594 final boolean isEnqueued(Node node) { 595 for (Node t = tail; t != null; t = t.prev) 596 if (t == node) 597 return true; 598 return false; 599 } 600 601 /** 602 * Wakes up the successor of given node, if one exists, and unsets its 603 * WAITING status to avoid park race. This may fail to wake up an 604 * eligible thread when one or more have been cancelled, but 605 * cancelAcquire ensures liveness. 606 */ signalNext(Node h)607 private static void signalNext(Node h) { 608 Node s; 609 if (h != null && (s = h.next) != null && s.status != 0) { 610 s.getAndUnsetStatus(WAITING); 611 LockSupport.unpark(s.waiter); 612 } 613 } 614 615 /** Wakes up the given node if in shared mode */ signalNextIfShared(Node h)616 private static void signalNextIfShared(Node h) { 617 Node s; 618 if (h != null && (s = h.next) != null && 619 (s instanceof SharedNode) && s.status != 0) { 620 s.getAndUnsetStatus(WAITING); 621 LockSupport.unpark(s.waiter); 622 } 623 } 624 625 /** 626 * Main acquire method, invoked by all exported acquire methods. 627 * 628 * @param node null unless a reacquiring Condition 629 * @param arg the acquire argument 630 * @param shared true if shared mode else exclusive 631 * @param interruptible if abort and return negative on interrupt 632 * @param timed if true use timed waits 633 * @param time if timed, the System.nanoTime value to timeout 634 * @return positive if acquired, 0 if timed out, negative if interrupted 635 */ acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)636 final int acquire(Node node, int arg, boolean shared, 637 boolean interruptible, boolean timed, long time) { 638 Thread current = Thread.currentThread(); 639 byte spins = 0, postSpins = 0; // retries upon unpark of first thread 640 boolean interrupted = false, first = false; 641 Node pred = null; // predecessor of node when enqueued 642 643 /* 644 * Repeatedly: 645 * Check if node now first 646 * if so, ensure head stable, else ensure valid predecessor 647 * if node is first or not yet enqueued, try acquiring 648 * else if node not yet created, create it 649 * else if not yet enqueued, try once to enqueue 650 * else if woken from park, retry (up to postSpins times) 651 * else if WAITING status not set, set and retry 652 * else park and clear WAITING status, and check cancellation 653 */ 654 655 for (;;) { 656 if (!first && (pred = (node == null) ? null : node.prev) != null && 657 !(first = (head == pred))) { 658 if (pred.status < 0) { 659 cleanQueue(); // predecessor cancelled 660 continue; 661 } else if (pred.prev == null) { 662 Thread.onSpinWait(); // ensure serialization 663 continue; 664 } 665 } 666 if (first || pred == null) { 667 boolean acquired; 668 try { 669 if (shared) 670 acquired = (tryAcquireShared(arg) >= 0); 671 else 672 acquired = tryAcquire(arg); 673 } catch (Throwable ex) { 674 cancelAcquire(node, interrupted, false); 675 throw ex; 676 } 677 if (acquired) { 678 if (first) { 679 node.prev = null; 680 head = node; 681 pred.next = null; 682 node.waiter = null; 683 if (shared) 684 signalNextIfShared(node); 685 if (interrupted) 686 current.interrupt(); 687 } 688 return 1; 689 } 690 } 691 if (node == null) { // allocate; retry before enqueue 692 if (shared) 693 node = new SharedNode(); 694 else 695 node = new ExclusiveNode(); 696 } else if (pred == null) { // try to enqueue 697 node.waiter = current; 698 Node t = tail; 699 node.setPrevRelaxed(t); // avoid unnecessary fence 700 if (t == null) 701 tryInitializeHead(); 702 else if (!casTail(t, node)) 703 node.setPrevRelaxed(null); // back out 704 else 705 t.next = node; 706 } else if (first && spins != 0) { 707 --spins; // reduce unfairness on rewaits 708 Thread.onSpinWait(); 709 } else if (node.status == 0) { 710 node.status = WAITING; // enable signal and recheck 711 } else { 712 long nanos; 713 spins = postSpins = (byte)((postSpins << 1) | 1); 714 if (!timed) 715 LockSupport.park(this); 716 else if ((nanos = time - System.nanoTime()) > 0L) 717 LockSupport.parkNanos(this, nanos); 718 else 719 break; 720 node.clearStatus(); 721 if ((interrupted |= Thread.interrupted()) && interruptible) 722 break; 723 } 724 } 725 return cancelAcquire(node, interrupted, interruptible); 726 } 727 728 /** 729 * Possibly repeatedly traverses from tail, unsplicing cancelled 730 * nodes until none are found. Unparks nodes that may have been 731 * relinked to be next eligible acquirer. 732 */ cleanQueue()733 private void cleanQueue() { 734 for (;;) { // restart point 735 for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples 736 if (q == null || (p = q.prev) == null) 737 return; // end of list 738 if (s == null ? tail != q : (s.prev != q || s.status < 0)) 739 break; // inconsistent 740 if (q.status < 0) { // cancelled 741 if ((s == null ? casTail(q, p) : s.casPrev(q, p)) && 742 q.prev == p) { 743 p.casNext(q, s); // OK if fails 744 if (p.prev == null) 745 signalNext(p); 746 } 747 break; 748 } 749 if ((n = p.next) != q) { // help finish 750 if (n != null && q.prev == p) { 751 p.casNext(n, q); 752 if (p.prev == null) 753 signalNext(p); 754 } 755 break; 756 } 757 s = q; 758 q = q.prev; 759 } 760 } 761 } 762 763 /** 764 * Cancels an ongoing attempt to acquire. 765 * 766 * @param node the node (may be null if cancelled before enqueuing) 767 * @param interrupted true if thread interrupted 768 * @param interruptible if should report interruption vs reset 769 */ cancelAcquire(Node node, boolean interrupted, boolean interruptible)770 private int cancelAcquire(Node node, boolean interrupted, 771 boolean interruptible) { 772 if (node != null) { 773 node.waiter = null; 774 node.status = CANCELLED; 775 if (node.prev != null) 776 cleanQueue(); 777 } 778 if (interrupted) { 779 if (interruptible) 780 return CANCELLED; 781 else 782 Thread.currentThread().interrupt(); 783 } 784 return 0; 785 } 786 787 // Main exported methods 788 789 /** 790 * Attempts to acquire in exclusive mode. This method should query 791 * if the state of the object permits it to be acquired in the 792 * exclusive mode, and if so to acquire it. 793 * 794 * <p>This method is always invoked by the thread performing 795 * acquire. If this method reports failure, the acquire method 796 * may queue the thread, if it is not already queued, until it is 797 * signalled by a release from some other thread. This can be used 798 * to implement method {@link Lock#tryLock()}. 799 * 800 * <p>The default 801 * implementation throws {@link UnsupportedOperationException}. 802 * 803 * @param arg the acquire argument. This value is always the one 804 * passed to an acquire method, or is the value saved on entry 805 * to a condition wait. The value is otherwise uninterpreted 806 * and can represent anything you like. 807 * @return {@code true} if successful. Upon success, this object has 808 * been acquired. 809 * @throws IllegalMonitorStateException if acquiring would place this 810 * synchronizer in an illegal state. This exception must be 811 * thrown in a consistent fashion for synchronization to work 812 * correctly. 813 * @throws UnsupportedOperationException if exclusive mode is not supported 814 */ tryAcquire(int arg)815 protected boolean tryAcquire(int arg) { 816 throw new UnsupportedOperationException(); 817 } 818 819 /** 820 * Attempts to set the state to reflect a release in exclusive 821 * mode. 822 * 823 * <p>This method is always invoked by the thread performing release. 824 * 825 * <p>The default implementation throws 826 * {@link UnsupportedOperationException}. 827 * 828 * @param arg the release argument. This value is always the one 829 * passed to a release method, or the current state value upon 830 * entry to a condition wait. The value is otherwise 831 * uninterpreted and can represent anything you like. 832 * @return {@code true} if this object is now in a fully released 833 * state, so that any waiting threads may attempt to acquire; 834 * and {@code false} otherwise. 835 * @throws IllegalMonitorStateException if releasing would place this 836 * synchronizer in an illegal state. This exception must be 837 * thrown in a consistent fashion for synchronization to work 838 * correctly. 839 * @throws UnsupportedOperationException if exclusive mode is not supported 840 */ tryRelease(int arg)841 protected boolean tryRelease(int arg) { 842 throw new UnsupportedOperationException(); 843 } 844 845 /** 846 * Attempts to acquire in shared mode. This method should query if 847 * the state of the object permits it to be acquired in the shared 848 * mode, and if so to acquire it. 849 * 850 * <p>This method is always invoked by the thread performing 851 * acquire. If this method reports failure, the acquire method 852 * may queue the thread, if it is not already queued, until it is 853 * signalled by a release from some other thread. 854 * 855 * <p>The default implementation throws {@link 856 * UnsupportedOperationException}. 857 * 858 * @param arg the acquire argument. This value is always the one 859 * passed to an acquire method, or is the value saved on entry 860 * to a condition wait. The value is otherwise uninterpreted 861 * and can represent anything you like. 862 * @return a negative value on failure; zero if acquisition in shared 863 * mode succeeded but no subsequent shared-mode acquire can 864 * succeed; and a positive value if acquisition in shared 865 * mode succeeded and subsequent shared-mode acquires might 866 * also succeed, in which case a subsequent waiting thread 867 * must check availability. (Support for three different 868 * return values enables this method to be used in contexts 869 * where acquires only sometimes act exclusively.) Upon 870 * success, this object has been acquired. 871 * @throws IllegalMonitorStateException if acquiring would place this 872 * synchronizer in an illegal state. This exception must be 873 * thrown in a consistent fashion for synchronization to work 874 * correctly. 875 * @throws UnsupportedOperationException if shared mode is not supported 876 */ tryAcquireShared(int arg)877 protected int tryAcquireShared(int arg) { 878 throw new UnsupportedOperationException(); 879 } 880 881 /** 882 * Attempts to set the state to reflect a release in shared mode. 883 * 884 * <p>This method is always invoked by the thread performing release. 885 * 886 * <p>The default implementation throws 887 * {@link UnsupportedOperationException}. 888 * 889 * @param arg the release argument. This value is always the one 890 * passed to a release method, or the current state value upon 891 * entry to a condition wait. The value is otherwise 892 * uninterpreted and can represent anything you like. 893 * @return {@code true} if this release of shared mode may permit a 894 * waiting acquire (shared or exclusive) to succeed; and 895 * {@code false} otherwise 896 * @throws IllegalMonitorStateException if releasing would place this 897 * synchronizer in an illegal state. This exception must be 898 * thrown in a consistent fashion for synchronization to work 899 * correctly. 900 * @throws UnsupportedOperationException if shared mode is not supported 901 */ tryReleaseShared(int arg)902 protected boolean tryReleaseShared(int arg) { 903 throw new UnsupportedOperationException(); 904 } 905 906 /** 907 * Returns {@code true} if synchronization is held exclusively with 908 * respect to the current (calling) thread. This method is invoked 909 * upon each call to a {@link ConditionObject} method. 910 * 911 * <p>The default implementation throws {@link 912 * UnsupportedOperationException}. This method is invoked 913 * internally only within {@link ConditionObject} methods, so need 914 * not be defined if conditions are not used. 915 * 916 * @return {@code true} if synchronization is held exclusively; 917 * {@code false} otherwise 918 * @throws UnsupportedOperationException if conditions are not supported 919 */ isHeldExclusively()920 protected boolean isHeldExclusively() { 921 throw new UnsupportedOperationException(); 922 } 923 924 /** 925 * Acquires in exclusive mode, ignoring interrupts. Implemented 926 * by invoking at least once {@link #tryAcquire}, 927 * returning on success. Otherwise the thread is queued, possibly 928 * repeatedly blocking and unblocking, invoking {@link 929 * #tryAcquire} until success. This method can be used 930 * to implement method {@link Lock#lock}. 931 * 932 * @param arg the acquire argument. This value is conveyed to 933 * {@link #tryAcquire} but is otherwise uninterpreted and 934 * can represent anything you like. 935 */ acquire(int arg)936 public final void acquire(int arg) { 937 if (!tryAcquire(arg)) 938 acquire(null, arg, false, false, false, 0L); 939 } 940 941 /** 942 * Acquires in exclusive mode, aborting if interrupted. 943 * Implemented by first checking interrupt status, then invoking 944 * at least once {@link #tryAcquire}, returning on 945 * success. Otherwise the thread is queued, possibly repeatedly 946 * blocking and unblocking, invoking {@link #tryAcquire} 947 * until success or the thread is interrupted. This method can be 948 * used to implement method {@link Lock#lockInterruptibly}. 949 * 950 * @param arg the acquire argument. This value is conveyed to 951 * {@link #tryAcquire} but is otherwise uninterpreted and 952 * can represent anything you like. 953 * @throws InterruptedException if the current thread is interrupted 954 */ acquireInterruptibly(int arg)955 public final void acquireInterruptibly(int arg) 956 throws InterruptedException { 957 if (Thread.interrupted() || 958 (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0)) 959 throw new InterruptedException(); 960 } 961 962 /** 963 * Attempts to acquire in exclusive mode, aborting if interrupted, 964 * and failing if the given timeout elapses. Implemented by first 965 * checking interrupt status, then invoking at least once {@link 966 * #tryAcquire}, returning on success. Otherwise, the thread is 967 * queued, possibly repeatedly blocking and unblocking, invoking 968 * {@link #tryAcquire} until success or the thread is interrupted 969 * or the timeout elapses. This method can be used to implement 970 * method {@link Lock#tryLock(long, TimeUnit)}. 971 * 972 * @param arg the acquire argument. This value is conveyed to 973 * {@link #tryAcquire} but is otherwise uninterpreted and 974 * can represent anything you like. 975 * @param nanosTimeout the maximum number of nanoseconds to wait 976 * @return {@code true} if acquired; {@code false} if timed out 977 * @throws InterruptedException if the current thread is interrupted 978 */ tryAcquireNanos(int arg, long nanosTimeout)979 public final boolean tryAcquireNanos(int arg, long nanosTimeout) 980 throws InterruptedException { 981 if (!Thread.interrupted()) { 982 if (tryAcquire(arg)) 983 return true; 984 if (nanosTimeout <= 0L) 985 return false; 986 int stat = acquire(null, arg, false, true, true, 987 System.nanoTime() + nanosTimeout); 988 if (stat > 0) 989 return true; 990 if (stat == 0) 991 return false; 992 } 993 throw new InterruptedException(); 994 } 995 996 /** 997 * Releases in exclusive mode. Implemented by unblocking one or 998 * more threads if {@link #tryRelease} returns true. 999 * This method can be used to implement method {@link Lock#unlock}. 1000 * 1001 * @param arg the release argument. This value is conveyed to 1002 * {@link #tryRelease} but is otherwise uninterpreted and 1003 * can represent anything you like. 1004 * @return the value returned from {@link #tryRelease} 1005 */ release(int arg)1006 public final boolean release(int arg) { 1007 if (tryRelease(arg)) { 1008 signalNext(head); 1009 return true; 1010 } 1011 return false; 1012 } 1013 1014 /** 1015 * Acquires in shared mode, ignoring interrupts. Implemented by 1016 * first invoking at least once {@link #tryAcquireShared}, 1017 * returning on success. Otherwise the thread is queued, possibly 1018 * repeatedly blocking and unblocking, invoking {@link 1019 * #tryAcquireShared} until success. 1020 * 1021 * @param arg the acquire argument. This value is conveyed to 1022 * {@link #tryAcquireShared} but is otherwise uninterpreted 1023 * and can represent anything you like. 1024 */ acquireShared(int arg)1025 public final void acquireShared(int arg) { 1026 if (tryAcquireShared(arg) < 0) 1027 acquire(null, arg, true, false, false, 0L); 1028 } 1029 1030 /** 1031 * Acquires in shared mode, aborting if interrupted. Implemented 1032 * by first checking interrupt status, then invoking at least once 1033 * {@link #tryAcquireShared}, returning on success. Otherwise the 1034 * thread is queued, possibly repeatedly blocking and unblocking, 1035 * invoking {@link #tryAcquireShared} until success or the thread 1036 * is interrupted. 1037 * @param arg the acquire argument. 1038 * This value is conveyed to {@link #tryAcquireShared} but is 1039 * otherwise uninterpreted and can represent anything 1040 * you like. 1041 * @throws InterruptedException if the current thread is interrupted 1042 */ acquireSharedInterruptibly(int arg)1043 public final void acquireSharedInterruptibly(int arg) 1044 throws InterruptedException { 1045 if (Thread.interrupted() || 1046 (tryAcquireShared(arg) < 0 && 1047 acquire(null, arg, true, true, false, 0L) < 0)) 1048 throw new InterruptedException(); 1049 } 1050 1051 /** 1052 * Attempts to acquire in shared mode, aborting if interrupted, and 1053 * failing if the given timeout elapses. Implemented by first 1054 * checking interrupt status, then invoking at least once {@link 1055 * #tryAcquireShared}, returning on success. Otherwise, the 1056 * thread is queued, possibly repeatedly blocking and unblocking, 1057 * invoking {@link #tryAcquireShared} until success or the thread 1058 * is interrupted or the timeout elapses. 1059 * 1060 * @param arg the acquire argument. This value is conveyed to 1061 * {@link #tryAcquireShared} but is otherwise uninterpreted 1062 * and can represent anything you like. 1063 * @param nanosTimeout the maximum number of nanoseconds to wait 1064 * @return {@code true} if acquired; {@code false} if timed out 1065 * @throws InterruptedException if the current thread is interrupted 1066 */ tryAcquireSharedNanos(int arg, long nanosTimeout)1067 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 1068 throws InterruptedException { 1069 if (!Thread.interrupted()) { 1070 if (tryAcquireShared(arg) >= 0) 1071 return true; 1072 if (nanosTimeout <= 0L) 1073 return false; 1074 int stat = acquire(null, arg, true, true, true, 1075 System.nanoTime() + nanosTimeout); 1076 if (stat > 0) 1077 return true; 1078 if (stat == 0) 1079 return false; 1080 } 1081 throw new InterruptedException(); 1082 } 1083 1084 /** 1085 * Releases in shared mode. Implemented by unblocking one or more 1086 * threads if {@link #tryReleaseShared} returns true. 1087 * 1088 * @param arg the release argument. This value is conveyed to 1089 * {@link #tryReleaseShared} but is otherwise uninterpreted 1090 * and can represent anything you like. 1091 * @return the value returned from {@link #tryReleaseShared} 1092 */ releaseShared(int arg)1093 public final boolean releaseShared(int arg) { 1094 if (tryReleaseShared(arg)) { 1095 signalNext(head); 1096 return true; 1097 } 1098 return false; 1099 } 1100 1101 // Queue inspection methods 1102 1103 /** 1104 * Queries whether any threads are waiting to acquire. Note that 1105 * because cancellations due to interrupts and timeouts may occur 1106 * at any time, a {@code true} return does not guarantee that any 1107 * other thread will ever acquire. 1108 * 1109 * @return {@code true} if there may be other threads waiting to acquire 1110 */ hasQueuedThreads()1111 public final boolean hasQueuedThreads() { 1112 for (Node p = tail, h = head; p != h && p != null; p = p.prev) 1113 if (p.status >= 0) 1114 return true; 1115 return false; 1116 } 1117 1118 /** 1119 * Queries whether any threads have ever contended to acquire this 1120 * synchronizer; that is, if an acquire method has ever blocked. 1121 * 1122 * <p>In this implementation, this operation returns in 1123 * constant time. 1124 * 1125 * @return {@code true} if there has ever been contention 1126 */ hasContended()1127 public final boolean hasContended() { 1128 return head != null; 1129 } 1130 1131 /** 1132 * Returns the first (longest-waiting) thread in the queue, or 1133 * {@code null} if no threads are currently queued. 1134 * 1135 * <p>In this implementation, this operation normally returns in 1136 * constant time, but may iterate upon contention if other threads are 1137 * concurrently modifying the queue. 1138 * 1139 * @return the first (longest-waiting) thread in the queue, or 1140 * {@code null} if no threads are currently queued 1141 */ getFirstQueuedThread()1142 public final Thread getFirstQueuedThread() { 1143 Thread first = null, w; Node h, s; 1144 if ((h = head) != null && ((s = h.next) == null || 1145 (first = s.waiter) == null || 1146 s.prev == null)) { 1147 // traverse from tail on stale reads 1148 for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) 1149 if ((w = p.waiter) != null) 1150 first = w; 1151 } 1152 return first; 1153 } 1154 1155 /** 1156 * Returns true if the given thread is currently queued. 1157 * 1158 * <p>This implementation traverses the queue to determine 1159 * presence of the given thread. 1160 * 1161 * @param thread the thread 1162 * @return {@code true} if the given thread is on the queue 1163 * @throws NullPointerException if the thread is null 1164 */ isQueued(Thread thread)1165 public final boolean isQueued(Thread thread) { 1166 if (thread == null) 1167 throw new NullPointerException(); 1168 for (Node p = tail; p != null; p = p.prev) 1169 if (p.waiter == thread) 1170 return true; 1171 return false; 1172 } 1173 1174 /** 1175 * Returns {@code true} if the apparent first queued thread, if one 1176 * exists, is waiting in exclusive mode. If this method returns 1177 * {@code true}, and the current thread is attempting to acquire in 1178 * shared mode (that is, this method is invoked from {@link 1179 * #tryAcquireShared}) then it is guaranteed that the current thread 1180 * is not the first queued thread. Used only as a heuristic in 1181 * ReentrantReadWriteLock. 1182 */ apparentlyFirstQueuedIsExclusive()1183 final boolean apparentlyFirstQueuedIsExclusive() { 1184 Node h, s; 1185 return (h = head) != null && (s = h.next) != null && 1186 !(s instanceof SharedNode) && s.waiter != null; 1187 } 1188 1189 /** 1190 * Queries whether any threads have been waiting to acquire longer 1191 * than the current thread. 1192 * 1193 * <p>An invocation of this method is equivalent to (but may be 1194 * more efficient than): 1195 * <pre> {@code 1196 * getFirstQueuedThread() != Thread.currentThread() 1197 * && hasQueuedThreads()}</pre> 1198 * 1199 * <p>Note that because cancellations due to interrupts and 1200 * timeouts may occur at any time, a {@code true} return does not 1201 * guarantee that some other thread will acquire before the current 1202 * thread. Likewise, it is possible for another thread to win a 1203 * race to enqueue after this method has returned {@code false}, 1204 * due to the queue being empty. 1205 * 1206 * <p>This method is designed to be used by a fair synchronizer to 1207 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1208 * Such a synchronizer's {@link #tryAcquire} method should return 1209 * {@code false}, and its {@link #tryAcquireShared} method should 1210 * return a negative value, if this method returns {@code true} 1211 * (unless this is a reentrant acquire). For example, the {@code 1212 * tryAcquire} method for a fair, reentrant, exclusive mode 1213 * synchronizer might look like this: 1214 * 1215 * <pre> {@code 1216 * protected boolean tryAcquire(int arg) { 1217 * if (isHeldExclusively()) { 1218 * // A reentrant acquire; increment hold count 1219 * return true; 1220 * } else if (hasQueuedPredecessors()) { 1221 * return false; 1222 * } else { 1223 * // try to acquire normally 1224 * } 1225 * }}</pre> 1226 * 1227 * @return {@code true} if there is a queued thread preceding the 1228 * current thread, and {@code false} if the current thread 1229 * is at the head of the queue or the queue is empty 1230 * @since 1.7 1231 */ hasQueuedPredecessors()1232 public final boolean hasQueuedPredecessors() { 1233 Thread first = null; Node h, s; 1234 if ((h = head) != null && ((s = h.next) == null || 1235 (first = s.waiter) == null || 1236 s.prev == null)) 1237 first = getFirstQueuedThread(); // retry via getFirstQueuedThread 1238 return first != null && first != Thread.currentThread(); 1239 } 1240 1241 // Instrumentation and monitoring methods 1242 1243 /** 1244 * Returns an estimate of the number of threads waiting to 1245 * acquire. The value is only an estimate because the number of 1246 * threads may change dynamically while this method traverses 1247 * internal data structures. This method is designed for use in 1248 * monitoring system state, not for synchronization control. 1249 * 1250 * @return the estimated number of threads waiting to acquire 1251 */ getQueueLength()1252 public final int getQueueLength() { 1253 int n = 0; 1254 for (Node p = tail; p != null; p = p.prev) { 1255 if (p.waiter != null) 1256 ++n; 1257 } 1258 return n; 1259 } 1260 1261 /** 1262 * Returns a collection containing threads that may be waiting to 1263 * acquire. Because the actual set of threads may change 1264 * dynamically while constructing this result, the returned 1265 * collection is only a best-effort estimate. The elements of the 1266 * returned collection are in no particular order. This method is 1267 * designed to facilitate construction of subclasses that provide 1268 * more extensive monitoring facilities. 1269 * 1270 * @return the collection of threads 1271 */ getQueuedThreads()1272 public final Collection<Thread> getQueuedThreads() { 1273 ArrayList<Thread> list = new ArrayList<>(); 1274 for (Node p = tail; p != null; p = p.prev) { 1275 Thread t = p.waiter; 1276 if (t != null) 1277 list.add(t); 1278 } 1279 return list; 1280 } 1281 1282 /** 1283 * Returns a collection containing threads that may be waiting to 1284 * acquire in exclusive mode. This has the same properties 1285 * as {@link #getQueuedThreads} except that it only returns 1286 * those threads waiting due to an exclusive acquire. 1287 * 1288 * @return the collection of threads 1289 */ getExclusiveQueuedThreads()1290 public final Collection<Thread> getExclusiveQueuedThreads() { 1291 ArrayList<Thread> list = new ArrayList<>(); 1292 for (Node p = tail; p != null; p = p.prev) { 1293 if (!(p instanceof SharedNode)) { 1294 Thread t = p.waiter; 1295 if (t != null) 1296 list.add(t); 1297 } 1298 } 1299 return list; 1300 } 1301 1302 /** 1303 * Returns a collection containing threads that may be waiting to 1304 * acquire in shared mode. This has the same properties 1305 * as {@link #getQueuedThreads} except that it only returns 1306 * those threads waiting due to a shared acquire. 1307 * 1308 * @return the collection of threads 1309 */ getSharedQueuedThreads()1310 public final Collection<Thread> getSharedQueuedThreads() { 1311 ArrayList<Thread> list = new ArrayList<>(); 1312 for (Node p = tail; p != null; p = p.prev) { 1313 if (p instanceof SharedNode) { 1314 Thread t = p.waiter; 1315 if (t != null) 1316 list.add(t); 1317 } 1318 } 1319 return list; 1320 } 1321 1322 /** 1323 * Returns a string identifying this synchronizer, as well as its state. 1324 * The state, in brackets, includes the String {@code "State ="} 1325 * followed by the current value of {@link #getState}, and either 1326 * {@code "nonempty"} or {@code "empty"} depending on whether the 1327 * queue is empty. 1328 * 1329 * @return a string identifying this synchronizer, as well as its state 1330 */ toString()1331 public String toString() { 1332 return super.toString() 1333 + "[State = " + getState() + ", " 1334 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1335 } 1336 1337 // Instrumentation methods for conditions 1338 1339 /** 1340 * Queries whether the given ConditionObject 1341 * uses this synchronizer as its lock. 1342 * 1343 * @param condition the condition 1344 * @return {@code true} if owned 1345 * @throws NullPointerException if the condition is null 1346 */ owns(ConditionObject condition)1347 public final boolean owns(ConditionObject condition) { 1348 return condition.isOwnedBy(this); 1349 } 1350 1351 /** 1352 * Queries whether any threads are waiting on the given condition 1353 * associated with this synchronizer. Note that because timeouts 1354 * and interrupts may occur at any time, a {@code true} return 1355 * does not guarantee that a future {@code signal} will awaken 1356 * any threads. This method is designed primarily for use in 1357 * monitoring of the system state. 1358 * 1359 * @param condition the condition 1360 * @return {@code true} if there are any waiting threads 1361 * @throws IllegalMonitorStateException if exclusive synchronization 1362 * is not held 1363 * @throws IllegalArgumentException if the given condition is 1364 * not associated with this synchronizer 1365 * @throws NullPointerException if the condition is null 1366 */ hasWaiters(ConditionObject condition)1367 public final boolean hasWaiters(ConditionObject condition) { 1368 if (!owns(condition)) 1369 throw new IllegalArgumentException("Not owner"); 1370 return condition.hasWaiters(); 1371 } 1372 1373 /** 1374 * Returns an estimate of the number of threads waiting on the 1375 * given condition associated with this synchronizer. Note that 1376 * because timeouts and interrupts may occur at any time, the 1377 * estimate serves only as an upper bound on the actual number of 1378 * waiters. This method is designed for use in monitoring system 1379 * state, not for synchronization control. 1380 * 1381 * @param condition the condition 1382 * @return the estimated number of waiting threads 1383 * @throws IllegalMonitorStateException if exclusive synchronization 1384 * is not held 1385 * @throws IllegalArgumentException if the given condition is 1386 * not associated with this synchronizer 1387 * @throws NullPointerException if the condition is null 1388 */ getWaitQueueLength(ConditionObject condition)1389 public final int getWaitQueueLength(ConditionObject condition) { 1390 if (!owns(condition)) 1391 throw new IllegalArgumentException("Not owner"); 1392 return condition.getWaitQueueLength(); 1393 } 1394 1395 /** 1396 * Returns a collection containing those threads that may be 1397 * waiting on the given condition associated with this 1398 * synchronizer. Because the actual set of threads may change 1399 * dynamically while constructing this result, the returned 1400 * collection is only a best-effort estimate. The elements of the 1401 * returned collection are in no particular order. 1402 * 1403 * @param condition the condition 1404 * @return the collection of threads 1405 * @throws IllegalMonitorStateException if exclusive synchronization 1406 * is not held 1407 * @throws IllegalArgumentException if the given condition is 1408 * not associated with this synchronizer 1409 * @throws NullPointerException if the condition is null 1410 */ getWaitingThreads(ConditionObject condition)1411 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1412 if (!owns(condition)) 1413 throw new IllegalArgumentException("Not owner"); 1414 return condition.getWaitingThreads(); 1415 } 1416 1417 /** 1418 * Condition implementation for a {@link AbstractQueuedSynchronizer} 1419 * serving as the basis of a {@link Lock} implementation. 1420 * 1421 * <p>Method documentation for this class describes mechanics, 1422 * not behavioral specifications from the point of view of Lock 1423 * and Condition users. Exported versions of this class will in 1424 * general need to be accompanied by documentation describing 1425 * condition semantics that rely on those of the associated 1426 * {@code AbstractQueuedSynchronizer}. 1427 * 1428 * <p>This class is Serializable, but all fields are transient, 1429 * so deserialized conditions have no waiters. 1430 */ 1431 public class ConditionObject implements Condition, java.io.Serializable { 1432 private static final long serialVersionUID = 1173984872572414699L; 1433 /** First node of condition queue. */ 1434 private transient ConditionNode firstWaiter; 1435 /** Last node of condition queue. */ 1436 private transient ConditionNode lastWaiter; 1437 1438 /** 1439 * Creates a new {@code ConditionObject} instance. 1440 */ ConditionObject()1441 public ConditionObject() { } 1442 1443 // Signalling methods 1444 1445 /** 1446 * Removes and transfers one or all waiters to sync queue. 1447 */ doSignal(ConditionNode first, boolean all)1448 private void doSignal(ConditionNode first, boolean all) { 1449 while (first != null) { 1450 ConditionNode next = first.nextWaiter; 1451 if ((firstWaiter = next) == null) 1452 lastWaiter = null; 1453 if ((first.getAndUnsetStatus(COND) & COND) != 0) { 1454 enqueue(first); 1455 if (!all) 1456 break; 1457 } 1458 first = next; 1459 } 1460 } 1461 1462 /** 1463 * Moves the longest-waiting thread, if one exists, from the 1464 * wait queue for this condition to the wait queue for the 1465 * owning lock. 1466 * 1467 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1468 * returns {@code false} 1469 */ signal()1470 public final void signal() { 1471 ConditionNode first = firstWaiter; 1472 if (!isHeldExclusively()) 1473 throw new IllegalMonitorStateException(); 1474 if (first != null) 1475 doSignal(first, false); 1476 } 1477 1478 /** 1479 * Moves all threads from the wait queue for this condition to 1480 * the wait queue for the owning lock. 1481 * 1482 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1483 * returns {@code false} 1484 */ signalAll()1485 public final void signalAll() { 1486 ConditionNode first = firstWaiter; 1487 if (!isHeldExclusively()) 1488 throw new IllegalMonitorStateException(); 1489 if (first != null) 1490 doSignal(first, true); 1491 } 1492 1493 // Waiting methods 1494 1495 /** 1496 * Adds node to condition list and releases lock. 1497 * 1498 * @param node the node 1499 * @return savedState to reacquire after wait 1500 */ enableWait(ConditionNode node)1501 private int enableWait(ConditionNode node) { 1502 if (isHeldExclusively()) { 1503 node.waiter = Thread.currentThread(); 1504 node.setStatusRelaxed(COND | WAITING); 1505 ConditionNode last = lastWaiter; 1506 if (last == null) 1507 firstWaiter = node; 1508 else 1509 last.nextWaiter = node; 1510 lastWaiter = node; 1511 int savedState = getState(); 1512 if (release(savedState)) 1513 return savedState; 1514 } 1515 node.status = CANCELLED; // lock not held or inconsistent 1516 throw new IllegalMonitorStateException(); 1517 } 1518 1519 /** 1520 * Returns true if a node that was initially placed on a condition 1521 * queue is now ready to reacquire on sync queue. 1522 * @param node the node 1523 * @return true if is reacquiring 1524 */ canReacquire(ConditionNode node)1525 private boolean canReacquire(ConditionNode node) { 1526 // check links, not status to avoid enqueue race 1527 return node != null && node.prev != null && isEnqueued(node); 1528 } 1529 1530 /** 1531 * Unlinks the given node and other non-waiting nodes from 1532 * condition queue unless already unlinked. 1533 */ unlinkCancelledWaiters(ConditionNode node)1534 private void unlinkCancelledWaiters(ConditionNode node) { 1535 if (node == null || node.nextWaiter != null || node == lastWaiter) { 1536 ConditionNode w = firstWaiter, trail = null; 1537 while (w != null) { 1538 ConditionNode next = w.nextWaiter; 1539 if ((w.status & COND) == 0) { 1540 w.nextWaiter = null; 1541 if (trail == null) 1542 firstWaiter = next; 1543 else 1544 trail.nextWaiter = next; 1545 if (next == null) 1546 lastWaiter = trail; 1547 } else 1548 trail = w; 1549 w = next; 1550 } 1551 } 1552 } 1553 1554 /** 1555 * Implements uninterruptible condition wait. 1556 * <ol> 1557 * <li>Save lock state returned by {@link #getState}. 1558 * <li>Invoke {@link #release} with saved state as argument, 1559 * throwing IllegalMonitorStateException if it fails. 1560 * <li>Block until signalled. 1561 * <li>Reacquire by invoking specialized version of 1562 * {@link #acquire} with saved state as argument. 1563 * </ol> 1564 */ awaitUninterruptibly()1565 public final void awaitUninterruptibly() { 1566 ConditionNode node = new ConditionNode(); 1567 int savedState = enableWait(node); 1568 LockSupport.setCurrentBlocker(this); // for back-compatibility 1569 boolean interrupted = false, rejected = false; 1570 while (!canReacquire(node)) { 1571 if (Thread.interrupted()) 1572 interrupted = true; 1573 else if ((node.status & COND) != 0) { 1574 try { 1575 if (rejected) 1576 node.block(); 1577 else 1578 ForkJoinPool.managedBlock(node); 1579 } catch (RejectedExecutionException ex) { 1580 rejected = true; 1581 } catch (InterruptedException ie) { 1582 interrupted = true; 1583 } 1584 } else 1585 Thread.onSpinWait(); // awoke while enqueuing 1586 } 1587 LockSupport.setCurrentBlocker(null); 1588 node.clearStatus(); 1589 acquire(node, savedState, false, false, false, 0L); 1590 if (interrupted) 1591 Thread.currentThread().interrupt(); 1592 } 1593 1594 /** 1595 * Implements interruptible condition wait. 1596 * <ol> 1597 * <li>If current thread is interrupted, throw InterruptedException. 1598 * <li>Save lock state returned by {@link #getState}. 1599 * <li>Invoke {@link #release} with saved state as argument, 1600 * throwing IllegalMonitorStateException if it fails. 1601 * <li>Block until signalled or interrupted. 1602 * <li>Reacquire by invoking specialized version of 1603 * {@link #acquire} with saved state as argument. 1604 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1605 * </ol> 1606 */ await()1607 public final void await() throws InterruptedException { 1608 if (Thread.interrupted()) 1609 throw new InterruptedException(); 1610 ConditionNode node = new ConditionNode(); 1611 int savedState = enableWait(node); 1612 LockSupport.setCurrentBlocker(this); // for back-compatibility 1613 boolean interrupted = false, cancelled = false, rejected = false; 1614 while (!canReacquire(node)) { 1615 if (interrupted |= Thread.interrupted()) { 1616 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1617 break; // else interrupted after signal 1618 } else if ((node.status & COND) != 0) { 1619 try { 1620 if (rejected) 1621 node.block(); 1622 else 1623 ForkJoinPool.managedBlock(node); 1624 } catch (RejectedExecutionException ex) { 1625 rejected = true; 1626 } catch (InterruptedException ie) { 1627 interrupted = true; 1628 } 1629 } else 1630 Thread.onSpinWait(); // awoke while enqueuing 1631 } 1632 LockSupport.setCurrentBlocker(null); 1633 node.clearStatus(); 1634 acquire(node, savedState, false, false, false, 0L); 1635 if (interrupted) { 1636 if (cancelled) { 1637 unlinkCancelledWaiters(node); 1638 throw new InterruptedException(); 1639 } 1640 Thread.currentThread().interrupt(); 1641 } 1642 } 1643 1644 /** 1645 * Implements timed condition wait. 1646 * <ol> 1647 * <li>If current thread is interrupted, throw InterruptedException. 1648 * <li>Save lock state returned by {@link #getState}. 1649 * <li>Invoke {@link #release} with saved state as argument, 1650 * throwing IllegalMonitorStateException if it fails. 1651 * <li>Block until signalled, interrupted, or timed out. 1652 * <li>Reacquire by invoking specialized version of 1653 * {@link #acquire} with saved state as argument. 1654 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1655 * </ol> 1656 */ awaitNanos(long nanosTimeout)1657 public final long awaitNanos(long nanosTimeout) 1658 throws InterruptedException { 1659 if (Thread.interrupted()) 1660 throw new InterruptedException(); 1661 ConditionNode node = new ConditionNode(); 1662 int savedState = enableWait(node); 1663 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1664 long deadline = System.nanoTime() + nanos; 1665 boolean cancelled = false, interrupted = false; 1666 while (!canReacquire(node)) { 1667 if ((interrupted |= Thread.interrupted()) || 1668 (nanos = deadline - System.nanoTime()) <= 0L) { 1669 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1670 break; 1671 } else 1672 LockSupport.parkNanos(this, nanos); 1673 } 1674 node.clearStatus(); 1675 acquire(node, savedState, false, false, false, 0L); 1676 if (cancelled) { 1677 unlinkCancelledWaiters(node); 1678 if (interrupted) 1679 throw new InterruptedException(); 1680 } else if (interrupted) 1681 Thread.currentThread().interrupt(); 1682 long remaining = deadline - System.nanoTime(); // avoid overflow 1683 return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; 1684 } 1685 1686 /** 1687 * Implements absolute timed condition wait. 1688 * <ol> 1689 * <li>If current thread is interrupted, throw InterruptedException. 1690 * <li>Save lock state returned by {@link #getState}. 1691 * <li>Invoke {@link #release} with saved state as argument, 1692 * throwing IllegalMonitorStateException if it fails. 1693 * <li>Block until signalled, interrupted, or timed out. 1694 * <li>Reacquire by invoking specialized version of 1695 * {@link #acquire} with saved state as argument. 1696 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1697 * <li>If timed out while blocked in step 4, return false, else true. 1698 * </ol> 1699 */ awaitUntil(Date deadline)1700 public final boolean awaitUntil(Date deadline) 1701 throws InterruptedException { 1702 long abstime = deadline.getTime(); 1703 if (Thread.interrupted()) 1704 throw new InterruptedException(); 1705 ConditionNode node = new ConditionNode(); 1706 int savedState = enableWait(node); 1707 boolean cancelled = false, interrupted = false; 1708 while (!canReacquire(node)) { 1709 if ((interrupted |= Thread.interrupted()) || 1710 System.currentTimeMillis() >= abstime) { 1711 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1712 break; 1713 } else 1714 LockSupport.parkUntil(this, abstime); 1715 } 1716 node.clearStatus(); 1717 acquire(node, savedState, false, false, false, 0L); 1718 if (cancelled) { 1719 unlinkCancelledWaiters(node); 1720 if (interrupted) 1721 throw new InterruptedException(); 1722 } else if (interrupted) 1723 Thread.currentThread().interrupt(); 1724 return !cancelled; 1725 } 1726 1727 /** 1728 * Implements timed condition wait. 1729 * <ol> 1730 * <li>If current thread is interrupted, throw InterruptedException. 1731 * <li>Save lock state returned by {@link #getState}. 1732 * <li>Invoke {@link #release} with saved state as argument, 1733 * throwing IllegalMonitorStateException if it fails. 1734 * <li>Block until signalled, interrupted, or timed out. 1735 * <li>Reacquire by invoking specialized version of 1736 * {@link #acquire} with saved state as argument. 1737 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1738 * <li>If timed out while blocked in step 4, return false, else true. 1739 * </ol> 1740 */ await(long time, TimeUnit unit)1741 public final boolean await(long time, TimeUnit unit) 1742 throws InterruptedException { 1743 long nanosTimeout = unit.toNanos(time); 1744 if (Thread.interrupted()) 1745 throw new InterruptedException(); 1746 ConditionNode node = new ConditionNode(); 1747 int savedState = enableWait(node); 1748 long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; 1749 long deadline = System.nanoTime() + nanos; 1750 boolean cancelled = false, interrupted = false; 1751 while (!canReacquire(node)) { 1752 if ((interrupted |= Thread.interrupted()) || 1753 (nanos = deadline - System.nanoTime()) <= 0L) { 1754 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) 1755 break; 1756 } else 1757 LockSupport.parkNanos(this, nanos); 1758 } 1759 node.clearStatus(); 1760 acquire(node, savedState, false, false, false, 0L); 1761 if (cancelled) { 1762 unlinkCancelledWaiters(node); 1763 if (interrupted) 1764 throw new InterruptedException(); 1765 } else if (interrupted) 1766 Thread.currentThread().interrupt(); 1767 return !cancelled; 1768 } 1769 1770 // support for instrumentation 1771 1772 /** 1773 * Returns true if this condition was created by the given 1774 * synchronization object. 1775 * 1776 * @return {@code true} if owned 1777 */ isOwnedBy(AbstractQueuedSynchronizer sync)1778 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { 1779 return sync == AbstractQueuedSynchronizer.this; 1780 } 1781 1782 /** 1783 * Queries whether any threads are waiting on this condition. 1784 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. 1785 * 1786 * @return {@code true} if there are any waiting threads 1787 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1788 * returns {@code false} 1789 */ hasWaiters()1790 protected final boolean hasWaiters() { 1791 if (!isHeldExclusively()) 1792 throw new IllegalMonitorStateException(); 1793 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1794 if ((w.status & COND) != 0) 1795 return true; 1796 } 1797 return false; 1798 } 1799 1800 /** 1801 * Returns an estimate of the number of threads waiting on 1802 * this condition. 1803 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. 1804 * 1805 * @return the estimated number of waiting threads 1806 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1807 * returns {@code false} 1808 */ getWaitQueueLength()1809 protected final int getWaitQueueLength() { 1810 if (!isHeldExclusively()) 1811 throw new IllegalMonitorStateException(); 1812 int n = 0; 1813 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1814 if ((w.status & COND) != 0) 1815 ++n; 1816 } 1817 return n; 1818 } 1819 1820 /** 1821 * Returns a collection containing those threads that may be 1822 * waiting on this Condition. 1823 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. 1824 * 1825 * @return the collection of threads 1826 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1827 * returns {@code false} 1828 */ getWaitingThreads()1829 protected final Collection<Thread> getWaitingThreads() { 1830 if (!isHeldExclusively()) 1831 throw new IllegalMonitorStateException(); 1832 ArrayList<Thread> list = new ArrayList<>(); 1833 for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) { 1834 if ((w.status & COND) != 0) { 1835 Thread t = w.waiter; 1836 if (t != null) 1837 list.add(t); 1838 } 1839 } 1840 return list; 1841 } 1842 } 1843 1844 // Unsafe 1845 private static final Unsafe U = Unsafe.getUnsafe(); 1846 private static final long STATE 1847 = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state"); 1848 private static final long HEAD 1849 = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head"); 1850 private static final long TAIL 1851 = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail"); 1852 1853 static { 1854 Class<?> ensureLoaded = LockSupport.class; 1855 } 1856 } 1857