1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent.locks; 8 9 import java.util.ArrayList; 10 import java.util.Collection; 11 import java.util.Date; 12 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; 14 15 /** 16 * A version of {@link AbstractQueuedSynchronizer} in 17 * which synchronization state is maintained as a {@code long}. 18 * This class has exactly the same structure, properties, and methods 19 * as {@code AbstractQueuedSynchronizer} with the exception 20 * that all state-related parameters and results are defined 21 * as {@code long} rather than {@code int}. This class 22 * may be useful when creating synchronizers such as 23 * multilevel locks and barriers that require 24 * 64 bits of state. 25 * 26 * <p>See {@link AbstractQueuedSynchronizer} for usage 27 * notes and examples. 28 * 29 * @since 1.6 30 * @author Doug Lea 31 */ 32 public abstract class AbstractQueuedLongSynchronizer 33 extends AbstractOwnableSynchronizer 34 implements java.io.Serializable { 35 36 private static final long serialVersionUID = 7373984972572414692L; 37 38 /* 39 To keep sources in sync, the remainder of this source file is 40 exactly cloned from AbstractQueuedSynchronizer, replacing class 41 name and changing ints related with sync state to longs. Please 42 keep it that way. 43 */ 44 45 /** 46 * Creates a new {@code AbstractQueuedLongSynchronizer} instance 47 * with initial synchronization state of zero. 48 */ AbstractQueuedLongSynchronizer()49 protected AbstractQueuedLongSynchronizer() { } 50 51 /** 52 * Head of the wait queue, lazily initialized. Except for 53 * initialization, it is modified only via method setHead. Note: 54 * If head exists, its waitStatus is guaranteed not to be 55 * CANCELLED. 56 */ 57 private transient volatile Node head; 58 59 /** 60 * Tail of the wait queue, lazily initialized. Modified only via 61 * method enq to add new wait node. 62 */ 63 private transient volatile Node tail; 64 65 /** 66 * The synchronization state. 67 */ 68 private volatile long state; 69 70 /** 71 * Returns the current value of synchronization state. 72 * This operation has memory semantics of a {@code volatile} read. 73 * @return current state value 74 */ getState()75 protected final long getState() { 76 return state; 77 } 78 79 /** 80 * Sets the value of synchronization state. 81 * This operation has memory semantics of a {@code volatile} write. 82 * @param newState the new state value 83 */ setState(long newState)84 protected final void setState(long newState) { 85 // Use putLongVolatile instead of ordinary volatile store when 86 // using compareAndSwapLong, for sake of some 32bit systems. 87 U.putLongVolatile(this, STATE, newState); 88 } 89 90 /** 91 * Atomically sets synchronization state to the given updated 92 * value if the current state value equals the expected value. 93 * This operation has memory semantics of a {@code volatile} read 94 * and write. 95 * 96 * @param expect the expected value 97 * @param update the new value 98 * @return {@code true} if successful. False return indicates that the actual 99 * value was not equal to the expected value. 100 */ compareAndSetState(long expect, long update)101 protected final boolean compareAndSetState(long expect, long update) { 102 return U.compareAndSwapLong(this, STATE, expect, update); 103 } 104 105 // Queuing utilities 106 107 /** 108 * The number of nanoseconds for which it is faster to spin 109 * rather than to use timed park. A rough estimate suffices 110 * to improve responsiveness with very short timeouts. 111 */ 112 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; 113 114 /** 115 * Inserts node into queue, initializing if necessary. See picture above. 116 * @param node the node to insert 117 * @return node's predecessor 118 */ enq(Node node)119 private Node enq(Node node) { 120 for (;;) { 121 Node oldTail = tail; 122 if (oldTail != null) { 123 U.putObject(node, Node.PREV, oldTail); 124 if (compareAndSetTail(oldTail, node)) { 125 oldTail.next = node; 126 return oldTail; 127 } 128 } else { 129 initializeSyncQueue(); 130 } 131 } 132 } 133 134 /** 135 * Creates and enqueues node for current thread and given mode. 136 * 137 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 138 * @return the new node 139 */ addWaiter(Node mode)140 private Node addWaiter(Node mode) { 141 Node node = new Node(mode); 142 143 for (;;) { 144 Node oldTail = tail; 145 if (oldTail != null) { 146 U.putObject(node, Node.PREV, oldTail); 147 if (compareAndSetTail(oldTail, node)) { 148 oldTail.next = node; 149 return node; 150 } 151 } else { 152 initializeSyncQueue(); 153 } 154 } 155 } 156 157 /** 158 * Sets head of queue to be node, thus dequeuing. Called only by 159 * acquire methods. Also nulls out unused fields for sake of GC 160 * and to suppress unnecessary signals and traversals. 161 * 162 * @param node the node 163 */ setHead(Node node)164 private void setHead(Node node) { 165 head = node; 166 node.thread = null; 167 node.prev = null; 168 } 169 170 /** 171 * Wakes up node's successor, if one exists. 172 * 173 * @param node the node 174 */ unparkSuccessor(Node node)175 private void unparkSuccessor(Node node) { 176 /* 177 * If status is negative (i.e., possibly needing signal) try 178 * to clear in anticipation of signalling. It is OK if this 179 * fails or if status is changed by waiting thread. 180 */ 181 int ws = node.waitStatus; 182 if (ws < 0) 183 node.compareAndSetWaitStatus(ws, 0); 184 185 /* 186 * Thread to unpark is held in successor, which is normally 187 * just the next node. But if cancelled or apparently null, 188 * traverse backwards from tail to find the actual 189 * non-cancelled successor. 190 */ 191 Node s = node.next; 192 if (s == null || s.waitStatus > 0) { 193 s = null; 194 for (Node p = tail; p != node && p != null; p = p.prev) 195 if (p.waitStatus <= 0) 196 s = p; 197 } 198 if (s != null) 199 LockSupport.unpark(s.thread); 200 } 201 202 /** 203 * Release action for shared mode -- signals successor and ensures 204 * propagation. (Note: For exclusive mode, release just amounts 205 * to calling unparkSuccessor of head if it needs signal.) 206 */ doReleaseShared()207 private void doReleaseShared() { 208 /* 209 * Ensure that a release propagates, even if there are other 210 * in-progress acquires/releases. This proceeds in the usual 211 * way of trying to unparkSuccessor of head if it needs 212 * signal. But if it does not, status is set to PROPAGATE to 213 * ensure that upon release, propagation continues. 214 * Additionally, we must loop in case a new node is added 215 * while we are doing this. Also, unlike other uses of 216 * unparkSuccessor, we need to know if CAS to reset status 217 * fails, if so rechecking. 218 */ 219 for (;;) { 220 Node h = head; 221 if (h != null && h != tail) { 222 int ws = h.waitStatus; 223 if (ws == Node.SIGNAL) { 224 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) 225 continue; // loop to recheck cases 226 unparkSuccessor(h); 227 } 228 else if (ws == 0 && 229 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) 230 continue; // loop on failed CAS 231 } 232 if (h == head) // loop if head changed 233 break; 234 } 235 } 236 237 /** 238 * Sets head of queue, and checks if successor may be waiting 239 * in shared mode, if so propagating if either propagate > 0 or 240 * PROPAGATE status was set. 241 * 242 * @param node the node 243 * @param propagate the return value from a tryAcquireShared 244 */ setHeadAndPropagate(Node node, long propagate)245 private void setHeadAndPropagate(Node node, long propagate) { 246 Node h = head; // Record old head for check below 247 setHead(node); 248 /* 249 * Try to signal next queued node if: 250 * Propagation was indicated by caller, 251 * or was recorded (as h.waitStatus either before 252 * or after setHead) by a previous operation 253 * (note: this uses sign-check of waitStatus because 254 * PROPAGATE status may transition to SIGNAL.) 255 * and 256 * The next node is waiting in shared mode, 257 * or we don't know, because it appears null 258 * 259 * The conservatism in both of these checks may cause 260 * unnecessary wake-ups, but only when there are multiple 261 * racing acquires/releases, so most need signals now or soon 262 * anyway. 263 */ 264 if (propagate > 0 || h == null || h.waitStatus < 0 || 265 (h = head) == null || h.waitStatus < 0) { 266 Node s = node.next; 267 if (s == null || s.isShared()) 268 doReleaseShared(); 269 } 270 } 271 272 // Utilities for various versions of acquire 273 274 /** 275 * Cancels an ongoing attempt to acquire. 276 * 277 * @param node the node 278 */ cancelAcquire(Node node)279 private void cancelAcquire(Node node) { 280 // Ignore if node doesn't exist 281 if (node == null) 282 return; 283 284 node.thread = null; 285 286 // Skip cancelled predecessors 287 Node pred = node.prev; 288 while (pred.waitStatus > 0) 289 node.prev = pred = pred.prev; 290 291 // predNext is the apparent node to unsplice. CASes below will 292 // fail if not, in which case, we lost race vs another cancel 293 // or signal, so no further action is necessary. 294 Node predNext = pred.next; 295 296 // Can use unconditional write instead of CAS here. 297 // After this atomic step, other Nodes can skip past us. 298 // Before, we are free of interference from other threads. 299 node.waitStatus = Node.CANCELLED; 300 301 // If we are the tail, remove ourselves. 302 if (node == tail && compareAndSetTail(node, pred)) { 303 pred.compareAndSetNext(predNext, null); 304 } else { 305 // If successor needs signal, try to set pred's next-link 306 // so it will get one. Otherwise wake it up to propagate. 307 int ws; 308 if (pred != head && 309 ((ws = pred.waitStatus) == Node.SIGNAL || 310 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && 311 pred.thread != null) { 312 Node next = node.next; 313 if (next != null && next.waitStatus <= 0) 314 pred.compareAndSetNext(predNext, next); 315 } else { 316 unparkSuccessor(node); 317 } 318 319 node.next = node; // help GC 320 } 321 } 322 323 /** 324 * Checks and updates status for a node that failed to acquire. 325 * Returns true if thread should block. This is the main signal 326 * control in all acquire loops. Requires that pred == node.prev. 327 * 328 * @param pred node's predecessor holding status 329 * @param node the node 330 * @return {@code true} if thread should block 331 */ shouldParkAfterFailedAcquire(Node pred, Node node)332 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 333 int ws = pred.waitStatus; 334 if (ws == Node.SIGNAL) 335 /* 336 * This node has already set status asking a release 337 * to signal it, so it can safely park. 338 */ 339 return true; 340 if (ws > 0) { 341 /* 342 * Predecessor was cancelled. Skip over predecessors and 343 * indicate retry. 344 */ 345 do { 346 node.prev = pred = pred.prev; 347 } while (pred.waitStatus > 0); 348 pred.next = node; 349 } else { 350 /* 351 * waitStatus must be 0 or PROPAGATE. Indicate that we 352 * need a signal, but don't park yet. Caller will need to 353 * retry to make sure it cannot acquire before parking. 354 */ 355 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); 356 } 357 return false; 358 } 359 360 /** 361 * Convenience method to interrupt current thread. 362 */ selfInterrupt()363 static void selfInterrupt() { 364 Thread.currentThread().interrupt(); 365 } 366 367 /** 368 * Convenience method to park and then check if interrupted. 369 * 370 * @return {@code true} if interrupted 371 */ parkAndCheckInterrupt()372 private final boolean parkAndCheckInterrupt() { 373 LockSupport.park(this); 374 return Thread.interrupted(); 375 } 376 377 /* 378 * Various flavors of acquire, varying in exclusive/shared and 379 * control modes. Each is mostly the same, but annoyingly 380 * different. Only a little bit of factoring is possible due to 381 * interactions of exception mechanics (including ensuring that we 382 * cancel if tryAcquire throws exception) and other control, at 383 * least not without hurting performance too much. 384 */ 385 386 /** 387 * Acquires in exclusive uninterruptible mode for thread already in 388 * queue. Used by condition wait methods as well as acquire. 389 * 390 * @param node the node 391 * @param arg the acquire argument 392 * @return {@code true} if interrupted while waiting 393 */ acquireQueued(final Node node, long arg)394 final boolean acquireQueued(final Node node, long arg) { 395 try { 396 boolean interrupted = false; 397 for (;;) { 398 final Node p = node.predecessor(); 399 if (p == head && tryAcquire(arg)) { 400 setHead(node); 401 p.next = null; // help GC 402 return interrupted; 403 } 404 if (shouldParkAfterFailedAcquire(p, node) && 405 parkAndCheckInterrupt()) 406 interrupted = true; 407 } 408 } catch (Throwable t) { 409 cancelAcquire(node); 410 throw t; 411 } 412 } 413 414 /** 415 * Acquires in exclusive interruptible mode. 416 * @param arg the acquire argument 417 */ doAcquireInterruptibly(long arg)418 private void doAcquireInterruptibly(long arg) 419 throws InterruptedException { 420 final Node node = addWaiter(Node.EXCLUSIVE); 421 try { 422 for (;;) { 423 final Node p = node.predecessor(); 424 if (p == head && tryAcquire(arg)) { 425 setHead(node); 426 p.next = null; // help GC 427 return; 428 } 429 if (shouldParkAfterFailedAcquire(p, node) && 430 parkAndCheckInterrupt()) 431 throw new InterruptedException(); 432 } 433 } catch (Throwable t) { 434 cancelAcquire(node); 435 throw t; 436 } 437 } 438 439 /** 440 * Acquires in exclusive timed mode. 441 * 442 * @param arg the acquire argument 443 * @param nanosTimeout max wait time 444 * @return {@code true} if acquired 445 */ doAcquireNanos(long arg, long nanosTimeout)446 private boolean doAcquireNanos(long arg, long nanosTimeout) 447 throws InterruptedException { 448 if (nanosTimeout <= 0L) 449 return false; 450 final long deadline = System.nanoTime() + nanosTimeout; 451 final Node node = addWaiter(Node.EXCLUSIVE); 452 try { 453 for (;;) { 454 final Node p = node.predecessor(); 455 if (p == head && tryAcquire(arg)) { 456 setHead(node); 457 p.next = null; // help GC 458 return true; 459 } 460 nanosTimeout = deadline - System.nanoTime(); 461 if (nanosTimeout <= 0L) { 462 cancelAcquire(node); 463 return false; 464 } 465 if (shouldParkAfterFailedAcquire(p, node) && 466 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 467 LockSupport.parkNanos(this, nanosTimeout); 468 if (Thread.interrupted()) 469 throw new InterruptedException(); 470 } 471 } catch (Throwable t) { 472 cancelAcquire(node); 473 throw t; 474 } 475 } 476 477 /** 478 * Acquires in shared uninterruptible mode. 479 * @param arg the acquire argument 480 */ doAcquireShared(long arg)481 private void doAcquireShared(long arg) { 482 final Node node = addWaiter(Node.SHARED); 483 try { 484 boolean interrupted = false; 485 for (;;) { 486 final Node p = node.predecessor(); 487 if (p == head) { 488 long r = tryAcquireShared(arg); 489 if (r >= 0) { 490 setHeadAndPropagate(node, r); 491 p.next = null; // help GC 492 if (interrupted) 493 selfInterrupt(); 494 return; 495 } 496 } 497 if (shouldParkAfterFailedAcquire(p, node) && 498 parkAndCheckInterrupt()) 499 interrupted = true; 500 } 501 } catch (Throwable t) { 502 cancelAcquire(node); 503 throw t; 504 } 505 } 506 507 /** 508 * Acquires in shared interruptible mode. 509 * @param arg the acquire argument 510 */ doAcquireSharedInterruptibly(long arg)511 private void doAcquireSharedInterruptibly(long arg) 512 throws InterruptedException { 513 final Node node = addWaiter(Node.SHARED); 514 try { 515 for (;;) { 516 final Node p = node.predecessor(); 517 if (p == head) { 518 long r = tryAcquireShared(arg); 519 if (r >= 0) { 520 setHeadAndPropagate(node, r); 521 p.next = null; // help GC 522 return; 523 } 524 } 525 if (shouldParkAfterFailedAcquire(p, node) && 526 parkAndCheckInterrupt()) 527 throw new InterruptedException(); 528 } 529 } catch (Throwable t) { 530 cancelAcquire(node); 531 throw t; 532 } 533 } 534 535 /** 536 * Acquires in shared timed mode. 537 * 538 * @param arg the acquire argument 539 * @param nanosTimeout max wait time 540 * @return {@code true} if acquired 541 */ doAcquireSharedNanos(long arg, long nanosTimeout)542 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) 543 throws InterruptedException { 544 if (nanosTimeout <= 0L) 545 return false; 546 final long deadline = System.nanoTime() + nanosTimeout; 547 final Node node = addWaiter(Node.SHARED); 548 try { 549 for (;;) { 550 final Node p = node.predecessor(); 551 if (p == head) { 552 long r = tryAcquireShared(arg); 553 if (r >= 0) { 554 setHeadAndPropagate(node, r); 555 p.next = null; // help GC 556 return true; 557 } 558 } 559 nanosTimeout = deadline - System.nanoTime(); 560 if (nanosTimeout <= 0L) { 561 cancelAcquire(node); 562 return false; 563 } 564 if (shouldParkAfterFailedAcquire(p, node) && 565 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 566 LockSupport.parkNanos(this, nanosTimeout); 567 if (Thread.interrupted()) 568 throw new InterruptedException(); 569 } 570 } catch (Throwable t) { 571 cancelAcquire(node); 572 throw t; 573 } 574 } 575 576 // Main exported methods 577 578 /** 579 * Attempts to acquire in exclusive mode. This method should query 580 * if the state of the object permits it to be acquired in the 581 * exclusive mode, and if so to acquire it. 582 * 583 * <p>This method is always invoked by the thread performing 584 * acquire. If this method reports failure, the acquire method 585 * may queue the thread, if it is not already queued, until it is 586 * signalled by a release from some other thread. This can be used 587 * to implement method {@link Lock#tryLock()}. 588 * 589 * <p>The default 590 * implementation throws {@link UnsupportedOperationException}. 591 * 592 * @param arg the acquire argument. This value is always the one 593 * passed to an acquire method, or is the value saved on entry 594 * to a condition wait. The value is otherwise uninterpreted 595 * and can represent anything you like. 596 * @return {@code true} if successful. Upon success, this object has 597 * been acquired. 598 * @throws IllegalMonitorStateException if acquiring would place this 599 * synchronizer in an illegal state. This exception must be 600 * thrown in a consistent fashion for synchronization to work 601 * correctly. 602 * @throws UnsupportedOperationException if exclusive mode is not supported 603 */ tryAcquire(long arg)604 protected boolean tryAcquire(long arg) { 605 throw new UnsupportedOperationException(); 606 } 607 608 /** 609 * Attempts to set the state to reflect a release in exclusive 610 * mode. 611 * 612 * <p>This method is always invoked by the thread performing release. 613 * 614 * <p>The default implementation throws 615 * {@link UnsupportedOperationException}. 616 * 617 * @param arg the release argument. This value is always the one 618 * passed to a release method, or the current state value upon 619 * entry to a condition wait. The value is otherwise 620 * uninterpreted and can represent anything you like. 621 * @return {@code true} if this object is now in a fully released 622 * state, so that any waiting threads may attempt to acquire; 623 * and {@code false} otherwise. 624 * @throws IllegalMonitorStateException if releasing would place this 625 * synchronizer in an illegal state. This exception must be 626 * thrown in a consistent fashion for synchronization to work 627 * correctly. 628 * @throws UnsupportedOperationException if exclusive mode is not supported 629 */ tryRelease(long arg)630 protected boolean tryRelease(long arg) { 631 throw new UnsupportedOperationException(); 632 } 633 634 /** 635 * Attempts to acquire in shared mode. This method should query if 636 * the state of the object permits it to be acquired in the shared 637 * mode, and if so to acquire it. 638 * 639 * <p>This method is always invoked by the thread performing 640 * acquire. If this method reports failure, the acquire method 641 * may queue the thread, if it is not already queued, until it is 642 * signalled by a release from some other thread. 643 * 644 * <p>The default implementation throws {@link 645 * UnsupportedOperationException}. 646 * 647 * @param arg the acquire argument. This value is always the one 648 * passed to an acquire method, or is the value saved on entry 649 * to a condition wait. The value is otherwise uninterpreted 650 * and can represent anything you like. 651 * @return a negative value on failure; zero if acquisition in shared 652 * mode succeeded but no subsequent shared-mode acquire can 653 * succeed; and a positive value if acquisition in shared 654 * mode succeeded and subsequent shared-mode acquires might 655 * also succeed, in which case a subsequent waiting thread 656 * must check availability. (Support for three different 657 * return values enables this method to be used in contexts 658 * where acquires only sometimes act exclusively.) Upon 659 * success, this object has been acquired. 660 * @throws IllegalMonitorStateException if acquiring would place this 661 * synchronizer in an illegal state. This exception must be 662 * thrown in a consistent fashion for synchronization to work 663 * correctly. 664 * @throws UnsupportedOperationException if shared mode is not supported 665 */ tryAcquireShared(long arg)666 protected long tryAcquireShared(long arg) { 667 throw new UnsupportedOperationException(); 668 } 669 670 /** 671 * Attempts to set the state to reflect a release in shared mode. 672 * 673 * <p>This method is always invoked by the thread performing release. 674 * 675 * <p>The default implementation throws 676 * {@link UnsupportedOperationException}. 677 * 678 * @param arg the release argument. This value is always the one 679 * passed to a release method, or the current state value upon 680 * entry to a condition wait. The value is otherwise 681 * uninterpreted and can represent anything you like. 682 * @return {@code true} if this release of shared mode may permit a 683 * waiting acquire (shared or exclusive) to succeed; and 684 * {@code false} otherwise 685 * @throws IllegalMonitorStateException if releasing would place this 686 * synchronizer in an illegal state. This exception must be 687 * thrown in a consistent fashion for synchronization to work 688 * correctly. 689 * @throws UnsupportedOperationException if shared mode is not supported 690 */ tryReleaseShared(long arg)691 protected boolean tryReleaseShared(long arg) { 692 throw new UnsupportedOperationException(); 693 } 694 695 /** 696 * Returns {@code true} if synchronization is held exclusively with 697 * respect to the current (calling) thread. This method is invoked 698 * upon each call to a non-waiting {@link ConditionObject} method. 699 * (Waiting methods instead invoke {@link #release}.) 700 * 701 * <p>The default implementation throws {@link 702 * UnsupportedOperationException}. This method is invoked 703 * internally only within {@link ConditionObject} methods, so need 704 * not be defined if conditions are not used. 705 * 706 * @return {@code true} if synchronization is held exclusively; 707 * {@code false} otherwise 708 * @throws UnsupportedOperationException if conditions are not supported 709 */ isHeldExclusively()710 protected boolean isHeldExclusively() { 711 throw new UnsupportedOperationException(); 712 } 713 714 /** 715 * Acquires in exclusive mode, ignoring interrupts. Implemented 716 * by invoking at least once {@link #tryAcquire}, 717 * returning on success. Otherwise the thread is queued, possibly 718 * repeatedly blocking and unblocking, invoking {@link 719 * #tryAcquire} until success. This method can be used 720 * to implement method {@link Lock#lock}. 721 * 722 * @param arg the acquire argument. This value is conveyed to 723 * {@link #tryAcquire} but is otherwise uninterpreted and 724 * can represent anything you like. 725 */ acquire(long arg)726 public final void acquire(long arg) { 727 if (!tryAcquire(arg) && 728 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 729 selfInterrupt(); 730 } 731 732 /** 733 * Acquires in exclusive mode, aborting if interrupted. 734 * Implemented by first checking interrupt status, then invoking 735 * at least once {@link #tryAcquire}, returning on 736 * success. Otherwise the thread is queued, possibly repeatedly 737 * blocking and unblocking, invoking {@link #tryAcquire} 738 * until success or the thread is interrupted. This method can be 739 * used to implement method {@link Lock#lockInterruptibly}. 740 * 741 * @param arg the acquire argument. This value is conveyed to 742 * {@link #tryAcquire} but is otherwise uninterpreted and 743 * can represent anything you like. 744 * @throws InterruptedException if the current thread is interrupted 745 */ acquireInterruptibly(long arg)746 public final void acquireInterruptibly(long arg) 747 throws InterruptedException { 748 if (Thread.interrupted()) 749 throw new InterruptedException(); 750 if (!tryAcquire(arg)) 751 doAcquireInterruptibly(arg); 752 } 753 754 /** 755 * Attempts to acquire in exclusive mode, aborting if interrupted, 756 * and failing if the given timeout elapses. Implemented by first 757 * checking interrupt status, then invoking at least once {@link 758 * #tryAcquire}, returning on success. Otherwise, the thread is 759 * queued, possibly repeatedly blocking and unblocking, invoking 760 * {@link #tryAcquire} until success or the thread is interrupted 761 * or the timeout elapses. This method can be used to implement 762 * method {@link Lock#tryLock(long, TimeUnit)}. 763 * 764 * @param arg the acquire argument. This value is conveyed to 765 * {@link #tryAcquire} but is otherwise uninterpreted and 766 * can represent anything you like. 767 * @param nanosTimeout the maximum number of nanoseconds to wait 768 * @return {@code true} if acquired; {@code false} if timed out 769 * @throws InterruptedException if the current thread is interrupted 770 */ tryAcquireNanos(long arg, long nanosTimeout)771 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 772 throws InterruptedException { 773 if (Thread.interrupted()) 774 throw new InterruptedException(); 775 return tryAcquire(arg) || 776 doAcquireNanos(arg, nanosTimeout); 777 } 778 779 /** 780 * Releases in exclusive mode. Implemented by unblocking one or 781 * more threads if {@link #tryRelease} returns true. 782 * This method can be used to implement method {@link Lock#unlock}. 783 * 784 * @param arg the release argument. This value is conveyed to 785 * {@link #tryRelease} but is otherwise uninterpreted and 786 * can represent anything you like. 787 * @return the value returned from {@link #tryRelease} 788 */ release(long arg)789 public final boolean release(long arg) { 790 if (tryRelease(arg)) { 791 Node h = head; 792 if (h != null && h.waitStatus != 0) 793 unparkSuccessor(h); 794 return true; 795 } 796 return false; 797 } 798 799 /** 800 * Acquires in shared mode, ignoring interrupts. Implemented by 801 * first invoking at least once {@link #tryAcquireShared}, 802 * returning on success. Otherwise the thread is queued, possibly 803 * repeatedly blocking and unblocking, invoking {@link 804 * #tryAcquireShared} until success. 805 * 806 * @param arg the acquire argument. This value is conveyed to 807 * {@link #tryAcquireShared} but is otherwise uninterpreted 808 * and can represent anything you like. 809 */ acquireShared(long arg)810 public final void acquireShared(long arg) { 811 if (tryAcquireShared(arg) < 0) 812 doAcquireShared(arg); 813 } 814 815 /** 816 * Acquires in shared mode, aborting if interrupted. Implemented 817 * by first checking interrupt status, then invoking at least once 818 * {@link #tryAcquireShared}, returning on success. Otherwise the 819 * thread is queued, possibly repeatedly blocking and unblocking, 820 * invoking {@link #tryAcquireShared} until success or the thread 821 * is interrupted. 822 * @param arg the acquire argument. 823 * This value is conveyed to {@link #tryAcquireShared} but is 824 * otherwise uninterpreted and can represent anything 825 * you like. 826 * @throws InterruptedException if the current thread is interrupted 827 */ acquireSharedInterruptibly(long arg)828 public final void acquireSharedInterruptibly(long arg) 829 throws InterruptedException { 830 if (Thread.interrupted()) 831 throw new InterruptedException(); 832 if (tryAcquireShared(arg) < 0) 833 doAcquireSharedInterruptibly(arg); 834 } 835 836 /** 837 * Attempts to acquire in shared mode, aborting if interrupted, and 838 * failing if the given timeout elapses. Implemented by first 839 * checking interrupt status, then invoking at least once {@link 840 * #tryAcquireShared}, returning on success. Otherwise, the 841 * thread is queued, possibly repeatedly blocking and unblocking, 842 * invoking {@link #tryAcquireShared} until success or the thread 843 * is interrupted or the timeout elapses. 844 * 845 * @param arg the acquire argument. This value is conveyed to 846 * {@link #tryAcquireShared} but is otherwise uninterpreted 847 * and can represent anything you like. 848 * @param nanosTimeout the maximum number of nanoseconds to wait 849 * @return {@code true} if acquired; {@code false} if timed out 850 * @throws InterruptedException if the current thread is interrupted 851 */ tryAcquireSharedNanos(long arg, long nanosTimeout)852 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 853 throws InterruptedException { 854 if (Thread.interrupted()) 855 throw new InterruptedException(); 856 return tryAcquireShared(arg) >= 0 || 857 doAcquireSharedNanos(arg, nanosTimeout); 858 } 859 860 /** 861 * Releases in shared mode. Implemented by unblocking one or more 862 * threads if {@link #tryReleaseShared} returns true. 863 * 864 * @param arg the release argument. This value is conveyed to 865 * {@link #tryReleaseShared} but is otherwise uninterpreted 866 * and can represent anything you like. 867 * @return the value returned from {@link #tryReleaseShared} 868 */ releaseShared(long arg)869 public final boolean releaseShared(long arg) { 870 if (tryReleaseShared(arg)) { 871 doReleaseShared(); 872 return true; 873 } 874 return false; 875 } 876 877 // Queue inspection methods 878 879 /** 880 * Queries whether any threads are waiting to acquire. Note that 881 * because cancellations due to interrupts and timeouts may occur 882 * at any time, a {@code true} return does not guarantee that any 883 * other thread will ever acquire. 884 * 885 * <p>In this implementation, this operation returns in 886 * constant time. 887 * 888 * @return {@code true} if there may be other threads waiting to acquire 889 */ hasQueuedThreads()890 public final boolean hasQueuedThreads() { 891 return head != tail; 892 } 893 894 /** 895 * Queries whether any threads have ever contended to acquire this 896 * synchronizer; that is, if an acquire method has ever blocked. 897 * 898 * <p>In this implementation, this operation returns in 899 * constant time. 900 * 901 * @return {@code true} if there has ever been contention 902 */ hasContended()903 public final boolean hasContended() { 904 return head != null; 905 } 906 907 /** 908 * Returns the first (longest-waiting) thread in the queue, or 909 * {@code null} if no threads are currently queued. 910 * 911 * <p>In this implementation, this operation normally returns in 912 * constant time, but may iterate upon contention if other threads are 913 * concurrently modifying the queue. 914 * 915 * @return the first (longest-waiting) thread in the queue, or 916 * {@code null} if no threads are currently queued 917 */ getFirstQueuedThread()918 public final Thread getFirstQueuedThread() { 919 // handle only fast path, else relay 920 return (head == tail) ? null : fullGetFirstQueuedThread(); 921 } 922 923 /** 924 * Version of getFirstQueuedThread called when fastpath fails. 925 */ fullGetFirstQueuedThread()926 private Thread fullGetFirstQueuedThread() { 927 /* 928 * The first node is normally head.next. Try to get its 929 * thread field, ensuring consistent reads: If thread 930 * field is nulled out or s.prev is no longer head, then 931 * some other thread(s) concurrently performed setHead in 932 * between some of our reads. We try this twice before 933 * resorting to traversal. 934 */ 935 Node h, s; 936 Thread st; 937 if (((h = head) != null && (s = h.next) != null && 938 s.prev == head && (st = s.thread) != null) || 939 ((h = head) != null && (s = h.next) != null && 940 s.prev == head && (st = s.thread) != null)) 941 return st; 942 943 /* 944 * Head's next field might not have been set yet, or may have 945 * been unset after setHead. So we must check to see if tail 946 * is actually first node. If not, we continue on, safely 947 * traversing from tail back to head to find first, 948 * guaranteeing termination. 949 */ 950 951 Thread firstThread = null; 952 for (Node p = tail; p != null && p != head; p = p.prev) { 953 Thread t = p.thread; 954 if (t != null) 955 firstThread = t; 956 } 957 return firstThread; 958 } 959 960 /** 961 * Returns true if the given thread is currently queued. 962 * 963 * <p>This implementation traverses the queue to determine 964 * presence of the given thread. 965 * 966 * @param thread the thread 967 * @return {@code true} if the given thread is on the queue 968 * @throws NullPointerException if the thread is null 969 */ isQueued(Thread thread)970 public final boolean isQueued(Thread thread) { 971 if (thread == null) 972 throw new NullPointerException(); 973 for (Node p = tail; p != null; p = p.prev) 974 if (p.thread == thread) 975 return true; 976 return false; 977 } 978 979 /** 980 * Returns {@code true} if the apparent first queued thread, if one 981 * exists, is waiting in exclusive mode. If this method returns 982 * {@code true}, and the current thread is attempting to acquire in 983 * shared mode (that is, this method is invoked from {@link 984 * #tryAcquireShared}) then it is guaranteed that the current thread 985 * is not the first queued thread. Used only as a heuristic in 986 * ReentrantReadWriteLock. 987 */ apparentlyFirstQueuedIsExclusive()988 final boolean apparentlyFirstQueuedIsExclusive() { 989 Node h, s; 990 return (h = head) != null && 991 (s = h.next) != null && 992 !s.isShared() && 993 s.thread != null; 994 } 995 996 /** 997 * Queries whether any threads have been waiting to acquire longer 998 * than the current thread. 999 * 1000 * <p>An invocation of this method is equivalent to (but may be 1001 * more efficient than): 1002 * <pre> {@code 1003 * getFirstQueuedThread() != Thread.currentThread() 1004 * && hasQueuedThreads()}</pre> 1005 * 1006 * <p>Note that because cancellations due to interrupts and 1007 * timeouts may occur at any time, a {@code true} return does not 1008 * guarantee that some other thread will acquire before the current 1009 * thread. Likewise, it is possible for another thread to win a 1010 * race to enqueue after this method has returned {@code false}, 1011 * due to the queue being empty. 1012 * 1013 * <p>This method is designed to be used by a fair synchronizer to 1014 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1015 * Such a synchronizer's {@link #tryAcquire} method should return 1016 * {@code false}, and its {@link #tryAcquireShared} method should 1017 * return a negative value, if this method returns {@code true} 1018 * (unless this is a reentrant acquire). For example, the {@code 1019 * tryAcquire} method for a fair, reentrant, exclusive mode 1020 * synchronizer might look like this: 1021 * 1022 * <pre> {@code 1023 * protected boolean tryAcquire(int arg) { 1024 * if (isHeldExclusively()) { 1025 * // A reentrant acquire; increment hold count 1026 * return true; 1027 * } else if (hasQueuedPredecessors()) { 1028 * return false; 1029 * } else { 1030 * // try to acquire normally 1031 * } 1032 * }}</pre> 1033 * 1034 * @return {@code true} if there is a queued thread preceding the 1035 * current thread, and {@code false} if the current thread 1036 * is at the head of the queue or the queue is empty 1037 * @since 1.7 1038 */ hasQueuedPredecessors()1039 public final boolean hasQueuedPredecessors() { 1040 // The correctness of this depends on head being initialized 1041 // before tail and on head.next being accurate if the current 1042 // thread is first in queue. 1043 Node t = tail; // Read fields in reverse initialization order 1044 Node h = head; 1045 Node s; 1046 return h != t && 1047 ((s = h.next) == null || s.thread != Thread.currentThread()); 1048 } 1049 1050 1051 // Instrumentation and monitoring methods 1052 1053 /** 1054 * Returns an estimate of the number of threads waiting to 1055 * acquire. The value is only an estimate because the number of 1056 * threads may change dynamically while this method traverses 1057 * internal data structures. This method is designed for use in 1058 * monitoring system state, not for synchronization control. 1059 * 1060 * @return the estimated number of threads waiting to acquire 1061 */ getQueueLength()1062 public final int getQueueLength() { 1063 int n = 0; 1064 for (Node p = tail; p != null; p = p.prev) { 1065 if (p.thread != null) 1066 ++n; 1067 } 1068 return n; 1069 } 1070 1071 /** 1072 * Returns a collection containing threads that may be waiting to 1073 * acquire. Because the actual set of threads may change 1074 * dynamically while constructing this result, the returned 1075 * collection is only a best-effort estimate. The elements of the 1076 * returned collection are in no particular order. This method is 1077 * designed to facilitate construction of subclasses that provide 1078 * more extensive monitoring facilities. 1079 * 1080 * @return the collection of threads 1081 */ getQueuedThreads()1082 public final Collection<Thread> getQueuedThreads() { 1083 ArrayList<Thread> list = new ArrayList<>(); 1084 for (Node p = tail; p != null; p = p.prev) { 1085 Thread t = p.thread; 1086 if (t != null) 1087 list.add(t); 1088 } 1089 return list; 1090 } 1091 1092 /** 1093 * Returns a collection containing threads that may be waiting to 1094 * acquire in exclusive mode. This has the same properties 1095 * as {@link #getQueuedThreads} except that it only returns 1096 * those threads waiting due to an exclusive acquire. 1097 * 1098 * @return the collection of threads 1099 */ getExclusiveQueuedThreads()1100 public final Collection<Thread> getExclusiveQueuedThreads() { 1101 ArrayList<Thread> list = new ArrayList<>(); 1102 for (Node p = tail; p != null; p = p.prev) { 1103 if (!p.isShared()) { 1104 Thread t = p.thread; 1105 if (t != null) 1106 list.add(t); 1107 } 1108 } 1109 return list; 1110 } 1111 1112 /** 1113 * Returns a collection containing threads that may be waiting to 1114 * acquire in shared mode. This has the same properties 1115 * as {@link #getQueuedThreads} except that it only returns 1116 * those threads waiting due to a shared acquire. 1117 * 1118 * @return the collection of threads 1119 */ getSharedQueuedThreads()1120 public final Collection<Thread> getSharedQueuedThreads() { 1121 ArrayList<Thread> list = new ArrayList<>(); 1122 for (Node p = tail; p != null; p = p.prev) { 1123 if (p.isShared()) { 1124 Thread t = p.thread; 1125 if (t != null) 1126 list.add(t); 1127 } 1128 } 1129 return list; 1130 } 1131 1132 /** 1133 * Returns a string identifying this synchronizer, as well as its state. 1134 * The state, in brackets, includes the String {@code "State ="} 1135 * followed by the current value of {@link #getState}, and either 1136 * {@code "nonempty"} or {@code "empty"} depending on whether the 1137 * queue is empty. 1138 * 1139 * @return a string identifying this synchronizer, as well as its state 1140 */ toString()1141 public String toString() { 1142 return super.toString() 1143 + "[State = " + getState() + ", " 1144 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1145 } 1146 1147 1148 // Internal support methods for Conditions 1149 1150 /** 1151 * Returns true if a node, always one that was initially placed on 1152 * a condition queue, is now waiting to reacquire on sync queue. 1153 * @param node the node 1154 * @return true if is reacquiring 1155 */ isOnSyncQueue(Node node)1156 final boolean isOnSyncQueue(Node node) { 1157 if (node.waitStatus == Node.CONDITION || node.prev == null) 1158 return false; 1159 if (node.next != null) // If has successor, it must be on queue 1160 return true; 1161 /* 1162 * node.prev can be non-null, but not yet on queue because 1163 * the CAS to place it on queue can fail. So we have to 1164 * traverse from tail to make sure it actually made it. It 1165 * will always be near the tail in calls to this method, and 1166 * unless the CAS failed (which is unlikely), it will be 1167 * there, so we hardly ever traverse much. 1168 */ 1169 return findNodeFromTail(node); 1170 } 1171 1172 /** 1173 * Returns true if node is on sync queue by searching backwards from tail. 1174 * Called only when needed by isOnSyncQueue. 1175 * @return true if present 1176 */ findNodeFromTail(Node node)1177 private boolean findNodeFromTail(Node node) { 1178 // We check for node first, since it's likely to be at or near tail. 1179 // tail is known to be non-null, so we could re-order to "save" 1180 // one null check, but we leave it this way to help the VM. 1181 for (Node p = tail;;) { 1182 if (p == node) 1183 return true; 1184 if (p == null) 1185 return false; 1186 p = p.prev; 1187 } 1188 } 1189 1190 /** 1191 * Transfers a node from a condition queue onto sync queue. 1192 * Returns true if successful. 1193 * @param node the node 1194 * @return true if successfully transferred (else the node was 1195 * cancelled before signal) 1196 */ transferForSignal(Node node)1197 final boolean transferForSignal(Node node) { 1198 /* 1199 * If cannot change waitStatus, the node has been cancelled. 1200 */ 1201 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) 1202 return false; 1203 1204 /* 1205 * Splice onto queue and try to set waitStatus of predecessor to 1206 * indicate that thread is (probably) waiting. If cancelled or 1207 * attempt to set waitStatus fails, wake up to resync (in which 1208 * case the waitStatus can be transiently and harmlessly wrong). 1209 */ 1210 Node p = enq(node); 1211 int ws = p.waitStatus; 1212 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) 1213 LockSupport.unpark(node.thread); 1214 return true; 1215 } 1216 1217 /** 1218 * Transfers node, if necessary, to sync queue after a cancelled wait. 1219 * Returns true if thread was cancelled before being signalled. 1220 * 1221 * @param node the node 1222 * @return true if cancelled before the node was signalled 1223 */ transferAfterCancelledWait(Node node)1224 final boolean transferAfterCancelledWait(Node node) { 1225 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { 1226 enq(node); 1227 return true; 1228 } 1229 /* 1230 * If we lost out to a signal(), then we can't proceed 1231 * until it finishes its enq(). Cancelling during an 1232 * incomplete transfer is both rare and transient, so just 1233 * spin. 1234 */ 1235 while (!isOnSyncQueue(node)) 1236 Thread.yield(); 1237 return false; 1238 } 1239 1240 /** 1241 * Invokes release with current state value; returns saved state. 1242 * Cancels node and throws exception on failure. 1243 * @param node the condition node for this wait 1244 * @return previous sync state 1245 */ fullyRelease(Node node)1246 final long fullyRelease(Node node) { 1247 try { 1248 long savedState = getState(); 1249 if (release(savedState)) 1250 return savedState; 1251 throw new IllegalMonitorStateException(); 1252 } catch (Throwable t) { 1253 node.waitStatus = Node.CANCELLED; 1254 throw t; 1255 } 1256 } 1257 1258 // Instrumentation methods for conditions 1259 1260 /** 1261 * Queries whether the given ConditionObject 1262 * uses this synchronizer as its lock. 1263 * 1264 * @param condition the condition 1265 * @return {@code true} if owned 1266 * @throws NullPointerException if the condition is null 1267 */ owns(ConditionObject condition)1268 public final boolean owns(ConditionObject condition) { 1269 return condition.isOwnedBy(this); 1270 } 1271 1272 /** 1273 * Queries whether any threads are waiting on the given condition 1274 * associated with this synchronizer. Note that because timeouts 1275 * and interrupts may occur at any time, a {@code true} return 1276 * does not guarantee that a future {@code signal} will awaken 1277 * any threads. This method is designed primarily for use in 1278 * monitoring of the system state. 1279 * 1280 * @param condition the condition 1281 * @return {@code true} if there are any waiting threads 1282 * @throws IllegalMonitorStateException if exclusive synchronization 1283 * is not held 1284 * @throws IllegalArgumentException if the given condition is 1285 * not associated with this synchronizer 1286 * @throws NullPointerException if the condition is null 1287 */ hasWaiters(ConditionObject condition)1288 public final boolean hasWaiters(ConditionObject condition) { 1289 if (!owns(condition)) 1290 throw new IllegalArgumentException("Not owner"); 1291 return condition.hasWaiters(); 1292 } 1293 1294 /** 1295 * Returns an estimate of the number of threads waiting on the 1296 * given condition associated with this synchronizer. Note that 1297 * because timeouts and interrupts may occur at any time, the 1298 * estimate serves only as an upper bound on the actual number of 1299 * waiters. This method is designed for use in monitoring system 1300 * state, not for synchronization control. 1301 * 1302 * @param condition the condition 1303 * @return the estimated number of waiting threads 1304 * @throws IllegalMonitorStateException if exclusive synchronization 1305 * is not held 1306 * @throws IllegalArgumentException if the given condition is 1307 * not associated with this synchronizer 1308 * @throws NullPointerException if the condition is null 1309 */ getWaitQueueLength(ConditionObject condition)1310 public final int getWaitQueueLength(ConditionObject condition) { 1311 if (!owns(condition)) 1312 throw new IllegalArgumentException("Not owner"); 1313 return condition.getWaitQueueLength(); 1314 } 1315 1316 /** 1317 * Returns a collection containing those threads that may be 1318 * waiting on the given condition associated with this 1319 * synchronizer. Because the actual set of threads may change 1320 * dynamically while constructing this result, the returned 1321 * collection is only a best-effort estimate. The elements of the 1322 * returned collection are in no particular order. 1323 * 1324 * @param condition the condition 1325 * @return the collection of threads 1326 * @throws IllegalMonitorStateException if exclusive synchronization 1327 * is not held 1328 * @throws IllegalArgumentException if the given condition is 1329 * not associated with this synchronizer 1330 * @throws NullPointerException if the condition is null 1331 */ getWaitingThreads(ConditionObject condition)1332 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1333 if (!owns(condition)) 1334 throw new IllegalArgumentException("Not owner"); 1335 return condition.getWaitingThreads(); 1336 } 1337 1338 /** 1339 * Condition implementation for a {@link 1340 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link 1341 * Lock} implementation. 1342 * 1343 * <p>Method documentation for this class describes mechanics, 1344 * not behavioral specifications from the point of view of Lock 1345 * and Condition users. Exported versions of this class will in 1346 * general need to be accompanied by documentation describing 1347 * condition semantics that rely on those of the associated 1348 * {@code AbstractQueuedLongSynchronizer}. 1349 * 1350 * <p>This class is Serializable, but all fields are transient, 1351 * so deserialized conditions have no waiters. 1352 * 1353 * @since 1.6 1354 */ 1355 public class ConditionObject implements Condition, java.io.Serializable { 1356 private static final long serialVersionUID = 1173984872572414699L; 1357 /** First node of condition queue. */ 1358 private transient Node firstWaiter; 1359 /** Last node of condition queue. */ 1360 private transient Node lastWaiter; 1361 1362 /** 1363 * Creates a new {@code ConditionObject} instance. 1364 */ ConditionObject()1365 public ConditionObject() { } 1366 1367 // Internal methods 1368 1369 /** 1370 * Adds a new waiter to wait queue. 1371 * @return its new wait node 1372 */ addConditionWaiter()1373 private Node addConditionWaiter() { 1374 Node t = lastWaiter; 1375 // If lastWaiter is cancelled, clean out. 1376 if (t != null && t.waitStatus != Node.CONDITION) { 1377 unlinkCancelledWaiters(); 1378 t = lastWaiter; 1379 } 1380 1381 Node node = new Node(Node.CONDITION); 1382 1383 if (t == null) 1384 firstWaiter = node; 1385 else 1386 t.nextWaiter = node; 1387 lastWaiter = node; 1388 return node; 1389 } 1390 1391 /** 1392 * Removes and transfers nodes until hit non-cancelled one or 1393 * null. Split out from signal in part to encourage compilers 1394 * to inline the case of no waiters. 1395 * @param first (non-null) the first node on condition queue 1396 */ doSignal(Node first)1397 private void doSignal(Node first) { 1398 do { 1399 if ( (firstWaiter = first.nextWaiter) == null) 1400 lastWaiter = null; 1401 first.nextWaiter = null; 1402 } while (!transferForSignal(first) && 1403 (first = firstWaiter) != null); 1404 } 1405 1406 /** 1407 * Removes and transfers all nodes. 1408 * @param first (non-null) the first node on condition queue 1409 */ doSignalAll(Node first)1410 private void doSignalAll(Node first) { 1411 lastWaiter = firstWaiter = null; 1412 do { 1413 Node next = first.nextWaiter; 1414 first.nextWaiter = null; 1415 transferForSignal(first); 1416 first = next; 1417 } while (first != null); 1418 } 1419 1420 /** 1421 * Unlinks cancelled waiter nodes from condition queue. 1422 * Called only while holding lock. This is called when 1423 * cancellation occurred during condition wait, and upon 1424 * insertion of a new waiter when lastWaiter is seen to have 1425 * been cancelled. This method is needed to avoid garbage 1426 * retention in the absence of signals. So even though it may 1427 * require a full traversal, it comes into play only when 1428 * timeouts or cancellations occur in the absence of 1429 * signals. It traverses all nodes rather than stopping at a 1430 * particular target to unlink all pointers to garbage nodes 1431 * without requiring many re-traversals during cancellation 1432 * storms. 1433 */ unlinkCancelledWaiters()1434 private void unlinkCancelledWaiters() { 1435 Node t = firstWaiter; 1436 Node trail = null; 1437 while (t != null) { 1438 Node next = t.nextWaiter; 1439 if (t.waitStatus != Node.CONDITION) { 1440 t.nextWaiter = null; 1441 if (trail == null) 1442 firstWaiter = next; 1443 else 1444 trail.nextWaiter = next; 1445 if (next == null) 1446 lastWaiter = trail; 1447 } 1448 else 1449 trail = t; 1450 t = next; 1451 } 1452 } 1453 1454 // public methods 1455 1456 /** 1457 * Moves the longest-waiting thread, if one exists, from the 1458 * wait queue for this condition to the wait queue for the 1459 * owning lock. 1460 * 1461 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1462 * returns {@code false} 1463 */ signal()1464 public final void signal() { 1465 if (!isHeldExclusively()) 1466 throw new IllegalMonitorStateException(); 1467 Node first = firstWaiter; 1468 if (first != null) 1469 doSignal(first); 1470 } 1471 1472 /** 1473 * Moves all threads from the wait queue for this condition to 1474 * the wait queue for the owning lock. 1475 * 1476 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1477 * returns {@code false} 1478 */ signalAll()1479 public final void signalAll() { 1480 if (!isHeldExclusively()) 1481 throw new IllegalMonitorStateException(); 1482 Node first = firstWaiter; 1483 if (first != null) 1484 doSignalAll(first); 1485 } 1486 1487 /** 1488 * Implements uninterruptible condition wait. 1489 * <ol> 1490 * <li>Save lock state returned by {@link #getState}. 1491 * <li>Invoke {@link #release} with saved state as argument, 1492 * throwing IllegalMonitorStateException if it fails. 1493 * <li>Block until signalled. 1494 * <li>Reacquire by invoking specialized version of 1495 * {@link #acquire} with saved state as argument. 1496 * </ol> 1497 */ awaitUninterruptibly()1498 public final void awaitUninterruptibly() { 1499 Node node = addConditionWaiter(); 1500 long savedState = fullyRelease(node); 1501 boolean interrupted = false; 1502 while (!isOnSyncQueue(node)) { 1503 LockSupport.park(this); 1504 if (Thread.interrupted()) 1505 interrupted = true; 1506 } 1507 if (acquireQueued(node, savedState) || interrupted) 1508 selfInterrupt(); 1509 } 1510 1511 /* 1512 * For interruptible waits, we need to track whether to throw 1513 * InterruptedException, if interrupted while blocked on 1514 * condition, versus reinterrupt current thread, if 1515 * interrupted while blocked waiting to re-acquire. 1516 */ 1517 1518 /** Mode meaning to reinterrupt on exit from wait */ 1519 private static final int REINTERRUPT = 1; 1520 /** Mode meaning to throw InterruptedException on exit from wait */ 1521 private static final int THROW_IE = -1; 1522 1523 /** 1524 * Checks for interrupt, returning THROW_IE if interrupted 1525 * before signalled, REINTERRUPT if after signalled, or 1526 * 0 if not interrupted. 1527 */ checkInterruptWhileWaiting(Node node)1528 private int checkInterruptWhileWaiting(Node node) { 1529 return Thread.interrupted() ? 1530 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 1531 0; 1532 } 1533 1534 /** 1535 * Throws InterruptedException, reinterrupts current thread, or 1536 * does nothing, depending on mode. 1537 */ reportInterruptAfterWait(int interruptMode)1538 private void reportInterruptAfterWait(int interruptMode) 1539 throws InterruptedException { 1540 if (interruptMode == THROW_IE) 1541 throw new InterruptedException(); 1542 else if (interruptMode == REINTERRUPT) 1543 selfInterrupt(); 1544 } 1545 1546 /** 1547 * Implements interruptible condition wait. 1548 * <ol> 1549 * <li>If current thread is interrupted, throw InterruptedException. 1550 * <li>Save lock state returned by {@link #getState}. 1551 * <li>Invoke {@link #release} with saved state as argument, 1552 * throwing IllegalMonitorStateException if it fails. 1553 * <li>Block until signalled or interrupted. 1554 * <li>Reacquire by invoking specialized version of 1555 * {@link #acquire} with saved state as argument. 1556 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1557 * </ol> 1558 */ await()1559 public final void await() throws InterruptedException { 1560 if (Thread.interrupted()) 1561 throw new InterruptedException(); 1562 Node node = addConditionWaiter(); 1563 long savedState = fullyRelease(node); 1564 int interruptMode = 0; 1565 while (!isOnSyncQueue(node)) { 1566 LockSupport.park(this); 1567 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1568 break; 1569 } 1570 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1571 interruptMode = REINTERRUPT; 1572 if (node.nextWaiter != null) // clean up if cancelled 1573 unlinkCancelledWaiters(); 1574 if (interruptMode != 0) 1575 reportInterruptAfterWait(interruptMode); 1576 } 1577 1578 /** 1579 * Implements timed condition wait. 1580 * <ol> 1581 * <li>If current thread is interrupted, throw InterruptedException. 1582 * <li>Save lock state returned by {@link #getState}. 1583 * <li>Invoke {@link #release} with saved state as argument, 1584 * throwing IllegalMonitorStateException if it fails. 1585 * <li>Block until signalled, interrupted, or timed out. 1586 * <li>Reacquire by invoking specialized version of 1587 * {@link #acquire} with saved state as argument. 1588 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1589 * </ol> 1590 */ awaitNanos(long nanosTimeout)1591 public final long awaitNanos(long nanosTimeout) 1592 throws InterruptedException { 1593 if (Thread.interrupted()) 1594 throw new InterruptedException(); 1595 // We don't check for nanosTimeout <= 0L here, to allow 1596 // awaitNanos(0) as a way to "yield the lock". 1597 final long deadline = System.nanoTime() + nanosTimeout; 1598 long initialNanos = nanosTimeout; 1599 Node node = addConditionWaiter(); 1600 long savedState = fullyRelease(node); 1601 int interruptMode = 0; 1602 while (!isOnSyncQueue(node)) { 1603 if (nanosTimeout <= 0L) { 1604 transferAfterCancelledWait(node); 1605 break; 1606 } 1607 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 1608 LockSupport.parkNanos(this, nanosTimeout); 1609 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1610 break; 1611 nanosTimeout = deadline - System.nanoTime(); 1612 } 1613 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1614 interruptMode = REINTERRUPT; 1615 if (node.nextWaiter != null) 1616 unlinkCancelledWaiters(); 1617 if (interruptMode != 0) 1618 reportInterruptAfterWait(interruptMode); 1619 long remaining = deadline - System.nanoTime(); // avoid overflow 1620 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; 1621 } 1622 1623 /** 1624 * Implements absolute timed condition wait. 1625 * <ol> 1626 * <li>If current thread is interrupted, throw InterruptedException. 1627 * <li>Save lock state returned by {@link #getState}. 1628 * <li>Invoke {@link #release} with saved state as argument, 1629 * throwing IllegalMonitorStateException if it fails. 1630 * <li>Block until signalled, interrupted, or timed out. 1631 * <li>Reacquire by invoking specialized version of 1632 * {@link #acquire} with saved state as argument. 1633 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1634 * <li>If timed out while blocked in step 4, return false, else true. 1635 * </ol> 1636 */ awaitUntil(Date deadline)1637 public final boolean awaitUntil(Date deadline) 1638 throws InterruptedException { 1639 long abstime = deadline.getTime(); 1640 if (Thread.interrupted()) 1641 throw new InterruptedException(); 1642 Node node = addConditionWaiter(); 1643 long savedState = fullyRelease(node); 1644 boolean timedout = false; 1645 int interruptMode = 0; 1646 while (!isOnSyncQueue(node)) { 1647 if (System.currentTimeMillis() >= abstime) { 1648 timedout = transferAfterCancelledWait(node); 1649 break; 1650 } 1651 LockSupport.parkUntil(this, abstime); 1652 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1653 break; 1654 } 1655 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1656 interruptMode = REINTERRUPT; 1657 if (node.nextWaiter != null) 1658 unlinkCancelledWaiters(); 1659 if (interruptMode != 0) 1660 reportInterruptAfterWait(interruptMode); 1661 return !timedout; 1662 } 1663 1664 /** 1665 * Implements timed condition wait. 1666 * <ol> 1667 * <li>If current thread is interrupted, throw InterruptedException. 1668 * <li>Save lock state returned by {@link #getState}. 1669 * <li>Invoke {@link #release} with saved state as argument, 1670 * throwing IllegalMonitorStateException if it fails. 1671 * <li>Block until signalled, interrupted, or timed out. 1672 * <li>Reacquire by invoking specialized version of 1673 * {@link #acquire} with saved state as argument. 1674 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1675 * <li>If timed out while blocked in step 4, return false, else true. 1676 * </ol> 1677 */ await(long time, TimeUnit unit)1678 public final boolean await(long time, TimeUnit unit) 1679 throws InterruptedException { 1680 long nanosTimeout = unit.toNanos(time); 1681 if (Thread.interrupted()) 1682 throw new InterruptedException(); 1683 // We don't check for nanosTimeout <= 0L here, to allow 1684 // await(0, unit) as a way to "yield the lock". 1685 final long deadline = System.nanoTime() + nanosTimeout; 1686 Node node = addConditionWaiter(); 1687 long savedState = fullyRelease(node); 1688 boolean timedout = false; 1689 int interruptMode = 0; 1690 while (!isOnSyncQueue(node)) { 1691 if (nanosTimeout <= 0L) { 1692 timedout = transferAfterCancelledWait(node); 1693 break; 1694 } 1695 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 1696 LockSupport.parkNanos(this, nanosTimeout); 1697 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1698 break; 1699 nanosTimeout = deadline - System.nanoTime(); 1700 } 1701 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1702 interruptMode = REINTERRUPT; 1703 if (node.nextWaiter != null) 1704 unlinkCancelledWaiters(); 1705 if (interruptMode != 0) 1706 reportInterruptAfterWait(interruptMode); 1707 return !timedout; 1708 } 1709 1710 // support for instrumentation 1711 1712 /** 1713 * Returns true if this condition was created by the given 1714 * synchronization object. 1715 * 1716 * @return {@code true} if owned 1717 */ isOwnedBy(AbstractQueuedLongSynchronizer sync)1718 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1719 return sync == AbstractQueuedLongSynchronizer.this; 1720 } 1721 1722 /** 1723 * Queries whether any threads are waiting on this condition. 1724 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1725 * 1726 * @return {@code true} if there are any waiting threads 1727 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1728 * returns {@code false} 1729 */ hasWaiters()1730 protected final boolean hasWaiters() { 1731 if (!isHeldExclusively()) 1732 throw new IllegalMonitorStateException(); 1733 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1734 if (w.waitStatus == Node.CONDITION) 1735 return true; 1736 } 1737 return false; 1738 } 1739 1740 /** 1741 * Returns an estimate of the number of threads waiting on 1742 * this condition. 1743 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1744 * 1745 * @return the estimated number of waiting threads 1746 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1747 * returns {@code false} 1748 */ getWaitQueueLength()1749 protected final int getWaitQueueLength() { 1750 if (!isHeldExclusively()) 1751 throw new IllegalMonitorStateException(); 1752 int n = 0; 1753 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1754 if (w.waitStatus == Node.CONDITION) 1755 ++n; 1756 } 1757 return n; 1758 } 1759 1760 /** 1761 * Returns a collection containing those threads that may be 1762 * waiting on this Condition. 1763 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1764 * 1765 * @return the collection of threads 1766 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1767 * returns {@code false} 1768 */ getWaitingThreads()1769 protected final Collection<Thread> getWaitingThreads() { 1770 if (!isHeldExclusively()) 1771 throw new IllegalMonitorStateException(); 1772 ArrayList<Thread> list = new ArrayList<>(); 1773 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1774 if (w.waitStatus == Node.CONDITION) { 1775 Thread t = w.thread; 1776 if (t != null) 1777 list.add(t); 1778 } 1779 } 1780 return list; 1781 } 1782 } 1783 1784 /** 1785 * Setup to support compareAndSet. We need to natively implement 1786 * this here: For the sake of permitting future enhancements, we 1787 * cannot explicitly subclass AtomicLong, which would be 1788 * efficient and useful otherwise. So, as the lesser of evils, we 1789 * natively implement using hotspot intrinsics API. And while we 1790 * are at it, we do the same for other CASable fields (which could 1791 * otherwise be done with atomic field updaters). 1792 */ 1793 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 1794 private static final long STATE; 1795 private static final long HEAD; 1796 private static final long TAIL; 1797 1798 static { 1799 try { 1800 STATE = U.objectFieldOffset 1801 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state")); 1802 HEAD = U.objectFieldOffset 1803 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head")); 1804 TAIL = U.objectFieldOffset 1805 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail")); 1806 } catch (ReflectiveOperationException e) { 1807 throw new Error(e); 1808 } 1809 1810 // Reduce the risk of rare disastrous classloading in first call to 1811 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1812 Class<?> ensureLoaded = LockSupport.class; 1813 } 1814 1815 /** 1816 * Initializes head and tail fields on first contention. 1817 */ initializeSyncQueue()1818 private final void initializeSyncQueue() { 1819 Node h; 1820 if (U.compareAndSwapObject(this, HEAD, null, (h = new Node()))) 1821 tail = h; 1822 } 1823 1824 /** 1825 * CASes tail field. 1826 */ compareAndSetTail(Node expect, Node update)1827 private final boolean compareAndSetTail(Node expect, Node update) { 1828 return U.compareAndSwapObject(this, TAIL, expect, update); 1829 } 1830 } 1831