1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent.locks;
8 
9 import java.util.ArrayList;
10 import java.util.Collection;
11 import java.util.Date;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node;
14 
15 /**
16  * A version of {@link AbstractQueuedSynchronizer} in
17  * which synchronization state is maintained as a {@code long}.
18  * This class has exactly the same structure, properties, and methods
19  * as {@code AbstractQueuedSynchronizer} with the exception
20  * that all state-related parameters and results are defined
21  * as {@code long} rather than {@code int}. This class
22  * may be useful when creating synchronizers such as
23  * multilevel locks and barriers that require
24  * 64 bits of state.
25  *
26  * <p>See {@link AbstractQueuedSynchronizer} for usage
27  * notes and examples.
28  *
29  * @since 1.6
30  * @author Doug Lea
31  */
32 public abstract class AbstractQueuedLongSynchronizer
33     extends AbstractOwnableSynchronizer
34     implements java.io.Serializable {
35 
36     private static final long serialVersionUID = 7373984972572414692L;
37 
38     /*
39       To keep sources in sync, the remainder of this source file is
40       exactly cloned from AbstractQueuedSynchronizer, replacing class
41       name and changing ints related with sync state to longs. Please
42       keep it that way.
43     */
44 
45     /**
46      * Creates a new {@code AbstractQueuedLongSynchronizer} instance
47      * with initial synchronization state of zero.
48      */
AbstractQueuedLongSynchronizer()49     protected AbstractQueuedLongSynchronizer() { }
50 
51     /**
52      * Head of the wait queue, lazily initialized.  Except for
53      * initialization, it is modified only via method setHead.  Note:
54      * If head exists, its waitStatus is guaranteed not to be
55      * CANCELLED.
56      */
57     private transient volatile Node head;
58 
59     /**
60      * Tail of the wait queue, lazily initialized.  Modified only via
61      * method enq to add new wait node.
62      */
63     private transient volatile Node tail;
64 
65     /**
66      * The synchronization state.
67      */
68     private volatile long state;
69 
70     /**
71      * Returns the current value of synchronization state.
72      * This operation has memory semantics of a {@code volatile} read.
73      * @return current state value
74      */
getState()75     protected final long getState() {
76         return state;
77     }
78 
79     /**
80      * Sets the value of synchronization state.
81      * This operation has memory semantics of a {@code volatile} write.
82      * @param newState the new state value
83      */
setState(long newState)84     protected final void setState(long newState) {
85         // Use putLongVolatile instead of ordinary volatile store when
86         // using compareAndSwapLong, for sake of some 32bit systems.
87         U.putLongVolatile(this, STATE, newState);
88     }
89 
90     /**
91      * Atomically sets synchronization state to the given updated
92      * value if the current state value equals the expected value.
93      * This operation has memory semantics of a {@code volatile} read
94      * and write.
95      *
96      * @param expect the expected value
97      * @param update the new value
98      * @return {@code true} if successful. False return indicates that the actual
99      *         value was not equal to the expected value.
100      */
compareAndSetState(long expect, long update)101     protected final boolean compareAndSetState(long expect, long update) {
102         return U.compareAndSwapLong(this, STATE, expect, update);
103     }
104 
105     // Queuing utilities
106 
107     /**
108      * The number of nanoseconds for which it is faster to spin
109      * rather than to use timed park. A rough estimate suffices
110      * to improve responsiveness with very short timeouts.
111      */
112     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
113 
114     /**
115      * Inserts node into queue, initializing if necessary. See picture above.
116      * @param node the node to insert
117      * @return node's predecessor
118      */
enq(Node node)119     private Node enq(Node node) {
120         for (;;) {
121             Node oldTail = tail;
122             if (oldTail != null) {
123                 U.putObject(node, Node.PREV, oldTail);
124                 if (compareAndSetTail(oldTail, node)) {
125                     oldTail.next = node;
126                     return oldTail;
127                 }
128             } else {
129                 initializeSyncQueue();
130             }
131         }
132     }
133 
134     /**
135      * Creates and enqueues node for current thread and given mode.
136      *
137      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
138      * @return the new node
139      */
addWaiter(Node mode)140     private Node addWaiter(Node mode) {
141         Node node = new Node(mode);
142 
143         for (;;) {
144             Node oldTail = tail;
145             if (oldTail != null) {
146                 U.putObject(node, Node.PREV, oldTail);
147                 if (compareAndSetTail(oldTail, node)) {
148                     oldTail.next = node;
149                     return node;
150                 }
151             } else {
152                 initializeSyncQueue();
153             }
154         }
155     }
156 
157     /**
158      * Sets head of queue to be node, thus dequeuing. Called only by
159      * acquire methods.  Also nulls out unused fields for sake of GC
160      * and to suppress unnecessary signals and traversals.
161      *
162      * @param node the node
163      */
setHead(Node node)164     private void setHead(Node node) {
165         head = node;
166         node.thread = null;
167         node.prev = null;
168     }
169 
170     /**
171      * Wakes up node's successor, if one exists.
172      *
173      * @param node the node
174      */
unparkSuccessor(Node node)175     private void unparkSuccessor(Node node) {
176         /*
177          * If status is negative (i.e., possibly needing signal) try
178          * to clear in anticipation of signalling.  It is OK if this
179          * fails or if status is changed by waiting thread.
180          */
181         int ws = node.waitStatus;
182         if (ws < 0)
183             node.compareAndSetWaitStatus(ws, 0);
184 
185         /*
186          * Thread to unpark is held in successor, which is normally
187          * just the next node.  But if cancelled or apparently null,
188          * traverse backwards from tail to find the actual
189          * non-cancelled successor.
190          */
191         Node s = node.next;
192         if (s == null || s.waitStatus > 0) {
193             s = null;
194             for (Node p = tail; p != node && p != null; p = p.prev)
195                 if (p.waitStatus <= 0)
196                     s = p;
197         }
198         if (s != null)
199             LockSupport.unpark(s.thread);
200     }
201 
202     /**
203      * Release action for shared mode -- signals successor and ensures
204      * propagation. (Note: For exclusive mode, release just amounts
205      * to calling unparkSuccessor of head if it needs signal.)
206      */
doReleaseShared()207     private void doReleaseShared() {
208         /*
209          * Ensure that a release propagates, even if there are other
210          * in-progress acquires/releases.  This proceeds in the usual
211          * way of trying to unparkSuccessor of head if it needs
212          * signal. But if it does not, status is set to PROPAGATE to
213          * ensure that upon release, propagation continues.
214          * Additionally, we must loop in case a new node is added
215          * while we are doing this. Also, unlike other uses of
216          * unparkSuccessor, we need to know if CAS to reset status
217          * fails, if so rechecking.
218          */
219         for (;;) {
220             Node h = head;
221             if (h != null && h != tail) {
222                 int ws = h.waitStatus;
223                 if (ws == Node.SIGNAL) {
224                     if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
225                         continue;            // loop to recheck cases
226                     unparkSuccessor(h);
227                 }
228                 else if (ws == 0 &&
229                          !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
230                     continue;                // loop on failed CAS
231             }
232             if (h == head)                   // loop if head changed
233                 break;
234         }
235     }
236 
237     /**
238      * Sets head of queue, and checks if successor may be waiting
239      * in shared mode, if so propagating if either propagate > 0 or
240      * PROPAGATE status was set.
241      *
242      * @param node the node
243      * @param propagate the return value from a tryAcquireShared
244      */
setHeadAndPropagate(Node node, long propagate)245     private void setHeadAndPropagate(Node node, long propagate) {
246         Node h = head; // Record old head for check below
247         setHead(node);
248         /*
249          * Try to signal next queued node if:
250          *   Propagation was indicated by caller,
251          *     or was recorded (as h.waitStatus either before
252          *     or after setHead) by a previous operation
253          *     (note: this uses sign-check of waitStatus because
254          *      PROPAGATE status may transition to SIGNAL.)
255          * and
256          *   The next node is waiting in shared mode,
257          *     or we don't know, because it appears null
258          *
259          * The conservatism in both of these checks may cause
260          * unnecessary wake-ups, but only when there are multiple
261          * racing acquires/releases, so most need signals now or soon
262          * anyway.
263          */
264         if (propagate > 0 || h == null || h.waitStatus < 0 ||
265             (h = head) == null || h.waitStatus < 0) {
266             Node s = node.next;
267             if (s == null || s.isShared())
268                 doReleaseShared();
269         }
270     }
271 
272     // Utilities for various versions of acquire
273 
274     /**
275      * Cancels an ongoing attempt to acquire.
276      *
277      * @param node the node
278      */
cancelAcquire(Node node)279     private void cancelAcquire(Node node) {
280         // Ignore if node doesn't exist
281         if (node == null)
282             return;
283 
284         node.thread = null;
285 
286         // Skip cancelled predecessors
287         Node pred = node.prev;
288         while (pred.waitStatus > 0)
289             node.prev = pred = pred.prev;
290 
291         // predNext is the apparent node to unsplice. CASes below will
292         // fail if not, in which case, we lost race vs another cancel
293         // or signal, so no further action is necessary.
294         Node predNext = pred.next;
295 
296         // Can use unconditional write instead of CAS here.
297         // After this atomic step, other Nodes can skip past us.
298         // Before, we are free of interference from other threads.
299         node.waitStatus = Node.CANCELLED;
300 
301         // If we are the tail, remove ourselves.
302         if (node == tail && compareAndSetTail(node, pred)) {
303             pred.compareAndSetNext(predNext, null);
304         } else {
305             // If successor needs signal, try to set pred's next-link
306             // so it will get one. Otherwise wake it up to propagate.
307             int ws;
308             if (pred != head &&
309                 ((ws = pred.waitStatus) == Node.SIGNAL ||
310                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
311                 pred.thread != null) {
312                 Node next = node.next;
313                 if (next != null && next.waitStatus <= 0)
314                     pred.compareAndSetNext(predNext, next);
315             } else {
316                 unparkSuccessor(node);
317             }
318 
319             node.next = node; // help GC
320         }
321     }
322 
323     /**
324      * Checks and updates status for a node that failed to acquire.
325      * Returns true if thread should block. This is the main signal
326      * control in all acquire loops.  Requires that pred == node.prev.
327      *
328      * @param pred node's predecessor holding status
329      * @param node the node
330      * @return {@code true} if thread should block
331      */
shouldParkAfterFailedAcquire(Node pred, Node node)332     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
333         int ws = pred.waitStatus;
334         if (ws == Node.SIGNAL)
335             /*
336              * This node has already set status asking a release
337              * to signal it, so it can safely park.
338              */
339             return true;
340         if (ws > 0) {
341             /*
342              * Predecessor was cancelled. Skip over predecessors and
343              * indicate retry.
344              */
345             do {
346                 node.prev = pred = pred.prev;
347             } while (pred.waitStatus > 0);
348             pred.next = node;
349         } else {
350             /*
351              * waitStatus must be 0 or PROPAGATE.  Indicate that we
352              * need a signal, but don't park yet.  Caller will need to
353              * retry to make sure it cannot acquire before parking.
354              */
355             pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
356         }
357         return false;
358     }
359 
360     /**
361      * Convenience method to interrupt current thread.
362      */
selfInterrupt()363     static void selfInterrupt() {
364         Thread.currentThread().interrupt();
365     }
366 
367     /**
368      * Convenience method to park and then check if interrupted.
369      *
370      * @return {@code true} if interrupted
371      */
parkAndCheckInterrupt()372     private final boolean parkAndCheckInterrupt() {
373         LockSupport.park(this);
374         return Thread.interrupted();
375     }
376 
377     /*
378      * Various flavors of acquire, varying in exclusive/shared and
379      * control modes.  Each is mostly the same, but annoyingly
380      * different.  Only a little bit of factoring is possible due to
381      * interactions of exception mechanics (including ensuring that we
382      * cancel if tryAcquire throws exception) and other control, at
383      * least not without hurting performance too much.
384      */
385 
386     /**
387      * Acquires in exclusive uninterruptible mode for thread already in
388      * queue. Used by condition wait methods as well as acquire.
389      *
390      * @param node the node
391      * @param arg the acquire argument
392      * @return {@code true} if interrupted while waiting
393      */
acquireQueued(final Node node, long arg)394     final boolean acquireQueued(final Node node, long arg) {
395         try {
396             boolean interrupted = false;
397             for (;;) {
398                 final Node p = node.predecessor();
399                 if (p == head && tryAcquire(arg)) {
400                     setHead(node);
401                     p.next = null; // help GC
402                     return interrupted;
403                 }
404                 if (shouldParkAfterFailedAcquire(p, node) &&
405                     parkAndCheckInterrupt())
406                     interrupted = true;
407             }
408         } catch (Throwable t) {
409             cancelAcquire(node);
410             throw t;
411         }
412     }
413 
414     /**
415      * Acquires in exclusive interruptible mode.
416      * @param arg the acquire argument
417      */
doAcquireInterruptibly(long arg)418     private void doAcquireInterruptibly(long arg)
419         throws InterruptedException {
420         final Node node = addWaiter(Node.EXCLUSIVE);
421         try {
422             for (;;) {
423                 final Node p = node.predecessor();
424                 if (p == head && tryAcquire(arg)) {
425                     setHead(node);
426                     p.next = null; // help GC
427                     return;
428                 }
429                 if (shouldParkAfterFailedAcquire(p, node) &&
430                     parkAndCheckInterrupt())
431                     throw new InterruptedException();
432             }
433         } catch (Throwable t) {
434             cancelAcquire(node);
435             throw t;
436         }
437     }
438 
439     /**
440      * Acquires in exclusive timed mode.
441      *
442      * @param arg the acquire argument
443      * @param nanosTimeout max wait time
444      * @return {@code true} if acquired
445      */
doAcquireNanos(long arg, long nanosTimeout)446     private boolean doAcquireNanos(long arg, long nanosTimeout)
447             throws InterruptedException {
448         if (nanosTimeout <= 0L)
449             return false;
450         final long deadline = System.nanoTime() + nanosTimeout;
451         final Node node = addWaiter(Node.EXCLUSIVE);
452         try {
453             for (;;) {
454                 final Node p = node.predecessor();
455                 if (p == head && tryAcquire(arg)) {
456                     setHead(node);
457                     p.next = null; // help GC
458                     return true;
459                 }
460                 nanosTimeout = deadline - System.nanoTime();
461                 if (nanosTimeout <= 0L) {
462                     cancelAcquire(node);
463                     return false;
464                 }
465                 if (shouldParkAfterFailedAcquire(p, node) &&
466                     nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
467                     LockSupport.parkNanos(this, nanosTimeout);
468                 if (Thread.interrupted())
469                     throw new InterruptedException();
470             }
471         } catch (Throwable t) {
472             cancelAcquire(node);
473             throw t;
474         }
475     }
476 
477     /**
478      * Acquires in shared uninterruptible mode.
479      * @param arg the acquire argument
480      */
doAcquireShared(long arg)481     private void doAcquireShared(long arg) {
482         final Node node = addWaiter(Node.SHARED);
483         try {
484             boolean interrupted = false;
485             for (;;) {
486                 final Node p = node.predecessor();
487                 if (p == head) {
488                     long r = tryAcquireShared(arg);
489                     if (r >= 0) {
490                         setHeadAndPropagate(node, r);
491                         p.next = null; // help GC
492                         if (interrupted)
493                             selfInterrupt();
494                         return;
495                     }
496                 }
497                 if (shouldParkAfterFailedAcquire(p, node) &&
498                     parkAndCheckInterrupt())
499                     interrupted = true;
500             }
501         } catch (Throwable t) {
502             cancelAcquire(node);
503             throw t;
504         }
505     }
506 
507     /**
508      * Acquires in shared interruptible mode.
509      * @param arg the acquire argument
510      */
doAcquireSharedInterruptibly(long arg)511     private void doAcquireSharedInterruptibly(long arg)
512         throws InterruptedException {
513         final Node node = addWaiter(Node.SHARED);
514         try {
515             for (;;) {
516                 final Node p = node.predecessor();
517                 if (p == head) {
518                     long r = tryAcquireShared(arg);
519                     if (r >= 0) {
520                         setHeadAndPropagate(node, r);
521                         p.next = null; // help GC
522                         return;
523                     }
524                 }
525                 if (shouldParkAfterFailedAcquire(p, node) &&
526                     parkAndCheckInterrupt())
527                     throw new InterruptedException();
528             }
529         } catch (Throwable t) {
530             cancelAcquire(node);
531             throw t;
532         }
533     }
534 
535     /**
536      * Acquires in shared timed mode.
537      *
538      * @param arg the acquire argument
539      * @param nanosTimeout max wait time
540      * @return {@code true} if acquired
541      */
doAcquireSharedNanos(long arg, long nanosTimeout)542     private boolean doAcquireSharedNanos(long arg, long nanosTimeout)
543             throws InterruptedException {
544         if (nanosTimeout <= 0L)
545             return false;
546         final long deadline = System.nanoTime() + nanosTimeout;
547         final Node node = addWaiter(Node.SHARED);
548         try {
549             for (;;) {
550                 final Node p = node.predecessor();
551                 if (p == head) {
552                     long r = tryAcquireShared(arg);
553                     if (r >= 0) {
554                         setHeadAndPropagate(node, r);
555                         p.next = null; // help GC
556                         return true;
557                     }
558                 }
559                 nanosTimeout = deadline - System.nanoTime();
560                 if (nanosTimeout <= 0L) {
561                     cancelAcquire(node);
562                     return false;
563                 }
564                 if (shouldParkAfterFailedAcquire(p, node) &&
565                     nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
566                     LockSupport.parkNanos(this, nanosTimeout);
567                 if (Thread.interrupted())
568                     throw new InterruptedException();
569             }
570         } catch (Throwable t) {
571             cancelAcquire(node);
572             throw t;
573         }
574     }
575 
576     // Main exported methods
577 
578     /**
579      * Attempts to acquire in exclusive mode. This method should query
580      * if the state of the object permits it to be acquired in the
581      * exclusive mode, and if so to acquire it.
582      *
583      * <p>This method is always invoked by the thread performing
584      * acquire.  If this method reports failure, the acquire method
585      * may queue the thread, if it is not already queued, until it is
586      * signalled by a release from some other thread. This can be used
587      * to implement method {@link Lock#tryLock()}.
588      *
589      * <p>The default
590      * implementation throws {@link UnsupportedOperationException}.
591      *
592      * @param arg the acquire argument. This value is always the one
593      *        passed to an acquire method, or is the value saved on entry
594      *        to a condition wait.  The value is otherwise uninterpreted
595      *        and can represent anything you like.
596      * @return {@code true} if successful. Upon success, this object has
597      *         been acquired.
598      * @throws IllegalMonitorStateException if acquiring would place this
599      *         synchronizer in an illegal state. This exception must be
600      *         thrown in a consistent fashion for synchronization to work
601      *         correctly.
602      * @throws UnsupportedOperationException if exclusive mode is not supported
603      */
tryAcquire(long arg)604     protected boolean tryAcquire(long arg) {
605         throw new UnsupportedOperationException();
606     }
607 
608     /**
609      * Attempts to set the state to reflect a release in exclusive
610      * mode.
611      *
612      * <p>This method is always invoked by the thread performing release.
613      *
614      * <p>The default implementation throws
615      * {@link UnsupportedOperationException}.
616      *
617      * @param arg the release argument. This value is always the one
618      *        passed to a release method, or the current state value upon
619      *        entry to a condition wait.  The value is otherwise
620      *        uninterpreted and can represent anything you like.
621      * @return {@code true} if this object is now in a fully released
622      *         state, so that any waiting threads may attempt to acquire;
623      *         and {@code false} otherwise.
624      * @throws IllegalMonitorStateException if releasing would place this
625      *         synchronizer in an illegal state. This exception must be
626      *         thrown in a consistent fashion for synchronization to work
627      *         correctly.
628      * @throws UnsupportedOperationException if exclusive mode is not supported
629      */
tryRelease(long arg)630     protected boolean tryRelease(long arg) {
631         throw new UnsupportedOperationException();
632     }
633 
634     /**
635      * Attempts to acquire in shared mode. This method should query if
636      * the state of the object permits it to be acquired in the shared
637      * mode, and if so to acquire it.
638      *
639      * <p>This method is always invoked by the thread performing
640      * acquire.  If this method reports failure, the acquire method
641      * may queue the thread, if it is not already queued, until it is
642      * signalled by a release from some other thread.
643      *
644      * <p>The default implementation throws {@link
645      * UnsupportedOperationException}.
646      *
647      * @param arg the acquire argument. This value is always the one
648      *        passed to an acquire method, or is the value saved on entry
649      *        to a condition wait.  The value is otherwise uninterpreted
650      *        and can represent anything you like.
651      * @return a negative value on failure; zero if acquisition in shared
652      *         mode succeeded but no subsequent shared-mode acquire can
653      *         succeed; and a positive value if acquisition in shared
654      *         mode succeeded and subsequent shared-mode acquires might
655      *         also succeed, in which case a subsequent waiting thread
656      *         must check availability. (Support for three different
657      *         return values enables this method to be used in contexts
658      *         where acquires only sometimes act exclusively.)  Upon
659      *         success, this object has been acquired.
660      * @throws IllegalMonitorStateException if acquiring would place this
661      *         synchronizer in an illegal state. This exception must be
662      *         thrown in a consistent fashion for synchronization to work
663      *         correctly.
664      * @throws UnsupportedOperationException if shared mode is not supported
665      */
tryAcquireShared(long arg)666     protected long tryAcquireShared(long arg) {
667         throw new UnsupportedOperationException();
668     }
669 
670     /**
671      * Attempts to set the state to reflect a release in shared mode.
672      *
673      * <p>This method is always invoked by the thread performing release.
674      *
675      * <p>The default implementation throws
676      * {@link UnsupportedOperationException}.
677      *
678      * @param arg the release argument. This value is always the one
679      *        passed to a release method, or the current state value upon
680      *        entry to a condition wait.  The value is otherwise
681      *        uninterpreted and can represent anything you like.
682      * @return {@code true} if this release of shared mode may permit a
683      *         waiting acquire (shared or exclusive) to succeed; and
684      *         {@code false} otherwise
685      * @throws IllegalMonitorStateException if releasing would place this
686      *         synchronizer in an illegal state. This exception must be
687      *         thrown in a consistent fashion for synchronization to work
688      *         correctly.
689      * @throws UnsupportedOperationException if shared mode is not supported
690      */
tryReleaseShared(long arg)691     protected boolean tryReleaseShared(long arg) {
692         throw new UnsupportedOperationException();
693     }
694 
695     /**
696      * Returns {@code true} if synchronization is held exclusively with
697      * respect to the current (calling) thread.  This method is invoked
698      * upon each call to a non-waiting {@link ConditionObject} method.
699      * (Waiting methods instead invoke {@link #release}.)
700      *
701      * <p>The default implementation throws {@link
702      * UnsupportedOperationException}. This method is invoked
703      * internally only within {@link ConditionObject} methods, so need
704      * not be defined if conditions are not used.
705      *
706      * @return {@code true} if synchronization is held exclusively;
707      *         {@code false} otherwise
708      * @throws UnsupportedOperationException if conditions are not supported
709      */
isHeldExclusively()710     protected boolean isHeldExclusively() {
711         throw new UnsupportedOperationException();
712     }
713 
714     /**
715      * Acquires in exclusive mode, ignoring interrupts.  Implemented
716      * by invoking at least once {@link #tryAcquire},
717      * returning on success.  Otherwise the thread is queued, possibly
718      * repeatedly blocking and unblocking, invoking {@link
719      * #tryAcquire} until success.  This method can be used
720      * to implement method {@link Lock#lock}.
721      *
722      * @param arg the acquire argument.  This value is conveyed to
723      *        {@link #tryAcquire} but is otherwise uninterpreted and
724      *        can represent anything you like.
725      */
acquire(long arg)726     public final void acquire(long arg) {
727         if (!tryAcquire(arg) &&
728             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
729             selfInterrupt();
730     }
731 
732     /**
733      * Acquires in exclusive mode, aborting if interrupted.
734      * Implemented by first checking interrupt status, then invoking
735      * at least once {@link #tryAcquire}, returning on
736      * success.  Otherwise the thread is queued, possibly repeatedly
737      * blocking and unblocking, invoking {@link #tryAcquire}
738      * until success or the thread is interrupted.  This method can be
739      * used to implement method {@link Lock#lockInterruptibly}.
740      *
741      * @param arg the acquire argument.  This value is conveyed to
742      *        {@link #tryAcquire} but is otherwise uninterpreted and
743      *        can represent anything you like.
744      * @throws InterruptedException if the current thread is interrupted
745      */
acquireInterruptibly(long arg)746     public final void acquireInterruptibly(long arg)
747             throws InterruptedException {
748         if (Thread.interrupted())
749             throw new InterruptedException();
750         if (!tryAcquire(arg))
751             doAcquireInterruptibly(arg);
752     }
753 
754     /**
755      * Attempts to acquire in exclusive mode, aborting if interrupted,
756      * and failing if the given timeout elapses.  Implemented by first
757      * checking interrupt status, then invoking at least once {@link
758      * #tryAcquire}, returning on success.  Otherwise, the thread is
759      * queued, possibly repeatedly blocking and unblocking, invoking
760      * {@link #tryAcquire} until success or the thread is interrupted
761      * or the timeout elapses.  This method can be used to implement
762      * method {@link Lock#tryLock(long, TimeUnit)}.
763      *
764      * @param arg the acquire argument.  This value is conveyed to
765      *        {@link #tryAcquire} but is otherwise uninterpreted and
766      *        can represent anything you like.
767      * @param nanosTimeout the maximum number of nanoseconds to wait
768      * @return {@code true} if acquired; {@code false} if timed out
769      * @throws InterruptedException if the current thread is interrupted
770      */
tryAcquireNanos(long arg, long nanosTimeout)771     public final boolean tryAcquireNanos(long arg, long nanosTimeout)
772             throws InterruptedException {
773         if (Thread.interrupted())
774             throw new InterruptedException();
775         return tryAcquire(arg) ||
776             doAcquireNanos(arg, nanosTimeout);
777     }
778 
779     /**
780      * Releases in exclusive mode.  Implemented by unblocking one or
781      * more threads if {@link #tryRelease} returns true.
782      * This method can be used to implement method {@link Lock#unlock}.
783      *
784      * @param arg the release argument.  This value is conveyed to
785      *        {@link #tryRelease} but is otherwise uninterpreted and
786      *        can represent anything you like.
787      * @return the value returned from {@link #tryRelease}
788      */
release(long arg)789     public final boolean release(long arg) {
790         if (tryRelease(arg)) {
791             Node h = head;
792             if (h != null && h.waitStatus != 0)
793                 unparkSuccessor(h);
794             return true;
795         }
796         return false;
797     }
798 
799     /**
800      * Acquires in shared mode, ignoring interrupts.  Implemented by
801      * first invoking at least once {@link #tryAcquireShared},
802      * returning on success.  Otherwise the thread is queued, possibly
803      * repeatedly blocking and unblocking, invoking {@link
804      * #tryAcquireShared} until success.
805      *
806      * @param arg the acquire argument.  This value is conveyed to
807      *        {@link #tryAcquireShared} but is otherwise uninterpreted
808      *        and can represent anything you like.
809      */
acquireShared(long arg)810     public final void acquireShared(long arg) {
811         if (tryAcquireShared(arg) < 0)
812             doAcquireShared(arg);
813     }
814 
815     /**
816      * Acquires in shared mode, aborting if interrupted.  Implemented
817      * by first checking interrupt status, then invoking at least once
818      * {@link #tryAcquireShared}, returning on success.  Otherwise the
819      * thread is queued, possibly repeatedly blocking and unblocking,
820      * invoking {@link #tryAcquireShared} until success or the thread
821      * is interrupted.
822      * @param arg the acquire argument.
823      * This value is conveyed to {@link #tryAcquireShared} but is
824      * otherwise uninterpreted and can represent anything
825      * you like.
826      * @throws InterruptedException if the current thread is interrupted
827      */
acquireSharedInterruptibly(long arg)828     public final void acquireSharedInterruptibly(long arg)
829             throws InterruptedException {
830         if (Thread.interrupted())
831             throw new InterruptedException();
832         if (tryAcquireShared(arg) < 0)
833             doAcquireSharedInterruptibly(arg);
834     }
835 
836     /**
837      * Attempts to acquire in shared mode, aborting if interrupted, and
838      * failing if the given timeout elapses.  Implemented by first
839      * checking interrupt status, then invoking at least once {@link
840      * #tryAcquireShared}, returning on success.  Otherwise, the
841      * thread is queued, possibly repeatedly blocking and unblocking,
842      * invoking {@link #tryAcquireShared} until success or the thread
843      * is interrupted or the timeout elapses.
844      *
845      * @param arg the acquire argument.  This value is conveyed to
846      *        {@link #tryAcquireShared} but is otherwise uninterpreted
847      *        and can represent anything you like.
848      * @param nanosTimeout the maximum number of nanoseconds to wait
849      * @return {@code true} if acquired; {@code false} if timed out
850      * @throws InterruptedException if the current thread is interrupted
851      */
tryAcquireSharedNanos(long arg, long nanosTimeout)852     public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
853             throws InterruptedException {
854         if (Thread.interrupted())
855             throw new InterruptedException();
856         return tryAcquireShared(arg) >= 0 ||
857             doAcquireSharedNanos(arg, nanosTimeout);
858     }
859 
860     /**
861      * Releases in shared mode.  Implemented by unblocking one or more
862      * threads if {@link #tryReleaseShared} returns true.
863      *
864      * @param arg the release argument.  This value is conveyed to
865      *        {@link #tryReleaseShared} but is otherwise uninterpreted
866      *        and can represent anything you like.
867      * @return the value returned from {@link #tryReleaseShared}
868      */
releaseShared(long arg)869     public final boolean releaseShared(long arg) {
870         if (tryReleaseShared(arg)) {
871             doReleaseShared();
872             return true;
873         }
874         return false;
875     }
876 
877     // Queue inspection methods
878 
879     /**
880      * Queries whether any threads are waiting to acquire. Note that
881      * because cancellations due to interrupts and timeouts may occur
882      * at any time, a {@code true} return does not guarantee that any
883      * other thread will ever acquire.
884      *
885      * <p>In this implementation, this operation returns in
886      * constant time.
887      *
888      * @return {@code true} if there may be other threads waiting to acquire
889      */
hasQueuedThreads()890     public final boolean hasQueuedThreads() {
891         return head != tail;
892     }
893 
894     /**
895      * Queries whether any threads have ever contended to acquire this
896      * synchronizer; that is, if an acquire method has ever blocked.
897      *
898      * <p>In this implementation, this operation returns in
899      * constant time.
900      *
901      * @return {@code true} if there has ever been contention
902      */
hasContended()903     public final boolean hasContended() {
904         return head != null;
905     }
906 
907     /**
908      * Returns the first (longest-waiting) thread in the queue, or
909      * {@code null} if no threads are currently queued.
910      *
911      * <p>In this implementation, this operation normally returns in
912      * constant time, but may iterate upon contention if other threads are
913      * concurrently modifying the queue.
914      *
915      * @return the first (longest-waiting) thread in the queue, or
916      *         {@code null} if no threads are currently queued
917      */
getFirstQueuedThread()918     public final Thread getFirstQueuedThread() {
919         // handle only fast path, else relay
920         return (head == tail) ? null : fullGetFirstQueuedThread();
921     }
922 
923     /**
924      * Version of getFirstQueuedThread called when fastpath fails.
925      */
fullGetFirstQueuedThread()926     private Thread fullGetFirstQueuedThread() {
927         /*
928          * The first node is normally head.next. Try to get its
929          * thread field, ensuring consistent reads: If thread
930          * field is nulled out or s.prev is no longer head, then
931          * some other thread(s) concurrently performed setHead in
932          * between some of our reads. We try this twice before
933          * resorting to traversal.
934          */
935         Node h, s;
936         Thread st;
937         if (((h = head) != null && (s = h.next) != null &&
938              s.prev == head && (st = s.thread) != null) ||
939             ((h = head) != null && (s = h.next) != null &&
940              s.prev == head && (st = s.thread) != null))
941             return st;
942 
943         /*
944          * Head's next field might not have been set yet, or may have
945          * been unset after setHead. So we must check to see if tail
946          * is actually first node. If not, we continue on, safely
947          * traversing from tail back to head to find first,
948          * guaranteeing termination.
949          */
950 
951         Thread firstThread = null;
952         for (Node p = tail; p != null && p != head; p = p.prev) {
953             Thread t = p.thread;
954             if (t != null)
955                 firstThread = t;
956         }
957         return firstThread;
958     }
959 
960     /**
961      * Returns true if the given thread is currently queued.
962      *
963      * <p>This implementation traverses the queue to determine
964      * presence of the given thread.
965      *
966      * @param thread the thread
967      * @return {@code true} if the given thread is on the queue
968      * @throws NullPointerException if the thread is null
969      */
isQueued(Thread thread)970     public final boolean isQueued(Thread thread) {
971         if (thread == null)
972             throw new NullPointerException();
973         for (Node p = tail; p != null; p = p.prev)
974             if (p.thread == thread)
975                 return true;
976         return false;
977     }
978 
979     /**
980      * Returns {@code true} if the apparent first queued thread, if one
981      * exists, is waiting in exclusive mode.  If this method returns
982      * {@code true}, and the current thread is attempting to acquire in
983      * shared mode (that is, this method is invoked from {@link
984      * #tryAcquireShared}) then it is guaranteed that the current thread
985      * is not the first queued thread.  Used only as a heuristic in
986      * ReentrantReadWriteLock.
987      */
apparentlyFirstQueuedIsExclusive()988     final boolean apparentlyFirstQueuedIsExclusive() {
989         Node h, s;
990         return (h = head) != null &&
991             (s = h.next)  != null &&
992             !s.isShared()         &&
993             s.thread != null;
994     }
995 
996     /**
997      * Queries whether any threads have been waiting to acquire longer
998      * than the current thread.
999      *
1000      * <p>An invocation of this method is equivalent to (but may be
1001      * more efficient than):
1002      * <pre> {@code
1003      * getFirstQueuedThread() != Thread.currentThread()
1004      *   && hasQueuedThreads()}</pre>
1005      *
1006      * <p>Note that because cancellations due to interrupts and
1007      * timeouts may occur at any time, a {@code true} return does not
1008      * guarantee that some other thread will acquire before the current
1009      * thread.  Likewise, it is possible for another thread to win a
1010      * race to enqueue after this method has returned {@code false},
1011      * due to the queue being empty.
1012      *
1013      * <p>This method is designed to be used by a fair synchronizer to
1014      * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
1015      * Such a synchronizer's {@link #tryAcquire} method should return
1016      * {@code false}, and its {@link #tryAcquireShared} method should
1017      * return a negative value, if this method returns {@code true}
1018      * (unless this is a reentrant acquire).  For example, the {@code
1019      * tryAcquire} method for a fair, reentrant, exclusive mode
1020      * synchronizer might look like this:
1021      *
1022      * <pre> {@code
1023      * protected boolean tryAcquire(int arg) {
1024      *   if (isHeldExclusively()) {
1025      *     // A reentrant acquire; increment hold count
1026      *     return true;
1027      *   } else if (hasQueuedPredecessors()) {
1028      *     return false;
1029      *   } else {
1030      *     // try to acquire normally
1031      *   }
1032      * }}</pre>
1033      *
1034      * @return {@code true} if there is a queued thread preceding the
1035      *         current thread, and {@code false} if the current thread
1036      *         is at the head of the queue or the queue is empty
1037      * @since 1.7
1038      */
hasQueuedPredecessors()1039     public final boolean hasQueuedPredecessors() {
1040         // The correctness of this depends on head being initialized
1041         // before tail and on head.next being accurate if the current
1042         // thread is first in queue.
1043         Node t = tail; // Read fields in reverse initialization order
1044         Node h = head;
1045         Node s;
1046         return h != t &&
1047             ((s = h.next) == null || s.thread != Thread.currentThread());
1048     }
1049 
1050 
1051     // Instrumentation and monitoring methods
1052 
1053     /**
1054      * Returns an estimate of the number of threads waiting to
1055      * acquire.  The value is only an estimate because the number of
1056      * threads may change dynamically while this method traverses
1057      * internal data structures.  This method is designed for use in
1058      * monitoring system state, not for synchronization control.
1059      *
1060      * @return the estimated number of threads waiting to acquire
1061      */
getQueueLength()1062     public final int getQueueLength() {
1063         int n = 0;
1064         for (Node p = tail; p != null; p = p.prev) {
1065             if (p.thread != null)
1066                 ++n;
1067         }
1068         return n;
1069     }
1070 
1071     /**
1072      * Returns a collection containing threads that may be waiting to
1073      * acquire.  Because the actual set of threads may change
1074      * dynamically while constructing this result, the returned
1075      * collection is only a best-effort estimate.  The elements of the
1076      * returned collection are in no particular order.  This method is
1077      * designed to facilitate construction of subclasses that provide
1078      * more extensive monitoring facilities.
1079      *
1080      * @return the collection of threads
1081      */
getQueuedThreads()1082     public final Collection<Thread> getQueuedThreads() {
1083         ArrayList<Thread> list = new ArrayList<>();
1084         for (Node p = tail; p != null; p = p.prev) {
1085             Thread t = p.thread;
1086             if (t != null)
1087                 list.add(t);
1088         }
1089         return list;
1090     }
1091 
1092     /**
1093      * Returns a collection containing threads that may be waiting to
1094      * acquire in exclusive mode. This has the same properties
1095      * as {@link #getQueuedThreads} except that it only returns
1096      * those threads waiting due to an exclusive acquire.
1097      *
1098      * @return the collection of threads
1099      */
getExclusiveQueuedThreads()1100     public final Collection<Thread> getExclusiveQueuedThreads() {
1101         ArrayList<Thread> list = new ArrayList<>();
1102         for (Node p = tail; p != null; p = p.prev) {
1103             if (!p.isShared()) {
1104                 Thread t = p.thread;
1105                 if (t != null)
1106                     list.add(t);
1107             }
1108         }
1109         return list;
1110     }
1111 
1112     /**
1113      * Returns a collection containing threads that may be waiting to
1114      * acquire in shared mode. This has the same properties
1115      * as {@link #getQueuedThreads} except that it only returns
1116      * those threads waiting due to a shared acquire.
1117      *
1118      * @return the collection of threads
1119      */
getSharedQueuedThreads()1120     public final Collection<Thread> getSharedQueuedThreads() {
1121         ArrayList<Thread> list = new ArrayList<>();
1122         for (Node p = tail; p != null; p = p.prev) {
1123             if (p.isShared()) {
1124                 Thread t = p.thread;
1125                 if (t != null)
1126                     list.add(t);
1127             }
1128         }
1129         return list;
1130     }
1131 
1132     /**
1133      * Returns a string identifying this synchronizer, as well as its state.
1134      * The state, in brackets, includes the String {@code "State ="}
1135      * followed by the current value of {@link #getState}, and either
1136      * {@code "nonempty"} or {@code "empty"} depending on whether the
1137      * queue is empty.
1138      *
1139      * @return a string identifying this synchronizer, as well as its state
1140      */
toString()1141     public String toString() {
1142         return super.toString()
1143             + "[State = " + getState() + ", "
1144             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
1145     }
1146 
1147 
1148     // Internal support methods for Conditions
1149 
1150     /**
1151      * Returns true if a node, always one that was initially placed on
1152      * a condition queue, is now waiting to reacquire on sync queue.
1153      * @param node the node
1154      * @return true if is reacquiring
1155      */
isOnSyncQueue(Node node)1156     final boolean isOnSyncQueue(Node node) {
1157         if (node.waitStatus == Node.CONDITION || node.prev == null)
1158             return false;
1159         if (node.next != null) // If has successor, it must be on queue
1160             return true;
1161         /*
1162          * node.prev can be non-null, but not yet on queue because
1163          * the CAS to place it on queue can fail. So we have to
1164          * traverse from tail to make sure it actually made it.  It
1165          * will always be near the tail in calls to this method, and
1166          * unless the CAS failed (which is unlikely), it will be
1167          * there, so we hardly ever traverse much.
1168          */
1169         return findNodeFromTail(node);
1170     }
1171 
1172     /**
1173      * Returns true if node is on sync queue by searching backwards from tail.
1174      * Called only when needed by isOnSyncQueue.
1175      * @return true if present
1176      */
findNodeFromTail(Node node)1177     private boolean findNodeFromTail(Node node) {
1178         // We check for node first, since it's likely to be at or near tail.
1179         // tail is known to be non-null, so we could re-order to "save"
1180         // one null check, but we leave it this way to help the VM.
1181         for (Node p = tail;;) {
1182             if (p == node)
1183                 return true;
1184             if (p == null)
1185                 return false;
1186             p = p.prev;
1187         }
1188     }
1189 
1190     /**
1191      * Transfers a node from a condition queue onto sync queue.
1192      * Returns true if successful.
1193      * @param node the node
1194      * @return true if successfully transferred (else the node was
1195      * cancelled before signal)
1196      */
transferForSignal(Node node)1197     final boolean transferForSignal(Node node) {
1198         /*
1199          * If cannot change waitStatus, the node has been cancelled.
1200          */
1201         if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
1202             return false;
1203 
1204         /*
1205          * Splice onto queue and try to set waitStatus of predecessor to
1206          * indicate that thread is (probably) waiting. If cancelled or
1207          * attempt to set waitStatus fails, wake up to resync (in which
1208          * case the waitStatus can be transiently and harmlessly wrong).
1209          */
1210         Node p = enq(node);
1211         int ws = p.waitStatus;
1212         if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
1213             LockSupport.unpark(node.thread);
1214         return true;
1215     }
1216 
1217     /**
1218      * Transfers node, if necessary, to sync queue after a cancelled wait.
1219      * Returns true if thread was cancelled before being signalled.
1220      *
1221      * @param node the node
1222      * @return true if cancelled before the node was signalled
1223      */
transferAfterCancelledWait(Node node)1224     final boolean transferAfterCancelledWait(Node node) {
1225         if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
1226             enq(node);
1227             return true;
1228         }
1229         /*
1230          * If we lost out to a signal(), then we can't proceed
1231          * until it finishes its enq().  Cancelling during an
1232          * incomplete transfer is both rare and transient, so just
1233          * spin.
1234          */
1235         while (!isOnSyncQueue(node))
1236             Thread.yield();
1237         return false;
1238     }
1239 
1240     /**
1241      * Invokes release with current state value; returns saved state.
1242      * Cancels node and throws exception on failure.
1243      * @param node the condition node for this wait
1244      * @return previous sync state
1245      */
fullyRelease(Node node)1246     final long fullyRelease(Node node) {
1247         try {
1248             long savedState = getState();
1249             if (release(savedState))
1250                 return savedState;
1251             throw new IllegalMonitorStateException();
1252         } catch (Throwable t) {
1253             node.waitStatus = Node.CANCELLED;
1254             throw t;
1255         }
1256     }
1257 
1258     // Instrumentation methods for conditions
1259 
1260     /**
1261      * Queries whether the given ConditionObject
1262      * uses this synchronizer as its lock.
1263      *
1264      * @param condition the condition
1265      * @return {@code true} if owned
1266      * @throws NullPointerException if the condition is null
1267      */
owns(ConditionObject condition)1268     public final boolean owns(ConditionObject condition) {
1269         return condition.isOwnedBy(this);
1270     }
1271 
1272     /**
1273      * Queries whether any threads are waiting on the given condition
1274      * associated with this synchronizer. Note that because timeouts
1275      * and interrupts may occur at any time, a {@code true} return
1276      * does not guarantee that a future {@code signal} will awaken
1277      * any threads.  This method is designed primarily for use in
1278      * monitoring of the system state.
1279      *
1280      * @param condition the condition
1281      * @return {@code true} if there are any waiting threads
1282      * @throws IllegalMonitorStateException if exclusive synchronization
1283      *         is not held
1284      * @throws IllegalArgumentException if the given condition is
1285      *         not associated with this synchronizer
1286      * @throws NullPointerException if the condition is null
1287      */
hasWaiters(ConditionObject condition)1288     public final boolean hasWaiters(ConditionObject condition) {
1289         if (!owns(condition))
1290             throw new IllegalArgumentException("Not owner");
1291         return condition.hasWaiters();
1292     }
1293 
1294     /**
1295      * Returns an estimate of the number of threads waiting on the
1296      * given condition associated with this synchronizer. Note that
1297      * because timeouts and interrupts may occur at any time, the
1298      * estimate serves only as an upper bound on the actual number of
1299      * waiters.  This method is designed for use in monitoring system
1300      * state, not for synchronization control.
1301      *
1302      * @param condition the condition
1303      * @return the estimated number of waiting threads
1304      * @throws IllegalMonitorStateException if exclusive synchronization
1305      *         is not held
1306      * @throws IllegalArgumentException if the given condition is
1307      *         not associated with this synchronizer
1308      * @throws NullPointerException if the condition is null
1309      */
getWaitQueueLength(ConditionObject condition)1310     public final int getWaitQueueLength(ConditionObject condition) {
1311         if (!owns(condition))
1312             throw new IllegalArgumentException("Not owner");
1313         return condition.getWaitQueueLength();
1314     }
1315 
1316     /**
1317      * Returns a collection containing those threads that may be
1318      * waiting on the given condition associated with this
1319      * synchronizer.  Because the actual set of threads may change
1320      * dynamically while constructing this result, the returned
1321      * collection is only a best-effort estimate. The elements of the
1322      * returned collection are in no particular order.
1323      *
1324      * @param condition the condition
1325      * @return the collection of threads
1326      * @throws IllegalMonitorStateException if exclusive synchronization
1327      *         is not held
1328      * @throws IllegalArgumentException if the given condition is
1329      *         not associated with this synchronizer
1330      * @throws NullPointerException if the condition is null
1331      */
getWaitingThreads(ConditionObject condition)1332     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1333         if (!owns(condition))
1334             throw new IllegalArgumentException("Not owner");
1335         return condition.getWaitingThreads();
1336     }
1337 
1338     /**
1339      * Condition implementation for a {@link
1340      * AbstractQueuedLongSynchronizer} serving as the basis of a {@link
1341      * Lock} implementation.
1342      *
1343      * <p>Method documentation for this class describes mechanics,
1344      * not behavioral specifications from the point of view of Lock
1345      * and Condition users. Exported versions of this class will in
1346      * general need to be accompanied by documentation describing
1347      * condition semantics that rely on those of the associated
1348      * {@code AbstractQueuedLongSynchronizer}.
1349      *
1350      * <p>This class is Serializable, but all fields are transient,
1351      * so deserialized conditions have no waiters.
1352      *
1353      * @since 1.6
1354      */
1355     public class ConditionObject implements Condition, java.io.Serializable {
1356         private static final long serialVersionUID = 1173984872572414699L;
1357         /** First node of condition queue. */
1358         private transient Node firstWaiter;
1359         /** Last node of condition queue. */
1360         private transient Node lastWaiter;
1361 
1362         /**
1363          * Creates a new {@code ConditionObject} instance.
1364          */
ConditionObject()1365         public ConditionObject() { }
1366 
1367         // Internal methods
1368 
1369         /**
1370          * Adds a new waiter to wait queue.
1371          * @return its new wait node
1372          */
addConditionWaiter()1373         private Node addConditionWaiter() {
1374             Node t = lastWaiter;
1375             // If lastWaiter is cancelled, clean out.
1376             if (t != null && t.waitStatus != Node.CONDITION) {
1377                 unlinkCancelledWaiters();
1378                 t = lastWaiter;
1379             }
1380 
1381             Node node = new Node(Node.CONDITION);
1382 
1383             if (t == null)
1384                 firstWaiter = node;
1385             else
1386                 t.nextWaiter = node;
1387             lastWaiter = node;
1388             return node;
1389         }
1390 
1391         /**
1392          * Removes and transfers nodes until hit non-cancelled one or
1393          * null. Split out from signal in part to encourage compilers
1394          * to inline the case of no waiters.
1395          * @param first (non-null) the first node on condition queue
1396          */
doSignal(Node first)1397         private void doSignal(Node first) {
1398             do {
1399                 if ( (firstWaiter = first.nextWaiter) == null)
1400                     lastWaiter = null;
1401                 first.nextWaiter = null;
1402             } while (!transferForSignal(first) &&
1403                      (first = firstWaiter) != null);
1404         }
1405 
1406         /**
1407          * Removes and transfers all nodes.
1408          * @param first (non-null) the first node on condition queue
1409          */
doSignalAll(Node first)1410         private void doSignalAll(Node first) {
1411             lastWaiter = firstWaiter = null;
1412             do {
1413                 Node next = first.nextWaiter;
1414                 first.nextWaiter = null;
1415                 transferForSignal(first);
1416                 first = next;
1417             } while (first != null);
1418         }
1419 
1420         /**
1421          * Unlinks cancelled waiter nodes from condition queue.
1422          * Called only while holding lock. This is called when
1423          * cancellation occurred during condition wait, and upon
1424          * insertion of a new waiter when lastWaiter is seen to have
1425          * been cancelled. This method is needed to avoid garbage
1426          * retention in the absence of signals. So even though it may
1427          * require a full traversal, it comes into play only when
1428          * timeouts or cancellations occur in the absence of
1429          * signals. It traverses all nodes rather than stopping at a
1430          * particular target to unlink all pointers to garbage nodes
1431          * without requiring many re-traversals during cancellation
1432          * storms.
1433          */
unlinkCancelledWaiters()1434         private void unlinkCancelledWaiters() {
1435             Node t = firstWaiter;
1436             Node trail = null;
1437             while (t != null) {
1438                 Node next = t.nextWaiter;
1439                 if (t.waitStatus != Node.CONDITION) {
1440                     t.nextWaiter = null;
1441                     if (trail == null)
1442                         firstWaiter = next;
1443                     else
1444                         trail.nextWaiter = next;
1445                     if (next == null)
1446                         lastWaiter = trail;
1447                 }
1448                 else
1449                     trail = t;
1450                 t = next;
1451             }
1452         }
1453 
1454         // public methods
1455 
1456         /**
1457          * Moves the longest-waiting thread, if one exists, from the
1458          * wait queue for this condition to the wait queue for the
1459          * owning lock.
1460          *
1461          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1462          *         returns {@code false}
1463          */
signal()1464         public final void signal() {
1465             if (!isHeldExclusively())
1466                 throw new IllegalMonitorStateException();
1467             Node first = firstWaiter;
1468             if (first != null)
1469                 doSignal(first);
1470         }
1471 
1472         /**
1473          * Moves all threads from the wait queue for this condition to
1474          * the wait queue for the owning lock.
1475          *
1476          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1477          *         returns {@code false}
1478          */
signalAll()1479         public final void signalAll() {
1480             if (!isHeldExclusively())
1481                 throw new IllegalMonitorStateException();
1482             Node first = firstWaiter;
1483             if (first != null)
1484                 doSignalAll(first);
1485         }
1486 
1487         /**
1488          * Implements uninterruptible condition wait.
1489          * <ol>
1490          * <li>Save lock state returned by {@link #getState}.
1491          * <li>Invoke {@link #release} with saved state as argument,
1492          *     throwing IllegalMonitorStateException if it fails.
1493          * <li>Block until signalled.
1494          * <li>Reacquire by invoking specialized version of
1495          *     {@link #acquire} with saved state as argument.
1496          * </ol>
1497          */
awaitUninterruptibly()1498         public final void awaitUninterruptibly() {
1499             Node node = addConditionWaiter();
1500             long savedState = fullyRelease(node);
1501             boolean interrupted = false;
1502             while (!isOnSyncQueue(node)) {
1503                 LockSupport.park(this);
1504                 if (Thread.interrupted())
1505                     interrupted = true;
1506             }
1507             if (acquireQueued(node, savedState) || interrupted)
1508                 selfInterrupt();
1509         }
1510 
1511         /*
1512          * For interruptible waits, we need to track whether to throw
1513          * InterruptedException, if interrupted while blocked on
1514          * condition, versus reinterrupt current thread, if
1515          * interrupted while blocked waiting to re-acquire.
1516          */
1517 
1518         /** Mode meaning to reinterrupt on exit from wait */
1519         private static final int REINTERRUPT =  1;
1520         /** Mode meaning to throw InterruptedException on exit from wait */
1521         private static final int THROW_IE    = -1;
1522 
1523         /**
1524          * Checks for interrupt, returning THROW_IE if interrupted
1525          * before signalled, REINTERRUPT if after signalled, or
1526          * 0 if not interrupted.
1527          */
checkInterruptWhileWaiting(Node node)1528         private int checkInterruptWhileWaiting(Node node) {
1529             return Thread.interrupted() ?
1530                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
1531                 0;
1532         }
1533 
1534         /**
1535          * Throws InterruptedException, reinterrupts current thread, or
1536          * does nothing, depending on mode.
1537          */
reportInterruptAfterWait(int interruptMode)1538         private void reportInterruptAfterWait(int interruptMode)
1539             throws InterruptedException {
1540             if (interruptMode == THROW_IE)
1541                 throw new InterruptedException();
1542             else if (interruptMode == REINTERRUPT)
1543                 selfInterrupt();
1544         }
1545 
1546         /**
1547          * Implements interruptible condition wait.
1548          * <ol>
1549          * <li>If current thread is interrupted, throw InterruptedException.
1550          * <li>Save lock state returned by {@link #getState}.
1551          * <li>Invoke {@link #release} with saved state as argument,
1552          *     throwing IllegalMonitorStateException if it fails.
1553          * <li>Block until signalled or interrupted.
1554          * <li>Reacquire by invoking specialized version of
1555          *     {@link #acquire} with saved state as argument.
1556          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1557          * </ol>
1558          */
await()1559         public final void await() throws InterruptedException {
1560             if (Thread.interrupted())
1561                 throw new InterruptedException();
1562             Node node = addConditionWaiter();
1563             long savedState = fullyRelease(node);
1564             int interruptMode = 0;
1565             while (!isOnSyncQueue(node)) {
1566                 LockSupport.park(this);
1567                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1568                     break;
1569             }
1570             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1571                 interruptMode = REINTERRUPT;
1572             if (node.nextWaiter != null) // clean up if cancelled
1573                 unlinkCancelledWaiters();
1574             if (interruptMode != 0)
1575                 reportInterruptAfterWait(interruptMode);
1576         }
1577 
1578         /**
1579          * Implements timed condition wait.
1580          * <ol>
1581          * <li>If current thread is interrupted, throw InterruptedException.
1582          * <li>Save lock state returned by {@link #getState}.
1583          * <li>Invoke {@link #release} with saved state as argument,
1584          *     throwing IllegalMonitorStateException if it fails.
1585          * <li>Block until signalled, interrupted, or timed out.
1586          * <li>Reacquire by invoking specialized version of
1587          *     {@link #acquire} with saved state as argument.
1588          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1589          * </ol>
1590          */
awaitNanos(long nanosTimeout)1591         public final long awaitNanos(long nanosTimeout)
1592                 throws InterruptedException {
1593             if (Thread.interrupted())
1594                 throw new InterruptedException();
1595             // We don't check for nanosTimeout <= 0L here, to allow
1596             // awaitNanos(0) as a way to "yield the lock".
1597             final long deadline = System.nanoTime() + nanosTimeout;
1598             long initialNanos = nanosTimeout;
1599             Node node = addConditionWaiter();
1600             long savedState = fullyRelease(node);
1601             int interruptMode = 0;
1602             while (!isOnSyncQueue(node)) {
1603                 if (nanosTimeout <= 0L) {
1604                     transferAfterCancelledWait(node);
1605                     break;
1606                 }
1607                 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1608                     LockSupport.parkNanos(this, nanosTimeout);
1609                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1610                     break;
1611                 nanosTimeout = deadline - System.nanoTime();
1612             }
1613             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1614                 interruptMode = REINTERRUPT;
1615             if (node.nextWaiter != null)
1616                 unlinkCancelledWaiters();
1617             if (interruptMode != 0)
1618                 reportInterruptAfterWait(interruptMode);
1619             long remaining = deadline - System.nanoTime(); // avoid overflow
1620             return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
1621         }
1622 
1623         /**
1624          * Implements absolute timed condition wait.
1625          * <ol>
1626          * <li>If current thread is interrupted, throw InterruptedException.
1627          * <li>Save lock state returned by {@link #getState}.
1628          * <li>Invoke {@link #release} with saved state as argument,
1629          *     throwing IllegalMonitorStateException if it fails.
1630          * <li>Block until signalled, interrupted, or timed out.
1631          * <li>Reacquire by invoking specialized version of
1632          *     {@link #acquire} with saved state as argument.
1633          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1634          * <li>If timed out while blocked in step 4, return false, else true.
1635          * </ol>
1636          */
awaitUntil(Date deadline)1637         public final boolean awaitUntil(Date deadline)
1638                 throws InterruptedException {
1639             long abstime = deadline.getTime();
1640             if (Thread.interrupted())
1641                 throw new InterruptedException();
1642             Node node = addConditionWaiter();
1643             long savedState = fullyRelease(node);
1644             boolean timedout = false;
1645             int interruptMode = 0;
1646             while (!isOnSyncQueue(node)) {
1647                 if (System.currentTimeMillis() >= abstime) {
1648                     timedout = transferAfterCancelledWait(node);
1649                     break;
1650                 }
1651                 LockSupport.parkUntil(this, abstime);
1652                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1653                     break;
1654             }
1655             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1656                 interruptMode = REINTERRUPT;
1657             if (node.nextWaiter != null)
1658                 unlinkCancelledWaiters();
1659             if (interruptMode != 0)
1660                 reportInterruptAfterWait(interruptMode);
1661             return !timedout;
1662         }
1663 
1664         /**
1665          * Implements timed condition wait.
1666          * <ol>
1667          * <li>If current thread is interrupted, throw InterruptedException.
1668          * <li>Save lock state returned by {@link #getState}.
1669          * <li>Invoke {@link #release} with saved state as argument,
1670          *     throwing IllegalMonitorStateException if it fails.
1671          * <li>Block until signalled, interrupted, or timed out.
1672          * <li>Reacquire by invoking specialized version of
1673          *     {@link #acquire} with saved state as argument.
1674          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1675          * <li>If timed out while blocked in step 4, return false, else true.
1676          * </ol>
1677          */
await(long time, TimeUnit unit)1678         public final boolean await(long time, TimeUnit unit)
1679                 throws InterruptedException {
1680             long nanosTimeout = unit.toNanos(time);
1681             if (Thread.interrupted())
1682                 throw new InterruptedException();
1683             // We don't check for nanosTimeout <= 0L here, to allow
1684             // await(0, unit) as a way to "yield the lock".
1685             final long deadline = System.nanoTime() + nanosTimeout;
1686             Node node = addConditionWaiter();
1687             long savedState = fullyRelease(node);
1688             boolean timedout = false;
1689             int interruptMode = 0;
1690             while (!isOnSyncQueue(node)) {
1691                 if (nanosTimeout <= 0L) {
1692                     timedout = transferAfterCancelledWait(node);
1693                     break;
1694                 }
1695                 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
1696                     LockSupport.parkNanos(this, nanosTimeout);
1697                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
1698                     break;
1699                 nanosTimeout = deadline - System.nanoTime();
1700             }
1701             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
1702                 interruptMode = REINTERRUPT;
1703             if (node.nextWaiter != null)
1704                 unlinkCancelledWaiters();
1705             if (interruptMode != 0)
1706                 reportInterruptAfterWait(interruptMode);
1707             return !timedout;
1708         }
1709 
1710         //  support for instrumentation
1711 
1712         /**
1713          * Returns true if this condition was created by the given
1714          * synchronization object.
1715          *
1716          * @return {@code true} if owned
1717          */
isOwnedBy(AbstractQueuedLongSynchronizer sync)1718         final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1719             return sync == AbstractQueuedLongSynchronizer.this;
1720         }
1721 
1722         /**
1723          * Queries whether any threads are waiting on this condition.
1724          * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}.
1725          *
1726          * @return {@code true} if there are any waiting threads
1727          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1728          *         returns {@code false}
1729          */
hasWaiters()1730         protected final boolean hasWaiters() {
1731             if (!isHeldExclusively())
1732                 throw new IllegalMonitorStateException();
1733             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1734                 if (w.waitStatus == Node.CONDITION)
1735                     return true;
1736             }
1737             return false;
1738         }
1739 
1740         /**
1741          * Returns an estimate of the number of threads waiting on
1742          * this condition.
1743          * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}.
1744          *
1745          * @return the estimated number of waiting threads
1746          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1747          *         returns {@code false}
1748          */
getWaitQueueLength()1749         protected final int getWaitQueueLength() {
1750             if (!isHeldExclusively())
1751                 throw new IllegalMonitorStateException();
1752             int n = 0;
1753             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1754                 if (w.waitStatus == Node.CONDITION)
1755                     ++n;
1756             }
1757             return n;
1758         }
1759 
1760         /**
1761          * Returns a collection containing those threads that may be
1762          * waiting on this Condition.
1763          * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1764          *
1765          * @return the collection of threads
1766          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1767          *         returns {@code false}
1768          */
getWaitingThreads()1769         protected final Collection<Thread> getWaitingThreads() {
1770             if (!isHeldExclusively())
1771                 throw new IllegalMonitorStateException();
1772             ArrayList<Thread> list = new ArrayList<>();
1773             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
1774                 if (w.waitStatus == Node.CONDITION) {
1775                     Thread t = w.thread;
1776                     if (t != null)
1777                         list.add(t);
1778                 }
1779             }
1780             return list;
1781         }
1782     }
1783 
1784     /**
1785      * Setup to support compareAndSet. We need to natively implement
1786      * this here: For the sake of permitting future enhancements, we
1787      * cannot explicitly subclass AtomicLong, which would be
1788      * efficient and useful otherwise. So, as the lesser of evils, we
1789      * natively implement using hotspot intrinsics API. And while we
1790      * are at it, we do the same for other CASable fields (which could
1791      * otherwise be done with atomic field updaters).
1792      */
1793     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
1794     private static final long STATE;
1795     private static final long HEAD;
1796     private static final long TAIL;
1797 
1798     static {
1799         try {
1800             STATE = U.objectFieldOffset
1801                 (AbstractQueuedLongSynchronizer.class.getDeclaredField("state"));
1802             HEAD = U.objectFieldOffset
1803                 (AbstractQueuedLongSynchronizer.class.getDeclaredField("head"));
1804             TAIL = U.objectFieldOffset
1805                 (AbstractQueuedLongSynchronizer.class.getDeclaredField("tail"));
1806         } catch (ReflectiveOperationException e) {
1807             throw new Error(e);
1808         }
1809 
1810         // Reduce the risk of rare disastrous classloading in first call to
1811         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1812         Class<?> ensureLoaded = LockSupport.class;
1813     }
1814 
1815     /**
1816      * Initializes head and tail fields on first contention.
1817      */
initializeSyncQueue()1818     private final void initializeSyncQueue() {
1819         Node h;
1820         if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
1821             tail = h;
1822     }
1823 
1824     /**
1825      * CASes tail field.
1826      */
compareAndSetTail(Node expect, Node update)1827     private final boolean compareAndSetTail(Node expect, Node update) {
1828         return U.compareAndSwapObject(this, TAIL, expect, update);
1829     }
1830 }
1831