1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent.locks; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Date; 41 import java.util.concurrent.TimeUnit; 42 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; 43 44 /** 45 * A version of {@link AbstractQueuedSynchronizer} in 46 * which synchronization state is maintained as a {@code long}. 47 * This class has exactly the same structure, properties, and methods 48 * as {@code AbstractQueuedSynchronizer} with the exception 49 * that all state-related parameters and results are defined 50 * as {@code long} rather than {@code int}. This class 51 * may be useful when creating synchronizers such as 52 * multilevel locks and barriers that require 53 * 64 bits of state. 54 * 55 * <p>See {@link AbstractQueuedSynchronizer} for usage 56 * notes and examples. 57 * 58 * @since 1.6 59 * @author Doug Lea 60 */ 61 public abstract class AbstractQueuedLongSynchronizer 62 extends AbstractOwnableSynchronizer 63 implements java.io.Serializable { 64 65 private static final long serialVersionUID = 7373984972572414692L; 66 67 /* 68 To keep sources in sync, the remainder of this source file is 69 exactly cloned from AbstractQueuedSynchronizer, replacing class 70 name and changing ints related with sync state to longs. Please 71 keep it that way. 72 */ 73 74 /** 75 * Creates a new {@code AbstractQueuedLongSynchronizer} instance 76 * with initial synchronization state of zero. 77 */ AbstractQueuedLongSynchronizer()78 protected AbstractQueuedLongSynchronizer() { } 79 80 /** 81 * Head of the wait queue, lazily initialized. Except for 82 * initialization, it is modified only via method setHead. Note: 83 * If head exists, its waitStatus is guaranteed not to be 84 * CANCELLED. 85 */ 86 private transient volatile Node head; 87 88 /** 89 * Tail of the wait queue, lazily initialized. Modified only via 90 * method enq to add new wait node. 91 */ 92 private transient volatile Node tail; 93 94 /** 95 * The synchronization state. 96 */ 97 private volatile long state; 98 99 /** 100 * Returns the current value of synchronization state. 101 * This operation has memory semantics of a {@code volatile} read. 102 * @return current state value 103 */ getState()104 protected final long getState() { 105 return state; 106 } 107 108 /** 109 * Sets the value of synchronization state. 110 * This operation has memory semantics of a {@code volatile} write. 111 * @param newState the new state value 112 */ setState(long newState)113 protected final void setState(long newState) { 114 // Use putLongVolatile instead of ordinary volatile store when 115 // using compareAndSwapLong, for sake of some 32bit systems. 116 U.putLongVolatile(this, STATE, newState); 117 } 118 119 /** 120 * Atomically sets synchronization state to the given updated 121 * value if the current state value equals the expected value. 122 * This operation has memory semantics of a {@code volatile} read 123 * and write. 124 * 125 * @param expect the expected value 126 * @param update the new value 127 * @return {@code true} if successful. False return indicates that the actual 128 * value was not equal to the expected value. 129 */ compareAndSetState(long expect, long update)130 protected final boolean compareAndSetState(long expect, long update) { 131 return U.compareAndSwapLong(this, STATE, expect, update); 132 } 133 134 // Queuing utilities 135 136 /** 137 * The number of nanoseconds for which it is faster to spin 138 * rather than to use timed park. A rough estimate suffices 139 * to improve responsiveness with very short timeouts. 140 */ 141 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; 142 143 /** 144 * Inserts node into queue, initializing if necessary. See picture above. 145 * @param node the node to insert 146 * @return node's predecessor 147 */ enq(Node node)148 private Node enq(Node node) { 149 for (;;) { 150 Node oldTail = tail; 151 if (oldTail != null) { 152 U.putObject(node, Node.PREV, oldTail); 153 if (compareAndSetTail(oldTail, node)) { 154 oldTail.next = node; 155 return oldTail; 156 } 157 } else { 158 initializeSyncQueue(); 159 } 160 } 161 } 162 163 /** 164 * Creates and enqueues node for current thread and given mode. 165 * 166 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 167 * @return the new node 168 */ addWaiter(Node mode)169 private Node addWaiter(Node mode) { 170 Node node = new Node(mode); 171 172 for (;;) { 173 Node oldTail = tail; 174 if (oldTail != null) { 175 U.putObject(node, Node.PREV, oldTail); 176 if (compareAndSetTail(oldTail, node)) { 177 oldTail.next = node; 178 return node; 179 } 180 } else { 181 initializeSyncQueue(); 182 } 183 } 184 } 185 186 /** 187 * Sets head of queue to be node, thus dequeuing. Called only by 188 * acquire methods. Also nulls out unused fields for sake of GC 189 * and to suppress unnecessary signals and traversals. 190 * 191 * @param node the node 192 */ setHead(Node node)193 private void setHead(Node node) { 194 head = node; 195 node.thread = null; 196 node.prev = null; 197 } 198 199 /** 200 * Wakes up node's successor, if one exists. 201 * 202 * @param node the node 203 */ unparkSuccessor(Node node)204 private void unparkSuccessor(Node node) { 205 /* 206 * If status is negative (i.e., possibly needing signal) try 207 * to clear in anticipation of signalling. It is OK if this 208 * fails or if status is changed by waiting thread. 209 */ 210 int ws = node.waitStatus; 211 if (ws < 0) 212 node.compareAndSetWaitStatus(ws, 0); 213 214 /* 215 * Thread to unpark is held in successor, which is normally 216 * just the next node. But if cancelled or apparently null, 217 * traverse backwards from tail to find the actual 218 * non-cancelled successor. 219 */ 220 Node s = node.next; 221 if (s == null || s.waitStatus > 0) { 222 s = null; 223 for (Node p = tail; p != node && p != null; p = p.prev) 224 if (p.waitStatus <= 0) 225 s = p; 226 } 227 if (s != null) 228 LockSupport.unpark(s.thread); 229 } 230 231 /** 232 * Release action for shared mode -- signals successor and ensures 233 * propagation. (Note: For exclusive mode, release just amounts 234 * to calling unparkSuccessor of head if it needs signal.) 235 */ doReleaseShared()236 private void doReleaseShared() { 237 /* 238 * Ensure that a release propagates, even if there are other 239 * in-progress acquires/releases. This proceeds in the usual 240 * way of trying to unparkSuccessor of head if it needs 241 * signal. But if it does not, status is set to PROPAGATE to 242 * ensure that upon release, propagation continues. 243 * Additionally, we must loop in case a new node is added 244 * while we are doing this. Also, unlike other uses of 245 * unparkSuccessor, we need to know if CAS to reset status 246 * fails, if so rechecking. 247 */ 248 for (;;) { 249 Node h = head; 250 if (h != null && h != tail) { 251 int ws = h.waitStatus; 252 if (ws == Node.SIGNAL) { 253 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) 254 continue; // loop to recheck cases 255 unparkSuccessor(h); 256 } 257 else if (ws == 0 && 258 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) 259 continue; // loop on failed CAS 260 } 261 if (h == head) // loop if head changed 262 break; 263 } 264 } 265 266 /** 267 * Sets head of queue, and checks if successor may be waiting 268 * in shared mode, if so propagating if either propagate > 0 or 269 * PROPAGATE status was set. 270 * 271 * @param node the node 272 * @param propagate the return value from a tryAcquireShared 273 */ setHeadAndPropagate(Node node, long propagate)274 private void setHeadAndPropagate(Node node, long propagate) { 275 Node h = head; // Record old head for check below 276 setHead(node); 277 /* 278 * Try to signal next queued node if: 279 * Propagation was indicated by caller, 280 * or was recorded (as h.waitStatus either before 281 * or after setHead) by a previous operation 282 * (note: this uses sign-check of waitStatus because 283 * PROPAGATE status may transition to SIGNAL.) 284 * and 285 * The next node is waiting in shared mode, 286 * or we don't know, because it appears null 287 * 288 * The conservatism in both of these checks may cause 289 * unnecessary wake-ups, but only when there are multiple 290 * racing acquires/releases, so most need signals now or soon 291 * anyway. 292 */ 293 if (propagate > 0 || h == null || h.waitStatus < 0 || 294 (h = head) == null || h.waitStatus < 0) { 295 Node s = node.next; 296 if (s == null || s.isShared()) 297 doReleaseShared(); 298 } 299 } 300 301 // Utilities for various versions of acquire 302 303 /** 304 * Cancels an ongoing attempt to acquire. 305 * 306 * @param node the node 307 */ cancelAcquire(Node node)308 private void cancelAcquire(Node node) { 309 // Ignore if node doesn't exist 310 if (node == null) 311 return; 312 313 node.thread = null; 314 315 // Skip cancelled predecessors 316 Node pred = node.prev; 317 while (pred.waitStatus > 0) 318 node.prev = pred = pred.prev; 319 320 // predNext is the apparent node to unsplice. CASes below will 321 // fail if not, in which case, we lost race vs another cancel 322 // or signal, so no further action is necessary. 323 Node predNext = pred.next; 324 325 // Can use unconditional write instead of CAS here. 326 // After this atomic step, other Nodes can skip past us. 327 // Before, we are free of interference from other threads. 328 node.waitStatus = Node.CANCELLED; 329 330 // If we are the tail, remove ourselves. 331 if (node == tail && compareAndSetTail(node, pred)) { 332 pred.compareAndSetNext(predNext, null); 333 } else { 334 // If successor needs signal, try to set pred's next-link 335 // so it will get one. Otherwise wake it up to propagate. 336 int ws; 337 if (pred != head && 338 ((ws = pred.waitStatus) == Node.SIGNAL || 339 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && 340 pred.thread != null) { 341 Node next = node.next; 342 if (next != null && next.waitStatus <= 0) 343 pred.compareAndSetNext(predNext, next); 344 } else { 345 unparkSuccessor(node); 346 } 347 348 node.next = node; // help GC 349 } 350 } 351 352 /** 353 * Checks and updates status for a node that failed to acquire. 354 * Returns true if thread should block. This is the main signal 355 * control in all acquire loops. Requires that pred == node.prev. 356 * 357 * @param pred node's predecessor holding status 358 * @param node the node 359 * @return {@code true} if thread should block 360 */ shouldParkAfterFailedAcquire(Node pred, Node node)361 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 362 int ws = pred.waitStatus; 363 if (ws == Node.SIGNAL) 364 /* 365 * This node has already set status asking a release 366 * to signal it, so it can safely park. 367 */ 368 return true; 369 if (ws > 0) { 370 /* 371 * Predecessor was cancelled. Skip over predecessors and 372 * indicate retry. 373 */ 374 do { 375 node.prev = pred = pred.prev; 376 } while (pred.waitStatus > 0); 377 pred.next = node; 378 } else { 379 /* 380 * waitStatus must be 0 or PROPAGATE. Indicate that we 381 * need a signal, but don't park yet. Caller will need to 382 * retry to make sure it cannot acquire before parking. 383 */ 384 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); 385 } 386 return false; 387 } 388 389 /** 390 * Convenience method to interrupt current thread. 391 */ selfInterrupt()392 static void selfInterrupt() { 393 Thread.currentThread().interrupt(); 394 } 395 396 /** 397 * Convenience method to park and then check if interrupted. 398 * 399 * @return {@code true} if interrupted 400 */ parkAndCheckInterrupt()401 private final boolean parkAndCheckInterrupt() { 402 LockSupport.park(this); 403 return Thread.interrupted(); 404 } 405 406 /* 407 * Various flavors of acquire, varying in exclusive/shared and 408 * control modes. Each is mostly the same, but annoyingly 409 * different. Only a little bit of factoring is possible due to 410 * interactions of exception mechanics (including ensuring that we 411 * cancel if tryAcquire throws exception) and other control, at 412 * least not without hurting performance too much. 413 */ 414 415 /** 416 * Acquires in exclusive uninterruptible mode for thread already in 417 * queue. Used by condition wait methods as well as acquire. 418 * 419 * @param node the node 420 * @param arg the acquire argument 421 * @return {@code true} if interrupted while waiting 422 */ acquireQueued(final Node node, long arg)423 final boolean acquireQueued(final Node node, long arg) { 424 try { 425 boolean interrupted = false; 426 for (;;) { 427 final Node p = node.predecessor(); 428 if (p == head && tryAcquire(arg)) { 429 setHead(node); 430 p.next = null; // help GC 431 return interrupted; 432 } 433 if (shouldParkAfterFailedAcquire(p, node) && 434 parkAndCheckInterrupt()) 435 interrupted = true; 436 } 437 } catch (Throwable t) { 438 cancelAcquire(node); 439 throw t; 440 } 441 } 442 443 /** 444 * Acquires in exclusive interruptible mode. 445 * @param arg the acquire argument 446 */ doAcquireInterruptibly(long arg)447 private void doAcquireInterruptibly(long arg) 448 throws InterruptedException { 449 final Node node = addWaiter(Node.EXCLUSIVE); 450 try { 451 for (;;) { 452 final Node p = node.predecessor(); 453 if (p == head && tryAcquire(arg)) { 454 setHead(node); 455 p.next = null; // help GC 456 return; 457 } 458 if (shouldParkAfterFailedAcquire(p, node) && 459 parkAndCheckInterrupt()) 460 throw new InterruptedException(); 461 } 462 } catch (Throwable t) { 463 cancelAcquire(node); 464 throw t; 465 } 466 } 467 468 /** 469 * Acquires in exclusive timed mode. 470 * 471 * @param arg the acquire argument 472 * @param nanosTimeout max wait time 473 * @return {@code true} if acquired 474 */ doAcquireNanos(long arg, long nanosTimeout)475 private boolean doAcquireNanos(long arg, long nanosTimeout) 476 throws InterruptedException { 477 if (nanosTimeout <= 0L) 478 return false; 479 final long deadline = System.nanoTime() + nanosTimeout; 480 final Node node = addWaiter(Node.EXCLUSIVE); 481 try { 482 for (;;) { 483 final Node p = node.predecessor(); 484 if (p == head && tryAcquire(arg)) { 485 setHead(node); 486 p.next = null; // help GC 487 return true; 488 } 489 nanosTimeout = deadline - System.nanoTime(); 490 if (nanosTimeout <= 0L) { 491 cancelAcquire(node); 492 return false; 493 } 494 if (shouldParkAfterFailedAcquire(p, node) && 495 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 496 LockSupport.parkNanos(this, nanosTimeout); 497 if (Thread.interrupted()) 498 throw new InterruptedException(); 499 } 500 } catch (Throwable t) { 501 cancelAcquire(node); 502 throw t; 503 } 504 } 505 506 /** 507 * Acquires in shared uninterruptible mode. 508 * @param arg the acquire argument 509 */ doAcquireShared(long arg)510 private void doAcquireShared(long arg) { 511 final Node node = addWaiter(Node.SHARED); 512 try { 513 boolean interrupted = false; 514 for (;;) { 515 final Node p = node.predecessor(); 516 if (p == head) { 517 long r = tryAcquireShared(arg); 518 if (r >= 0) { 519 setHeadAndPropagate(node, r); 520 p.next = null; // help GC 521 if (interrupted) 522 selfInterrupt(); 523 return; 524 } 525 } 526 if (shouldParkAfterFailedAcquire(p, node) && 527 parkAndCheckInterrupt()) 528 interrupted = true; 529 } 530 } catch (Throwable t) { 531 cancelAcquire(node); 532 throw t; 533 } 534 } 535 536 /** 537 * Acquires in shared interruptible mode. 538 * @param arg the acquire argument 539 */ doAcquireSharedInterruptibly(long arg)540 private void doAcquireSharedInterruptibly(long arg) 541 throws InterruptedException { 542 final Node node = addWaiter(Node.SHARED); 543 try { 544 for (;;) { 545 final Node p = node.predecessor(); 546 if (p == head) { 547 long r = tryAcquireShared(arg); 548 if (r >= 0) { 549 setHeadAndPropagate(node, r); 550 p.next = null; // help GC 551 return; 552 } 553 } 554 if (shouldParkAfterFailedAcquire(p, node) && 555 parkAndCheckInterrupt()) 556 throw new InterruptedException(); 557 } 558 } catch (Throwable t) { 559 cancelAcquire(node); 560 throw t; 561 } 562 } 563 564 /** 565 * Acquires in shared timed mode. 566 * 567 * @param arg the acquire argument 568 * @param nanosTimeout max wait time 569 * @return {@code true} if acquired 570 */ doAcquireSharedNanos(long arg, long nanosTimeout)571 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) 572 throws InterruptedException { 573 if (nanosTimeout <= 0L) 574 return false; 575 final long deadline = System.nanoTime() + nanosTimeout; 576 final Node node = addWaiter(Node.SHARED); 577 try { 578 for (;;) { 579 final Node p = node.predecessor(); 580 if (p == head) { 581 long r = tryAcquireShared(arg); 582 if (r >= 0) { 583 setHeadAndPropagate(node, r); 584 p.next = null; // help GC 585 return true; 586 } 587 } 588 nanosTimeout = deadline - System.nanoTime(); 589 if (nanosTimeout <= 0L) { 590 cancelAcquire(node); 591 return false; 592 } 593 if (shouldParkAfterFailedAcquire(p, node) && 594 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 595 LockSupport.parkNanos(this, nanosTimeout); 596 if (Thread.interrupted()) 597 throw new InterruptedException(); 598 } 599 } catch (Throwable t) { 600 cancelAcquire(node); 601 throw t; 602 } 603 } 604 605 // Main exported methods 606 607 /** 608 * Attempts to acquire in exclusive mode. This method should query 609 * if the state of the object permits it to be acquired in the 610 * exclusive mode, and if so to acquire it. 611 * 612 * <p>This method is always invoked by the thread performing 613 * acquire. If this method reports failure, the acquire method 614 * may queue the thread, if it is not already queued, until it is 615 * signalled by a release from some other thread. This can be used 616 * to implement method {@link Lock#tryLock()}. 617 * 618 * <p>The default 619 * implementation throws {@link UnsupportedOperationException}. 620 * 621 * @param arg the acquire argument. This value is always the one 622 * passed to an acquire method, or is the value saved on entry 623 * to a condition wait. The value is otherwise uninterpreted 624 * and can represent anything you like. 625 * @return {@code true} if successful. Upon success, this object has 626 * been acquired. 627 * @throws IllegalMonitorStateException if acquiring would place this 628 * synchronizer in an illegal state. This exception must be 629 * thrown in a consistent fashion for synchronization to work 630 * correctly. 631 * @throws UnsupportedOperationException if exclusive mode is not supported 632 */ tryAcquire(long arg)633 protected boolean tryAcquire(long arg) { 634 throw new UnsupportedOperationException(); 635 } 636 637 /** 638 * Attempts to set the state to reflect a release in exclusive 639 * mode. 640 * 641 * <p>This method is always invoked by the thread performing release. 642 * 643 * <p>The default implementation throws 644 * {@link UnsupportedOperationException}. 645 * 646 * @param arg the release argument. This value is always the one 647 * passed to a release method, or the current state value upon 648 * entry to a condition wait. The value is otherwise 649 * uninterpreted and can represent anything you like. 650 * @return {@code true} if this object is now in a fully released 651 * state, so that any waiting threads may attempt to acquire; 652 * and {@code false} otherwise. 653 * @throws IllegalMonitorStateException if releasing would place this 654 * synchronizer in an illegal state. This exception must be 655 * thrown in a consistent fashion for synchronization to work 656 * correctly. 657 * @throws UnsupportedOperationException if exclusive mode is not supported 658 */ tryRelease(long arg)659 protected boolean tryRelease(long arg) { 660 throw new UnsupportedOperationException(); 661 } 662 663 /** 664 * Attempts to acquire in shared mode. This method should query if 665 * the state of the object permits it to be acquired in the shared 666 * mode, and if so to acquire it. 667 * 668 * <p>This method is always invoked by the thread performing 669 * acquire. If this method reports failure, the acquire method 670 * may queue the thread, if it is not already queued, until it is 671 * signalled by a release from some other thread. 672 * 673 * <p>The default implementation throws {@link 674 * UnsupportedOperationException}. 675 * 676 * @param arg the acquire argument. This value is always the one 677 * passed to an acquire method, or is the value saved on entry 678 * to a condition wait. The value is otherwise uninterpreted 679 * and can represent anything you like. 680 * @return a negative value on failure; zero if acquisition in shared 681 * mode succeeded but no subsequent shared-mode acquire can 682 * succeed; and a positive value if acquisition in shared 683 * mode succeeded and subsequent shared-mode acquires might 684 * also succeed, in which case a subsequent waiting thread 685 * must check availability. (Support for three different 686 * return values enables this method to be used in contexts 687 * where acquires only sometimes act exclusively.) Upon 688 * success, this object has been acquired. 689 * @throws IllegalMonitorStateException if acquiring would place this 690 * synchronizer in an illegal state. This exception must be 691 * thrown in a consistent fashion for synchronization to work 692 * correctly. 693 * @throws UnsupportedOperationException if shared mode is not supported 694 */ tryAcquireShared(long arg)695 protected long tryAcquireShared(long arg) { 696 throw new UnsupportedOperationException(); 697 } 698 699 /** 700 * Attempts to set the state to reflect a release in shared mode. 701 * 702 * <p>This method is always invoked by the thread performing release. 703 * 704 * <p>The default implementation throws 705 * {@link UnsupportedOperationException}. 706 * 707 * @param arg the release argument. This value is always the one 708 * passed to a release method, or the current state value upon 709 * entry to a condition wait. The value is otherwise 710 * uninterpreted and can represent anything you like. 711 * @return {@code true} if this release of shared mode may permit a 712 * waiting acquire (shared or exclusive) to succeed; and 713 * {@code false} otherwise 714 * @throws IllegalMonitorStateException if releasing would place this 715 * synchronizer in an illegal state. This exception must be 716 * thrown in a consistent fashion for synchronization to work 717 * correctly. 718 * @throws UnsupportedOperationException if shared mode is not supported 719 */ tryReleaseShared(long arg)720 protected boolean tryReleaseShared(long arg) { 721 throw new UnsupportedOperationException(); 722 } 723 724 /** 725 * Returns {@code true} if synchronization is held exclusively with 726 * respect to the current (calling) thread. This method is invoked 727 * upon each call to a non-waiting {@link ConditionObject} method. 728 * (Waiting methods instead invoke {@link #release}.) 729 * 730 * <p>The default implementation throws {@link 731 * UnsupportedOperationException}. This method is invoked 732 * internally only within {@link ConditionObject} methods, so need 733 * not be defined if conditions are not used. 734 * 735 * @return {@code true} if synchronization is held exclusively; 736 * {@code false} otherwise 737 * @throws UnsupportedOperationException if conditions are not supported 738 */ isHeldExclusively()739 protected boolean isHeldExclusively() { 740 throw new UnsupportedOperationException(); 741 } 742 743 /** 744 * Acquires in exclusive mode, ignoring interrupts. Implemented 745 * by invoking at least once {@link #tryAcquire}, 746 * returning on success. Otherwise the thread is queued, possibly 747 * repeatedly blocking and unblocking, invoking {@link 748 * #tryAcquire} until success. This method can be used 749 * to implement method {@link Lock#lock}. 750 * 751 * @param arg the acquire argument. This value is conveyed to 752 * {@link #tryAcquire} but is otherwise uninterpreted and 753 * can represent anything you like. 754 */ acquire(long arg)755 public final void acquire(long arg) { 756 if (!tryAcquire(arg) && 757 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 758 selfInterrupt(); 759 } 760 761 /** 762 * Acquires in exclusive mode, aborting if interrupted. 763 * Implemented by first checking interrupt status, then invoking 764 * at least once {@link #tryAcquire}, returning on 765 * success. Otherwise the thread is queued, possibly repeatedly 766 * blocking and unblocking, invoking {@link #tryAcquire} 767 * until success or the thread is interrupted. This method can be 768 * used to implement method {@link Lock#lockInterruptibly}. 769 * 770 * @param arg the acquire argument. This value is conveyed to 771 * {@link #tryAcquire} but is otherwise uninterpreted and 772 * can represent anything you like. 773 * @throws InterruptedException if the current thread is interrupted 774 */ acquireInterruptibly(long arg)775 public final void acquireInterruptibly(long arg) 776 throws InterruptedException { 777 if (Thread.interrupted()) 778 throw new InterruptedException(); 779 if (!tryAcquire(arg)) 780 doAcquireInterruptibly(arg); 781 } 782 783 /** 784 * Attempts to acquire in exclusive mode, aborting if interrupted, 785 * and failing if the given timeout elapses. Implemented by first 786 * checking interrupt status, then invoking at least once {@link 787 * #tryAcquire}, returning on success. Otherwise, the thread is 788 * queued, possibly repeatedly blocking and unblocking, invoking 789 * {@link #tryAcquire} until success or the thread is interrupted 790 * or the timeout elapses. This method can be used to implement 791 * method {@link Lock#tryLock(long, TimeUnit)}. 792 * 793 * @param arg the acquire argument. This value is conveyed to 794 * {@link #tryAcquire} but is otherwise uninterpreted and 795 * can represent anything you like. 796 * @param nanosTimeout the maximum number of nanoseconds to wait 797 * @return {@code true} if acquired; {@code false} if timed out 798 * @throws InterruptedException if the current thread is interrupted 799 */ tryAcquireNanos(long arg, long nanosTimeout)800 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 801 throws InterruptedException { 802 if (Thread.interrupted()) 803 throw new InterruptedException(); 804 return tryAcquire(arg) || 805 doAcquireNanos(arg, nanosTimeout); 806 } 807 808 /** 809 * Releases in exclusive mode. Implemented by unblocking one or 810 * more threads if {@link #tryRelease} returns true. 811 * This method can be used to implement method {@link Lock#unlock}. 812 * 813 * @param arg the release argument. This value is conveyed to 814 * {@link #tryRelease} but is otherwise uninterpreted and 815 * can represent anything you like. 816 * @return the value returned from {@link #tryRelease} 817 */ release(long arg)818 public final boolean release(long arg) { 819 if (tryRelease(arg)) { 820 Node h = head; 821 if (h != null && h.waitStatus != 0) 822 unparkSuccessor(h); 823 return true; 824 } 825 return false; 826 } 827 828 /** 829 * Acquires in shared mode, ignoring interrupts. Implemented by 830 * first invoking at least once {@link #tryAcquireShared}, 831 * returning on success. Otherwise the thread is queued, possibly 832 * repeatedly blocking and unblocking, invoking {@link 833 * #tryAcquireShared} until success. 834 * 835 * @param arg the acquire argument. This value is conveyed to 836 * {@link #tryAcquireShared} but is otherwise uninterpreted 837 * and can represent anything you like. 838 */ acquireShared(long arg)839 public final void acquireShared(long arg) { 840 if (tryAcquireShared(arg) < 0) 841 doAcquireShared(arg); 842 } 843 844 /** 845 * Acquires in shared mode, aborting if interrupted. Implemented 846 * by first checking interrupt status, then invoking at least once 847 * {@link #tryAcquireShared}, returning on success. Otherwise the 848 * thread is queued, possibly repeatedly blocking and unblocking, 849 * invoking {@link #tryAcquireShared} until success or the thread 850 * is interrupted. 851 * @param arg the acquire argument. 852 * This value is conveyed to {@link #tryAcquireShared} but is 853 * otherwise uninterpreted and can represent anything 854 * you like. 855 * @throws InterruptedException if the current thread is interrupted 856 */ acquireSharedInterruptibly(long arg)857 public final void acquireSharedInterruptibly(long arg) 858 throws InterruptedException { 859 if (Thread.interrupted()) 860 throw new InterruptedException(); 861 if (tryAcquireShared(arg) < 0) 862 doAcquireSharedInterruptibly(arg); 863 } 864 865 /** 866 * Attempts to acquire in shared mode, aborting if interrupted, and 867 * failing if the given timeout elapses. Implemented by first 868 * checking interrupt status, then invoking at least once {@link 869 * #tryAcquireShared}, returning on success. Otherwise, the 870 * thread is queued, possibly repeatedly blocking and unblocking, 871 * invoking {@link #tryAcquireShared} until success or the thread 872 * is interrupted or the timeout elapses. 873 * 874 * @param arg the acquire argument. This value is conveyed to 875 * {@link #tryAcquireShared} but is otherwise uninterpreted 876 * and can represent anything you like. 877 * @param nanosTimeout the maximum number of nanoseconds to wait 878 * @return {@code true} if acquired; {@code false} if timed out 879 * @throws InterruptedException if the current thread is interrupted 880 */ tryAcquireSharedNanos(long arg, long nanosTimeout)881 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 882 throws InterruptedException { 883 if (Thread.interrupted()) 884 throw new InterruptedException(); 885 return tryAcquireShared(arg) >= 0 || 886 doAcquireSharedNanos(arg, nanosTimeout); 887 } 888 889 /** 890 * Releases in shared mode. Implemented by unblocking one or more 891 * threads if {@link #tryReleaseShared} returns true. 892 * 893 * @param arg the release argument. This value is conveyed to 894 * {@link #tryReleaseShared} but is otherwise uninterpreted 895 * and can represent anything you like. 896 * @return the value returned from {@link #tryReleaseShared} 897 */ releaseShared(long arg)898 public final boolean releaseShared(long arg) { 899 if (tryReleaseShared(arg)) { 900 doReleaseShared(); 901 return true; 902 } 903 return false; 904 } 905 906 // Queue inspection methods 907 908 /** 909 * Queries whether any threads are waiting to acquire. Note that 910 * because cancellations due to interrupts and timeouts may occur 911 * at any time, a {@code true} return does not guarantee that any 912 * other thread will ever acquire. 913 * 914 * <p>In this implementation, this operation returns in 915 * constant time. 916 * 917 * @return {@code true} if there may be other threads waiting to acquire 918 */ hasQueuedThreads()919 public final boolean hasQueuedThreads() { 920 return head != tail; 921 } 922 923 /** 924 * Queries whether any threads have ever contended to acquire this 925 * synchronizer; that is, if an acquire method has ever blocked. 926 * 927 * <p>In this implementation, this operation returns in 928 * constant time. 929 * 930 * @return {@code true} if there has ever been contention 931 */ hasContended()932 public final boolean hasContended() { 933 return head != null; 934 } 935 936 /** 937 * Returns the first (longest-waiting) thread in the queue, or 938 * {@code null} if no threads are currently queued. 939 * 940 * <p>In this implementation, this operation normally returns in 941 * constant time, but may iterate upon contention if other threads are 942 * concurrently modifying the queue. 943 * 944 * @return the first (longest-waiting) thread in the queue, or 945 * {@code null} if no threads are currently queued 946 */ getFirstQueuedThread()947 public final Thread getFirstQueuedThread() { 948 // handle only fast path, else relay 949 return (head == tail) ? null : fullGetFirstQueuedThread(); 950 } 951 952 /** 953 * Version of getFirstQueuedThread called when fastpath fails. 954 */ fullGetFirstQueuedThread()955 private Thread fullGetFirstQueuedThread() { 956 /* 957 * The first node is normally head.next. Try to get its 958 * thread field, ensuring consistent reads: If thread 959 * field is nulled out or s.prev is no longer head, then 960 * some other thread(s) concurrently performed setHead in 961 * between some of our reads. We try this twice before 962 * resorting to traversal. 963 */ 964 Node h, s; 965 Thread st; 966 if (((h = head) != null && (s = h.next) != null && 967 s.prev == head && (st = s.thread) != null) || 968 ((h = head) != null && (s = h.next) != null && 969 s.prev == head && (st = s.thread) != null)) 970 return st; 971 972 /* 973 * Head's next field might not have been set yet, or may have 974 * been unset after setHead. So we must check to see if tail 975 * is actually first node. If not, we continue on, safely 976 * traversing from tail back to head to find first, 977 * guaranteeing termination. 978 */ 979 980 Thread firstThread = null; 981 for (Node p = tail; p != null && p != head; p = p.prev) { 982 Thread t = p.thread; 983 if (t != null) 984 firstThread = t; 985 } 986 return firstThread; 987 } 988 989 /** 990 * Returns true if the given thread is currently queued. 991 * 992 * <p>This implementation traverses the queue to determine 993 * presence of the given thread. 994 * 995 * @param thread the thread 996 * @return {@code true} if the given thread is on the queue 997 * @throws NullPointerException if the thread is null 998 */ isQueued(Thread thread)999 public final boolean isQueued(Thread thread) { 1000 if (thread == null) 1001 throw new NullPointerException(); 1002 for (Node p = tail; p != null; p = p.prev) 1003 if (p.thread == thread) 1004 return true; 1005 return false; 1006 } 1007 1008 /** 1009 * Returns {@code true} if the apparent first queued thread, if one 1010 * exists, is waiting in exclusive mode. If this method returns 1011 * {@code true}, and the current thread is attempting to acquire in 1012 * shared mode (that is, this method is invoked from {@link 1013 * #tryAcquireShared}) then it is guaranteed that the current thread 1014 * is not the first queued thread. Used only as a heuristic in 1015 * ReentrantReadWriteLock. 1016 */ apparentlyFirstQueuedIsExclusive()1017 final boolean apparentlyFirstQueuedIsExclusive() { 1018 Node h, s; 1019 return (h = head) != null && 1020 (s = h.next) != null && 1021 !s.isShared() && 1022 s.thread != null; 1023 } 1024 1025 /** 1026 * Queries whether any threads have been waiting to acquire longer 1027 * than the current thread. 1028 * 1029 * <p>An invocation of this method is equivalent to (but may be 1030 * more efficient than): 1031 * <pre> {@code 1032 * getFirstQueuedThread() != Thread.currentThread() 1033 * && hasQueuedThreads()}</pre> 1034 * 1035 * <p>Note that because cancellations due to interrupts and 1036 * timeouts may occur at any time, a {@code true} return does not 1037 * guarantee that some other thread will acquire before the current 1038 * thread. Likewise, it is possible for another thread to win a 1039 * race to enqueue after this method has returned {@code false}, 1040 * due to the queue being empty. 1041 * 1042 * <p>This method is designed to be used by a fair synchronizer to 1043 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1044 * Such a synchronizer's {@link #tryAcquire} method should return 1045 * {@code false}, and its {@link #tryAcquireShared} method should 1046 * return a negative value, if this method returns {@code true} 1047 * (unless this is a reentrant acquire). For example, the {@code 1048 * tryAcquire} method for a fair, reentrant, exclusive mode 1049 * synchronizer might look like this: 1050 * 1051 * <pre> {@code 1052 * protected boolean tryAcquire(int arg) { 1053 * if (isHeldExclusively()) { 1054 * // A reentrant acquire; increment hold count 1055 * return true; 1056 * } else if (hasQueuedPredecessors()) { 1057 * return false; 1058 * } else { 1059 * // try to acquire normally 1060 * } 1061 * }}</pre> 1062 * 1063 * @return {@code true} if there is a queued thread preceding the 1064 * current thread, and {@code false} if the current thread 1065 * is at the head of the queue or the queue is empty 1066 * @since 1.7 1067 */ hasQueuedPredecessors()1068 public final boolean hasQueuedPredecessors() { 1069 // The correctness of this depends on head being initialized 1070 // before tail and on head.next being accurate if the current 1071 // thread is first in queue. 1072 Node t = tail; // Read fields in reverse initialization order 1073 Node h = head; 1074 Node s; 1075 return h != t && 1076 ((s = h.next) == null || s.thread != Thread.currentThread()); 1077 } 1078 1079 1080 // Instrumentation and monitoring methods 1081 1082 /** 1083 * Returns an estimate of the number of threads waiting to 1084 * acquire. The value is only an estimate because the number of 1085 * threads may change dynamically while this method traverses 1086 * internal data structures. This method is designed for use in 1087 * monitoring system state, not for synchronization control. 1088 * 1089 * @return the estimated number of threads waiting to acquire 1090 */ getQueueLength()1091 public final int getQueueLength() { 1092 int n = 0; 1093 for (Node p = tail; p != null; p = p.prev) { 1094 if (p.thread != null) 1095 ++n; 1096 } 1097 return n; 1098 } 1099 1100 /** 1101 * Returns a collection containing threads that may be waiting to 1102 * acquire. Because the actual set of threads may change 1103 * dynamically while constructing this result, the returned 1104 * collection is only a best-effort estimate. The elements of the 1105 * returned collection are in no particular order. This method is 1106 * designed to facilitate construction of subclasses that provide 1107 * more extensive monitoring facilities. 1108 * 1109 * @return the collection of threads 1110 */ getQueuedThreads()1111 public final Collection<Thread> getQueuedThreads() { 1112 ArrayList<Thread> list = new ArrayList<>(); 1113 for (Node p = tail; p != null; p = p.prev) { 1114 Thread t = p.thread; 1115 if (t != null) 1116 list.add(t); 1117 } 1118 return list; 1119 } 1120 1121 /** 1122 * Returns a collection containing threads that may be waiting to 1123 * acquire in exclusive mode. This has the same properties 1124 * as {@link #getQueuedThreads} except that it only returns 1125 * those threads waiting due to an exclusive acquire. 1126 * 1127 * @return the collection of threads 1128 */ getExclusiveQueuedThreads()1129 public final Collection<Thread> getExclusiveQueuedThreads() { 1130 ArrayList<Thread> list = new ArrayList<>(); 1131 for (Node p = tail; p != null; p = p.prev) { 1132 if (!p.isShared()) { 1133 Thread t = p.thread; 1134 if (t != null) 1135 list.add(t); 1136 } 1137 } 1138 return list; 1139 } 1140 1141 /** 1142 * Returns a collection containing threads that may be waiting to 1143 * acquire in shared mode. This has the same properties 1144 * as {@link #getQueuedThreads} except that it only returns 1145 * those threads waiting due to a shared acquire. 1146 * 1147 * @return the collection of threads 1148 */ getSharedQueuedThreads()1149 public final Collection<Thread> getSharedQueuedThreads() { 1150 ArrayList<Thread> list = new ArrayList<>(); 1151 for (Node p = tail; p != null; p = p.prev) { 1152 if (p.isShared()) { 1153 Thread t = p.thread; 1154 if (t != null) 1155 list.add(t); 1156 } 1157 } 1158 return list; 1159 } 1160 1161 /** 1162 * Returns a string identifying this synchronizer, as well as its state. 1163 * The state, in brackets, includes the String {@code "State ="} 1164 * followed by the current value of {@link #getState}, and either 1165 * {@code "nonempty"} or {@code "empty"} depending on whether the 1166 * queue is empty. 1167 * 1168 * @return a string identifying this synchronizer, as well as its state 1169 */ toString()1170 public String toString() { 1171 return super.toString() 1172 + "[State = " + getState() + ", " 1173 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1174 } 1175 1176 1177 // Internal support methods for Conditions 1178 1179 /** 1180 * Returns true if a node, always one that was initially placed on 1181 * a condition queue, is now waiting to reacquire on sync queue. 1182 * @param node the node 1183 * @return true if is reacquiring 1184 */ isOnSyncQueue(Node node)1185 final boolean isOnSyncQueue(Node node) { 1186 if (node.waitStatus == Node.CONDITION || node.prev == null) 1187 return false; 1188 if (node.next != null) // If has successor, it must be on queue 1189 return true; 1190 /* 1191 * node.prev can be non-null, but not yet on queue because 1192 * the CAS to place it on queue can fail. So we have to 1193 * traverse from tail to make sure it actually made it. It 1194 * will always be near the tail in calls to this method, and 1195 * unless the CAS failed (which is unlikely), it will be 1196 * there, so we hardly ever traverse much. 1197 */ 1198 return findNodeFromTail(node); 1199 } 1200 1201 /** 1202 * Returns true if node is on sync queue by searching backwards from tail. 1203 * Called only when needed by isOnSyncQueue. 1204 * @return true if present 1205 */ findNodeFromTail(Node node)1206 private boolean findNodeFromTail(Node node) { 1207 // We check for node first, since it's likely to be at or near tail. 1208 // tail is known to be non-null, so we could re-order to "save" 1209 // one null check, but we leave it this way to help the VM. 1210 for (Node p = tail;;) { 1211 if (p == node) 1212 return true; 1213 if (p == null) 1214 return false; 1215 p = p.prev; 1216 } 1217 } 1218 1219 /** 1220 * Transfers a node from a condition queue onto sync queue. 1221 * Returns true if successful. 1222 * @param node the node 1223 * @return true if successfully transferred (else the node was 1224 * cancelled before signal) 1225 */ transferForSignal(Node node)1226 final boolean transferForSignal(Node node) { 1227 /* 1228 * If cannot change waitStatus, the node has been cancelled. 1229 */ 1230 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) 1231 return false; 1232 1233 /* 1234 * Splice onto queue and try to set waitStatus of predecessor to 1235 * indicate that thread is (probably) waiting. If cancelled or 1236 * attempt to set waitStatus fails, wake up to resync (in which 1237 * case the waitStatus can be transiently and harmlessly wrong). 1238 */ 1239 Node p = enq(node); 1240 int ws = p.waitStatus; 1241 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) 1242 LockSupport.unpark(node.thread); 1243 return true; 1244 } 1245 1246 /** 1247 * Transfers node, if necessary, to sync queue after a cancelled wait. 1248 * Returns true if thread was cancelled before being signalled. 1249 * 1250 * @param node the node 1251 * @return true if cancelled before the node was signalled 1252 */ transferAfterCancelledWait(Node node)1253 final boolean transferAfterCancelledWait(Node node) { 1254 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { 1255 enq(node); 1256 return true; 1257 } 1258 /* 1259 * If we lost out to a signal(), then we can't proceed 1260 * until it finishes its enq(). Cancelling during an 1261 * incomplete transfer is both rare and transient, so just 1262 * spin. 1263 */ 1264 while (!isOnSyncQueue(node)) 1265 Thread.yield(); 1266 return false; 1267 } 1268 1269 /** 1270 * Invokes release with current state value; returns saved state. 1271 * Cancels node and throws exception on failure. 1272 * @param node the condition node for this wait 1273 * @return previous sync state 1274 */ fullyRelease(Node node)1275 final long fullyRelease(Node node) { 1276 try { 1277 long savedState = getState(); 1278 if (release(savedState)) 1279 return savedState; 1280 throw new IllegalMonitorStateException(); 1281 } catch (Throwable t) { 1282 node.waitStatus = Node.CANCELLED; 1283 throw t; 1284 } 1285 } 1286 1287 // Instrumentation methods for conditions 1288 1289 /** 1290 * Queries whether the given ConditionObject 1291 * uses this synchronizer as its lock. 1292 * 1293 * @param condition the condition 1294 * @return {@code true} if owned 1295 * @throws NullPointerException if the condition is null 1296 */ owns(ConditionObject condition)1297 public final boolean owns(ConditionObject condition) { 1298 return condition.isOwnedBy(this); 1299 } 1300 1301 /** 1302 * Queries whether any threads are waiting on the given condition 1303 * associated with this synchronizer. Note that because timeouts 1304 * and interrupts may occur at any time, a {@code true} return 1305 * does not guarantee that a future {@code signal} will awaken 1306 * any threads. This method is designed primarily for use in 1307 * monitoring of the system state. 1308 * 1309 * @param condition the condition 1310 * @return {@code true} if there are any waiting threads 1311 * @throws IllegalMonitorStateException if exclusive synchronization 1312 * is not held 1313 * @throws IllegalArgumentException if the given condition is 1314 * not associated with this synchronizer 1315 * @throws NullPointerException if the condition is null 1316 */ hasWaiters(ConditionObject condition)1317 public final boolean hasWaiters(ConditionObject condition) { 1318 if (!owns(condition)) 1319 throw new IllegalArgumentException("Not owner"); 1320 return condition.hasWaiters(); 1321 } 1322 1323 /** 1324 * Returns an estimate of the number of threads waiting on the 1325 * given condition associated with this synchronizer. Note that 1326 * because timeouts and interrupts may occur at any time, the 1327 * estimate serves only as an upper bound on the actual number of 1328 * waiters. This method is designed for use in monitoring system 1329 * state, not for synchronization control. 1330 * 1331 * @param condition the condition 1332 * @return the estimated number of waiting threads 1333 * @throws IllegalMonitorStateException if exclusive synchronization 1334 * is not held 1335 * @throws IllegalArgumentException if the given condition is 1336 * not associated with this synchronizer 1337 * @throws NullPointerException if the condition is null 1338 */ getWaitQueueLength(ConditionObject condition)1339 public final int getWaitQueueLength(ConditionObject condition) { 1340 if (!owns(condition)) 1341 throw new IllegalArgumentException("Not owner"); 1342 return condition.getWaitQueueLength(); 1343 } 1344 1345 /** 1346 * Returns a collection containing those threads that may be 1347 * waiting on the given condition associated with this 1348 * synchronizer. Because the actual set of threads may change 1349 * dynamically while constructing this result, the returned 1350 * collection is only a best-effort estimate. The elements of the 1351 * returned collection are in no particular order. 1352 * 1353 * @param condition the condition 1354 * @return the collection of threads 1355 * @throws IllegalMonitorStateException if exclusive synchronization 1356 * is not held 1357 * @throws IllegalArgumentException if the given condition is 1358 * not associated with this synchronizer 1359 * @throws NullPointerException if the condition is null 1360 */ getWaitingThreads(ConditionObject condition)1361 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1362 if (!owns(condition)) 1363 throw new IllegalArgumentException("Not owner"); 1364 return condition.getWaitingThreads(); 1365 } 1366 1367 /** 1368 * Condition implementation for a {@link 1369 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link 1370 * Lock} implementation. 1371 * 1372 * <p>Method documentation for this class describes mechanics, 1373 * not behavioral specifications from the point of view of Lock 1374 * and Condition users. Exported versions of this class will in 1375 * general need to be accompanied by documentation describing 1376 * condition semantics that rely on those of the associated 1377 * {@code AbstractQueuedLongSynchronizer}. 1378 * 1379 * <p>This class is Serializable, but all fields are transient, 1380 * so deserialized conditions have no waiters. 1381 * 1382 * @since 1.6 1383 */ 1384 public class ConditionObject implements Condition, java.io.Serializable { 1385 private static final long serialVersionUID = 1173984872572414699L; 1386 /** First node of condition queue. */ 1387 private transient Node firstWaiter; 1388 /** Last node of condition queue. */ 1389 private transient Node lastWaiter; 1390 1391 /** 1392 * Creates a new {@code ConditionObject} instance. 1393 */ ConditionObject()1394 public ConditionObject() { } 1395 1396 // Internal methods 1397 1398 /** 1399 * Adds a new waiter to wait queue. 1400 * @return its new wait node 1401 */ addConditionWaiter()1402 private Node addConditionWaiter() { 1403 Node t = lastWaiter; 1404 // If lastWaiter is cancelled, clean out. 1405 if (t != null && t.waitStatus != Node.CONDITION) { 1406 unlinkCancelledWaiters(); 1407 t = lastWaiter; 1408 } 1409 1410 Node node = new Node(Node.CONDITION); 1411 1412 if (t == null) 1413 firstWaiter = node; 1414 else 1415 t.nextWaiter = node; 1416 lastWaiter = node; 1417 return node; 1418 } 1419 1420 /** 1421 * Removes and transfers nodes until hit non-cancelled one or 1422 * null. Split out from signal in part to encourage compilers 1423 * to inline the case of no waiters. 1424 * @param first (non-null) the first node on condition queue 1425 */ doSignal(Node first)1426 private void doSignal(Node first) { 1427 do { 1428 if ( (firstWaiter = first.nextWaiter) == null) 1429 lastWaiter = null; 1430 first.nextWaiter = null; 1431 } while (!transferForSignal(first) && 1432 (first = firstWaiter) != null); 1433 } 1434 1435 /** 1436 * Removes and transfers all nodes. 1437 * @param first (non-null) the first node on condition queue 1438 */ doSignalAll(Node first)1439 private void doSignalAll(Node first) { 1440 lastWaiter = firstWaiter = null; 1441 do { 1442 Node next = first.nextWaiter; 1443 first.nextWaiter = null; 1444 transferForSignal(first); 1445 first = next; 1446 } while (first != null); 1447 } 1448 1449 /** 1450 * Unlinks cancelled waiter nodes from condition queue. 1451 * Called only while holding lock. This is called when 1452 * cancellation occurred during condition wait, and upon 1453 * insertion of a new waiter when lastWaiter is seen to have 1454 * been cancelled. This method is needed to avoid garbage 1455 * retention in the absence of signals. So even though it may 1456 * require a full traversal, it comes into play only when 1457 * timeouts or cancellations occur in the absence of 1458 * signals. It traverses all nodes rather than stopping at a 1459 * particular target to unlink all pointers to garbage nodes 1460 * without requiring many re-traversals during cancellation 1461 * storms. 1462 */ unlinkCancelledWaiters()1463 private void unlinkCancelledWaiters() { 1464 Node t = firstWaiter; 1465 Node trail = null; 1466 while (t != null) { 1467 Node next = t.nextWaiter; 1468 if (t.waitStatus != Node.CONDITION) { 1469 t.nextWaiter = null; 1470 if (trail == null) 1471 firstWaiter = next; 1472 else 1473 trail.nextWaiter = next; 1474 if (next == null) 1475 lastWaiter = trail; 1476 } 1477 else 1478 trail = t; 1479 t = next; 1480 } 1481 } 1482 1483 // public methods 1484 1485 /** 1486 * Moves the longest-waiting thread, if one exists, from the 1487 * wait queue for this condition to the wait queue for the 1488 * owning lock. 1489 * 1490 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1491 * returns {@code false} 1492 */ signal()1493 public final void signal() { 1494 if (!isHeldExclusively()) 1495 throw new IllegalMonitorStateException(); 1496 Node first = firstWaiter; 1497 if (first != null) 1498 doSignal(first); 1499 } 1500 1501 /** 1502 * Moves all threads from the wait queue for this condition to 1503 * the wait queue for the owning lock. 1504 * 1505 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1506 * returns {@code false} 1507 */ signalAll()1508 public final void signalAll() { 1509 if (!isHeldExclusively()) 1510 throw new IllegalMonitorStateException(); 1511 Node first = firstWaiter; 1512 if (first != null) 1513 doSignalAll(first); 1514 } 1515 1516 /** 1517 * Implements uninterruptible condition wait. 1518 * <ol> 1519 * <li>Save lock state returned by {@link #getState}. 1520 * <li>Invoke {@link #release} with saved state as argument, 1521 * throwing IllegalMonitorStateException if it fails. 1522 * <li>Block until signalled. 1523 * <li>Reacquire by invoking specialized version of 1524 * {@link #acquire} with saved state as argument. 1525 * </ol> 1526 */ awaitUninterruptibly()1527 public final void awaitUninterruptibly() { 1528 Node node = addConditionWaiter(); 1529 long savedState = fullyRelease(node); 1530 boolean interrupted = false; 1531 while (!isOnSyncQueue(node)) { 1532 LockSupport.park(this); 1533 if (Thread.interrupted()) 1534 interrupted = true; 1535 } 1536 if (acquireQueued(node, savedState) || interrupted) 1537 selfInterrupt(); 1538 } 1539 1540 /* 1541 * For interruptible waits, we need to track whether to throw 1542 * InterruptedException, if interrupted while blocked on 1543 * condition, versus reinterrupt current thread, if 1544 * interrupted while blocked waiting to re-acquire. 1545 */ 1546 1547 /** Mode meaning to reinterrupt on exit from wait */ 1548 private static final int REINTERRUPT = 1; 1549 /** Mode meaning to throw InterruptedException on exit from wait */ 1550 private static final int THROW_IE = -1; 1551 1552 /** 1553 * Checks for interrupt, returning THROW_IE if interrupted 1554 * before signalled, REINTERRUPT if after signalled, or 1555 * 0 if not interrupted. 1556 */ checkInterruptWhileWaiting(Node node)1557 private int checkInterruptWhileWaiting(Node node) { 1558 return Thread.interrupted() ? 1559 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 1560 0; 1561 } 1562 1563 /** 1564 * Throws InterruptedException, reinterrupts current thread, or 1565 * does nothing, depending on mode. 1566 */ reportInterruptAfterWait(int interruptMode)1567 private void reportInterruptAfterWait(int interruptMode) 1568 throws InterruptedException { 1569 if (interruptMode == THROW_IE) 1570 throw new InterruptedException(); 1571 else if (interruptMode == REINTERRUPT) 1572 selfInterrupt(); 1573 } 1574 1575 /** 1576 * Implements interruptible condition wait. 1577 * <ol> 1578 * <li>If current thread is interrupted, throw InterruptedException. 1579 * <li>Save lock state returned by {@link #getState}. 1580 * <li>Invoke {@link #release} with saved state as argument, 1581 * throwing IllegalMonitorStateException if it fails. 1582 * <li>Block until signalled or interrupted. 1583 * <li>Reacquire by invoking specialized version of 1584 * {@link #acquire} with saved state as argument. 1585 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1586 * </ol> 1587 */ await()1588 public final void await() throws InterruptedException { 1589 if (Thread.interrupted()) 1590 throw new InterruptedException(); 1591 Node node = addConditionWaiter(); 1592 long savedState = fullyRelease(node); 1593 int interruptMode = 0; 1594 while (!isOnSyncQueue(node)) { 1595 LockSupport.park(this); 1596 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1597 break; 1598 } 1599 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1600 interruptMode = REINTERRUPT; 1601 if (node.nextWaiter != null) // clean up if cancelled 1602 unlinkCancelledWaiters(); 1603 if (interruptMode != 0) 1604 reportInterruptAfterWait(interruptMode); 1605 } 1606 1607 /** 1608 * Implements timed condition wait. 1609 * <ol> 1610 * <li>If current thread is interrupted, throw InterruptedException. 1611 * <li>Save lock state returned by {@link #getState}. 1612 * <li>Invoke {@link #release} with saved state as argument, 1613 * throwing IllegalMonitorStateException if it fails. 1614 * <li>Block until signalled, interrupted, or timed out. 1615 * <li>Reacquire by invoking specialized version of 1616 * {@link #acquire} with saved state as argument. 1617 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1618 * </ol> 1619 */ awaitNanos(long nanosTimeout)1620 public final long awaitNanos(long nanosTimeout) 1621 throws InterruptedException { 1622 if (Thread.interrupted()) 1623 throw new InterruptedException(); 1624 // We don't check for nanosTimeout <= 0L here, to allow 1625 // awaitNanos(0) as a way to "yield the lock". 1626 final long deadline = System.nanoTime() + nanosTimeout; 1627 long initialNanos = nanosTimeout; 1628 Node node = addConditionWaiter(); 1629 long savedState = fullyRelease(node); 1630 int interruptMode = 0; 1631 while (!isOnSyncQueue(node)) { 1632 if (nanosTimeout <= 0L) { 1633 transferAfterCancelledWait(node); 1634 break; 1635 } 1636 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 1637 LockSupport.parkNanos(this, nanosTimeout); 1638 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1639 break; 1640 nanosTimeout = deadline - System.nanoTime(); 1641 } 1642 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1643 interruptMode = REINTERRUPT; 1644 if (node.nextWaiter != null) 1645 unlinkCancelledWaiters(); 1646 if (interruptMode != 0) 1647 reportInterruptAfterWait(interruptMode); 1648 long remaining = deadline - System.nanoTime(); // avoid overflow 1649 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; 1650 } 1651 1652 /** 1653 * Implements absolute timed condition wait. 1654 * <ol> 1655 * <li>If current thread is interrupted, throw InterruptedException. 1656 * <li>Save lock state returned by {@link #getState}. 1657 * <li>Invoke {@link #release} with saved state as argument, 1658 * throwing IllegalMonitorStateException if it fails. 1659 * <li>Block until signalled, interrupted, or timed out. 1660 * <li>Reacquire by invoking specialized version of 1661 * {@link #acquire} with saved state as argument. 1662 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1663 * <li>If timed out while blocked in step 4, return false, else true. 1664 * </ol> 1665 */ awaitUntil(Date deadline)1666 public final boolean awaitUntil(Date deadline) 1667 throws InterruptedException { 1668 long abstime = deadline.getTime(); 1669 if (Thread.interrupted()) 1670 throw new InterruptedException(); 1671 Node node = addConditionWaiter(); 1672 long savedState = fullyRelease(node); 1673 boolean timedout = false; 1674 int interruptMode = 0; 1675 while (!isOnSyncQueue(node)) { 1676 if (System.currentTimeMillis() >= abstime) { 1677 timedout = transferAfterCancelledWait(node); 1678 break; 1679 } 1680 LockSupport.parkUntil(this, abstime); 1681 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1682 break; 1683 } 1684 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1685 interruptMode = REINTERRUPT; 1686 if (node.nextWaiter != null) 1687 unlinkCancelledWaiters(); 1688 if (interruptMode != 0) 1689 reportInterruptAfterWait(interruptMode); 1690 return !timedout; 1691 } 1692 1693 /** 1694 * Implements timed condition wait. 1695 * <ol> 1696 * <li>If current thread is interrupted, throw InterruptedException. 1697 * <li>Save lock state returned by {@link #getState}. 1698 * <li>Invoke {@link #release} with saved state as argument, 1699 * throwing IllegalMonitorStateException if it fails. 1700 * <li>Block until signalled, interrupted, or timed out. 1701 * <li>Reacquire by invoking specialized version of 1702 * {@link #acquire} with saved state as argument. 1703 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1704 * <li>If timed out while blocked in step 4, return false, else true. 1705 * </ol> 1706 */ await(long time, TimeUnit unit)1707 public final boolean await(long time, TimeUnit unit) 1708 throws InterruptedException { 1709 long nanosTimeout = unit.toNanos(time); 1710 if (Thread.interrupted()) 1711 throw new InterruptedException(); 1712 // We don't check for nanosTimeout <= 0L here, to allow 1713 // await(0, unit) as a way to "yield the lock". 1714 final long deadline = System.nanoTime() + nanosTimeout; 1715 Node node = addConditionWaiter(); 1716 long savedState = fullyRelease(node); 1717 boolean timedout = false; 1718 int interruptMode = 0; 1719 while (!isOnSyncQueue(node)) { 1720 if (nanosTimeout <= 0L) { 1721 timedout = transferAfterCancelledWait(node); 1722 break; 1723 } 1724 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 1725 LockSupport.parkNanos(this, nanosTimeout); 1726 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1727 break; 1728 nanosTimeout = deadline - System.nanoTime(); 1729 } 1730 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1731 interruptMode = REINTERRUPT; 1732 if (node.nextWaiter != null) 1733 unlinkCancelledWaiters(); 1734 if (interruptMode != 0) 1735 reportInterruptAfterWait(interruptMode); 1736 return !timedout; 1737 } 1738 1739 // support for instrumentation 1740 1741 /** 1742 * Returns true if this condition was created by the given 1743 * synchronization object. 1744 * 1745 * @return {@code true} if owned 1746 */ isOwnedBy(AbstractQueuedLongSynchronizer sync)1747 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1748 return sync == AbstractQueuedLongSynchronizer.this; 1749 } 1750 1751 /** 1752 * Queries whether any threads are waiting on this condition. 1753 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1754 * 1755 * @return {@code true} if there are any waiting threads 1756 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1757 * returns {@code false} 1758 */ hasWaiters()1759 protected final boolean hasWaiters() { 1760 if (!isHeldExclusively()) 1761 throw new IllegalMonitorStateException(); 1762 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1763 if (w.waitStatus == Node.CONDITION) 1764 return true; 1765 } 1766 return false; 1767 } 1768 1769 /** 1770 * Returns an estimate of the number of threads waiting on 1771 * this condition. 1772 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1773 * 1774 * @return the estimated number of waiting threads 1775 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1776 * returns {@code false} 1777 */ getWaitQueueLength()1778 protected final int getWaitQueueLength() { 1779 if (!isHeldExclusively()) 1780 throw new IllegalMonitorStateException(); 1781 int n = 0; 1782 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1783 if (w.waitStatus == Node.CONDITION) 1784 ++n; 1785 } 1786 return n; 1787 } 1788 1789 /** 1790 * Returns a collection containing those threads that may be 1791 * waiting on this Condition. 1792 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1793 * 1794 * @return the collection of threads 1795 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1796 * returns {@code false} 1797 */ getWaitingThreads()1798 protected final Collection<Thread> getWaitingThreads() { 1799 if (!isHeldExclusively()) 1800 throw new IllegalMonitorStateException(); 1801 ArrayList<Thread> list = new ArrayList<>(); 1802 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1803 if (w.waitStatus == Node.CONDITION) { 1804 Thread t = w.thread; 1805 if (t != null) 1806 list.add(t); 1807 } 1808 } 1809 return list; 1810 } 1811 } 1812 1813 /** 1814 * Setup to support compareAndSet. We need to natively implement 1815 * this here: For the sake of permitting future enhancements, we 1816 * cannot explicitly subclass AtomicLong, which would be 1817 * efficient and useful otherwise. So, as the lesser of evils, we 1818 * natively implement using hotspot intrinsics API. And while we 1819 * are at it, we do the same for other CASable fields (which could 1820 * otherwise be done with atomic field updaters). 1821 */ 1822 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 1823 private static final long STATE; 1824 private static final long HEAD; 1825 private static final long TAIL; 1826 1827 static { 1828 try { 1829 STATE = U.objectFieldOffset 1830 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state")); 1831 HEAD = U.objectFieldOffset 1832 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head")); 1833 TAIL = U.objectFieldOffset 1834 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail")); 1835 } catch (ReflectiveOperationException e) { 1836 throw new Error(e); 1837 } 1838 1839 // Reduce the risk of rare disastrous classloading in first call to 1840 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1841 Class<?> ensureLoaded = LockSupport.class; 1842 } 1843 1844 /** 1845 * Initializes head and tail fields on first contention. 1846 */ initializeSyncQueue()1847 private final void initializeSyncQueue() { 1848 Node h; 1849 if (U.compareAndSwapObject(this, HEAD, null, (h = new Node()))) 1850 tail = h; 1851 } 1852 1853 /** 1854 * CASes tail field. 1855 */ compareAndSetTail(Node expect, Node update)1856 private final boolean compareAndSetTail(Node expect, Node update) { 1857 return U.compareAndSwapObject(this, TAIL, expect, update); 1858 } 1859 } 1860