1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.AbstractQueue;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.util.Queue;
15 import java.util.Spliterator;
16 import java.util.Spliterators;
17 import java.util.concurrent.locks.LockSupport;
18 import java.util.function.Consumer;
19 
20 // BEGIN android-note
21 // removed link to collections framework docs
22 // END android-note
23 
24 /**
25  * An unbounded {@link TransferQueue} based on linked nodes.
26  * This queue orders elements FIFO (first-in-first-out) with respect
27  * to any given producer.  The <em>head</em> of the queue is that
28  * element that has been on the queue the longest time for some
29  * producer.  The <em>tail</em> of the queue is that element that has
30  * been on the queue the shortest time for some producer.
31  *
32  * <p>Beware that, unlike in most collections, the {@code size} method
33  * is <em>NOT</em> a constant-time operation. Because of the
34  * asynchronous nature of these queues, determining the current number
35  * of elements requires a traversal of the elements, and so may report
36  * inaccurate results if this collection is modified during traversal.
37  * Additionally, the bulk operations {@code addAll},
38  * {@code removeAll}, {@code retainAll}, {@code containsAll},
39  * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
40  * to be performed atomically. For example, an iterator operating
41  * concurrently with an {@code addAll} operation might view only some
42  * of the added elements.
43  *
44  * <p>This class and its iterator implement all of the
45  * <em>optional</em> methods of the {@link Collection} and {@link
46  * Iterator} interfaces.
47  *
48  * <p>Memory consistency effects: As with other concurrent
49  * collections, actions in a thread prior to placing an object into a
50  * {@code LinkedTransferQueue}
51  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
52  * actions subsequent to the access or removal of that element from
53  * the {@code LinkedTransferQueue} in another thread.
54  *
55  * @since 1.7
56  * @author Doug Lea
57  * @param <E> the type of elements held in this queue
58  */
59 public class LinkedTransferQueue<E> extends AbstractQueue<E>
60     implements TransferQueue<E>, java.io.Serializable {
61     private static final long serialVersionUID = -3223113410248163686L;
62 
63     /*
64      * *** Overview of Dual Queues with Slack ***
65      *
66      * Dual Queues, introduced by Scherer and Scott
67      * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
68      * (linked) queues in which nodes may represent either data or
69      * requests.  When a thread tries to enqueue a data node, but
70      * encounters a request node, it instead "matches" and removes it;
71      * and vice versa for enqueuing requests. Blocking Dual Queues
72      * arrange that threads enqueuing unmatched requests block until
73      * other threads provide the match. Dual Synchronous Queues (see
74      * Scherer, Lea, & Scott
75      * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
76      * additionally arrange that threads enqueuing unmatched data also
77      * block.  Dual Transfer Queues support all of these modes, as
78      * dictated by callers.
79      *
80      * A FIFO dual queue may be implemented using a variation of the
81      * Michael & Scott (M&S) lock-free queue algorithm
82      * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
83      * It maintains two pointer fields, "head", pointing to a
84      * (matched) node that in turn points to the first actual
85      * (unmatched) queue node (or null if empty); and "tail" that
86      * points to the last node on the queue (or again null if
87      * empty). For example, here is a possible queue with four data
88      * elements:
89      *
90      *  head                tail
91      *    |                   |
92      *    v                   v
93      *    M -> U -> U -> U -> U
94      *
95      * The M&S queue algorithm is known to be prone to scalability and
96      * overhead limitations when maintaining (via CAS) these head and
97      * tail pointers. This has led to the development of
98      * contention-reducing variants such as elimination arrays (see
99      * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
100      * optimistic back pointers (see Ladan-Mozes & Shavit
101      * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
102      * However, the nature of dual queues enables a simpler tactic for
103      * improving M&S-style implementations when dual-ness is needed.
104      *
105      * In a dual queue, each node must atomically maintain its match
106      * status. While there are other possible variants, we implement
107      * this here as: for a data-mode node, matching entails CASing an
108      * "item" field from a non-null data value to null upon match, and
109      * vice-versa for request nodes, CASing from null to a data
110      * value. (Note that the linearization properties of this style of
111      * queue are easy to verify -- elements are made available by
112      * linking, and unavailable by matching.) Compared to plain M&S
113      * queues, this property of dual queues requires one additional
114      * successful atomic operation per enq/deq pair. But it also
115      * enables lower cost variants of queue maintenance mechanics. (A
116      * variation of this idea applies even for non-dual queues that
117      * support deletion of interior elements, such as
118      * j.u.c.ConcurrentLinkedQueue.)
119      *
120      * Once a node is matched, its match status can never again
121      * change.  We may thus arrange that the linked list of them
122      * contain a prefix of zero or more matched nodes, followed by a
123      * suffix of zero or more unmatched nodes. (Note that we allow
124      * both the prefix and suffix to be zero length, which in turn
125      * means that we do not use a dummy header.)  If we were not
126      * concerned with either time or space efficiency, we could
127      * correctly perform enqueue and dequeue operations by traversing
128      * from a pointer to the initial node; CASing the item of the
129      * first unmatched node on match and CASing the next field of the
130      * trailing node on appends. (Plus some special-casing when
131      * initially empty).  While this would be a terrible idea in
132      * itself, it does have the benefit of not requiring ANY atomic
133      * updates on head/tail fields.
134      *
135      * We introduce here an approach that lies between the extremes of
136      * never versus always updating queue (head and tail) pointers.
137      * This offers a tradeoff between sometimes requiring extra
138      * traversal steps to locate the first and/or last unmatched
139      * nodes, versus the reduced overhead and contention of fewer
140      * updates to queue pointers. For example, a possible snapshot of
141      * a queue is:
142      *
143      *  head           tail
144      *    |              |
145      *    v              v
146      *    M -> M -> U -> U -> U -> U
147      *
148      * The best value for this "slack" (the targeted maximum distance
149      * between the value of "head" and the first unmatched node, and
150      * similarly for "tail") is an empirical matter. We have found
151      * that using very small constants in the range of 1-3 work best
152      * over a range of platforms. Larger values introduce increasing
153      * costs of cache misses and risks of long traversal chains, while
154      * smaller values increase CAS contention and overhead.
155      *
156      * Dual queues with slack differ from plain M&S dual queues by
157      * virtue of only sometimes updating head or tail pointers when
158      * matching, appending, or even traversing nodes; in order to
159      * maintain a targeted slack.  The idea of "sometimes" may be
160      * operationalized in several ways. The simplest is to use a
161      * per-operation counter incremented on each traversal step, and
162      * to try (via CAS) to update the associated queue pointer
163      * whenever the count exceeds a threshold. Another, that requires
164      * more overhead, is to use random number generators to update
165      * with a given probability per traversal step.
166      *
167      * In any strategy along these lines, because CASes updating
168      * fields may fail, the actual slack may exceed targeted
169      * slack. However, they may be retried at any time to maintain
170      * targets.  Even when using very small slack values, this
171      * approach works well for dual queues because it allows all
172      * operations up to the point of matching or appending an item
173      * (hence potentially allowing progress by another thread) to be
174      * read-only, thus not introducing any further contention. As
175      * described below, we implement this by performing slack
176      * maintenance retries only after these points.
177      *
178      * As an accompaniment to such techniques, traversal overhead can
179      * be further reduced without increasing contention of head
180      * pointer updates: Threads may sometimes shortcut the "next" link
181      * path from the current "head" node to be closer to the currently
182      * known first unmatched node, and similarly for tail. Again, this
183      * may be triggered with using thresholds or randomization.
184      *
185      * These ideas must be further extended to avoid unbounded amounts
186      * of costly-to-reclaim garbage caused by the sequential "next"
187      * links of nodes starting at old forgotten head nodes: As first
188      * described in detail by Boehm
189      * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
190      * delays noticing that any arbitrarily old node has become
191      * garbage, all newer dead nodes will also be unreclaimed.
192      * (Similar issues arise in non-GC environments.)  To cope with
193      * this in our implementation, upon CASing to advance the head
194      * pointer, we set the "next" link of the previous head to point
195      * only to itself; thus limiting the length of connected dead lists.
196      * (We also take similar care to wipe out possibly garbage
197      * retaining values held in other Node fields.)  However, doing so
198      * adds some further complexity to traversal: If any "next"
199      * pointer links to itself, it indicates that the current thread
200      * has lagged behind a head-update, and so the traversal must
201      * continue from the "head".  Traversals trying to find the
202      * current tail starting from "tail" may also encounter
203      * self-links, in which case they also continue at "head".
204      *
205      * It is tempting in slack-based scheme to not even use CAS for
206      * updates (similarly to Ladan-Mozes & Shavit). However, this
207      * cannot be done for head updates under the above link-forgetting
208      * mechanics because an update may leave head at a detached node.
209      * And while direct writes are possible for tail updates, they
210      * increase the risk of long retraversals, and hence long garbage
211      * chains, which can be much more costly than is worthwhile
212      * considering that the cost difference of performing a CAS vs
213      * write is smaller when they are not triggered on each operation
214      * (especially considering that writes and CASes equally require
215      * additional GC bookkeeping ("write barriers") that are sometimes
216      * more costly than the writes themselves because of contention).
217      *
218      * *** Overview of implementation ***
219      *
220      * We use a threshold-based approach to updates, with a slack
221      * threshold of two -- that is, we update head/tail when the
222      * current pointer appears to be two or more steps away from the
223      * first/last node. The slack value is hard-wired: a path greater
224      * than one is naturally implemented by checking equality of
225      * traversal pointers except when the list has only one element,
226      * in which case we keep slack threshold at one. Avoiding tracking
227      * explicit counts across method calls slightly simplifies an
228      * already-messy implementation. Using randomization would
229      * probably work better if there were a low-quality dirt-cheap
230      * per-thread one available, but even ThreadLocalRandom is too
231      * heavy for these purposes.
232      *
233      * With such a small slack threshold value, it is not worthwhile
234      * to augment this with path short-circuiting (i.e., unsplicing
235      * interior nodes) except in the case of cancellation/removal (see
236      * below).
237      *
238      * We allow both the head and tail fields to be null before any
239      * nodes are enqueued; initializing upon first append.  This
240      * simplifies some other logic, as well as providing more
241      * efficient explicit control paths instead of letting JVMs insert
242      * implicit NullPointerExceptions when they are null.  While not
243      * currently fully implemented, we also leave open the possibility
244      * of re-nulling these fields when empty (which is complicated to
245      * arrange, for little benefit.)
246      *
247      * All enqueue/dequeue operations are handled by the single method
248      * "xfer" with parameters indicating whether to act as some form
249      * of offer, put, poll, take, or transfer (each possibly with
250      * timeout). The relative complexity of using one monolithic
251      * method outweighs the code bulk and maintenance problems of
252      * using separate methods for each case.
253      *
254      * Operation consists of up to three phases. The first is
255      * implemented within method xfer, the second in tryAppend, and
256      * the third in method awaitMatch.
257      *
258      * 1. Try to match an existing node
259      *
260      *    Starting at head, skip already-matched nodes until finding
261      *    an unmatched node of opposite mode, if one exists, in which
262      *    case matching it and returning, also if necessary updating
263      *    head to one past the matched node (or the node itself if the
264      *    list has no other unmatched nodes). If the CAS misses, then
265      *    a loop retries advancing head by two steps until either
266      *    success or the slack is at most two. By requiring that each
267      *    attempt advances head by two (if applicable), we ensure that
268      *    the slack does not grow without bound. Traversals also check
269      *    if the initial head is now off-list, in which case they
270      *    start at the new head.
271      *
272      *    If no candidates are found and the call was untimed
273      *    poll/offer, (argument "how" is NOW) return.
274      *
275      * 2. Try to append a new node (method tryAppend)
276      *
277      *    Starting at current tail pointer, find the actual last node
278      *    and try to append a new node (or if head was null, establish
279      *    the first node). Nodes can be appended only if their
280      *    predecessors are either already matched or are of the same
281      *    mode. If we detect otherwise, then a new node with opposite
282      *    mode must have been appended during traversal, so we must
283      *    restart at phase 1. The traversal and update steps are
284      *    otherwise similar to phase 1: Retrying upon CAS misses and
285      *    checking for staleness.  In particular, if a self-link is
286      *    encountered, then we can safely jump to a node on the list
287      *    by continuing the traversal at current head.
288      *
289      *    On successful append, if the call was ASYNC, return.
290      *
291      * 3. Await match or cancellation (method awaitMatch)
292      *
293      *    Wait for another thread to match node; instead cancelling if
294      *    the current thread was interrupted or the wait timed out. On
295      *    multiprocessors, we use front-of-queue spinning: If a node
296      *    appears to be the first unmatched node in the queue, it
297      *    spins a bit before blocking. In either case, before blocking
298      *    it tries to unsplice any nodes between the current "head"
299      *    and the first unmatched node.
300      *
301      *    Front-of-queue spinning vastly improves performance of
302      *    heavily contended queues. And so long as it is relatively
303      *    brief and "quiet", spinning does not much impact performance
304      *    of less-contended queues.  During spins threads check their
305      *    interrupt status and generate a thread-local random number
306      *    to decide to occasionally perform a Thread.yield. While
307      *    yield has underdefined specs, we assume that it might help,
308      *    and will not hurt, in limiting impact of spinning on busy
309      *    systems.  We also use smaller (1/2) spins for nodes that are
310      *    not known to be front but whose predecessors have not
311      *    blocked -- these "chained" spins avoid artifacts of
312      *    front-of-queue rules which otherwise lead to alternating
313      *    nodes spinning vs blocking. Further, front threads that
314      *    represent phase changes (from data to request node or vice
315      *    versa) compared to their predecessors receive additional
316      *    chained spins, reflecting longer paths typically required to
317      *    unblock threads during phase changes.
318      *
319      *
320      * ** Unlinking removed interior nodes **
321      *
322      * In addition to minimizing garbage retention via self-linking
323      * described above, we also unlink removed interior nodes. These
324      * may arise due to timed out or interrupted waits, or calls to
325      * remove(x) or Iterator.remove.  Normally, given a node that was
326      * at one time known to be the predecessor of some node s that is
327      * to be removed, we can unsplice s by CASing the next field of
328      * its predecessor if it still points to s (otherwise s must
329      * already have been removed or is now offlist). But there are two
330      * situations in which we cannot guarantee to make node s
331      * unreachable in this way: (1) If s is the trailing node of list
332      * (i.e., with null next), then it is pinned as the target node
333      * for appends, so can only be removed later after other nodes are
334      * appended. (2) We cannot necessarily unlink s given a
335      * predecessor node that is matched (including the case of being
336      * cancelled): the predecessor may already be unspliced, in which
337      * case some previous reachable node may still point to s.
338      * (For further explanation see Herlihy & Shavit "The Art of
339      * Multiprocessor Programming" chapter 9).  Although, in both
340      * cases, we can rule out the need for further action if either s
341      * or its predecessor are (or can be made to be) at, or fall off
342      * from, the head of list.
343      *
344      * Without taking these into account, it would be possible for an
345      * unbounded number of supposedly removed nodes to remain
346      * reachable.  Situations leading to such buildup are uncommon but
347      * can occur in practice; for example when a series of short timed
348      * calls to poll repeatedly time out but never otherwise fall off
349      * the list because of an untimed call to take at the front of the
350      * queue.
351      *
352      * When these cases arise, rather than always retraversing the
353      * entire list to find an actual predecessor to unlink (which
354      * won't help for case (1) anyway), we record a conservative
355      * estimate of possible unsplice failures (in "sweepVotes").
356      * We trigger a full sweep when the estimate exceeds a threshold
357      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
358      * removal failures to tolerate before sweeping through, unlinking
359      * cancelled nodes that were not unlinked upon initial removal.
360      * We perform sweeps by the thread hitting threshold (rather than
361      * background threads or by spreading work to other threads)
362      * because in the main contexts in which removal occurs, the
363      * caller is already timed-out, cancelled, or performing a
364      * potentially O(n) operation (e.g. remove(x)), none of which are
365      * time-critical enough to warrant the overhead that alternatives
366      * would impose on other threads.
367      *
368      * Because the sweepVotes estimate is conservative, and because
369      * nodes become unlinked "naturally" as they fall off the head of
370      * the queue, and because we allow votes to accumulate even while
371      * sweeps are in progress, there are typically significantly fewer
372      * such nodes than estimated.  Choice of a threshold value
373      * balances the likelihood of wasted effort and contention, versus
374      * providing a worst-case bound on retention of interior nodes in
375      * quiescent queues. The value defined below was chosen
376      * empirically to balance these under various timeout scenarios.
377      *
378      * Note that we cannot self-link unlinked interior nodes during
379      * sweeps. However, the associated garbage chains terminate when
380      * some successor ultimately falls off the head of the list and is
381      * self-linked.
382      */
383 
    /** True if on multiprocessor; spinning is only profitable when true. */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue.  See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning.  Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;
415 
    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use.  Relies heavily on Unsafe mechanics to minimize
     * unnecessary ordering constraints: Writes that are intrinsically
     * ordered wrt other accesses or CASes use simple relaxed forms.
     *
     * <p>A node is in data mode (isData == true) when it carries an
     * element awaiting a taker, and in request mode otherwise.  Match
     * state is encoded entirely in the item field: see isMatched().
     */
    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting

        // CAS methods for fields

        /** Atomically advances next from cmp to val; true on success. */
        final boolean casNext(Node cmp, Node val) {
            return U.compareAndSwapObject(this, NEXT, cmp, val);
        }

        /** Atomically replaces item cmp with val; the matching primitive. */
        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return U.compareAndSwapObject(this, ITEM, cmp, val);
        }

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(Object item, boolean isData) {
            U.putObject(this, ITEM, item); // relaxed write
            this.isData = isData;
        }

        /**
         * Links node to itself to avoid garbage retention.  Called
         * only after CASing head field, so uses relaxed write.
         * A self-linked next is the "off-list" marker seen by traversals.
         */
        final void forgetNext() {
            U.putObject(this, NEXT, this);
        }

        /**
         * Sets item to self and waiter to null, to avoid garbage
         * retention after matching or cancelling. Uses relaxed writes
         * because order is already constrained in the only calling
         * contexts: item is forgotten only after volatile/atomic
         * mechanics that extract items.  Similarly, clearing waiter
         * follows either CAS or return from park (if ever parked;
         * else we don't care).
         */
        final void forgetContents() {
            U.putObject(this, ITEM, this);
            U.putObject(this, WAITER, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
         * Matched means: item points to self (forgotten/cancelled), or
         * item's nullness no longer agrees with the node's mode.
         */
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * Returns true if this is an unmatched request node.
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * Tries to artificially match a data node -- used by remove.
         * On success, wakes any waiter so it can notice cancellation.
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = -3375979862319811754L;

        // Unsafe mechanics: raw field offsets for the CAS/relaxed-write
        // operations above.
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long ITEM;
        private static final long NEXT;
        private static final long WAITER;
        static {
            try {
                ITEM = U.objectFieldOffset
                    (Node.class.getDeclaredField("item"));
                NEXT = U.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
                WAITER = U.objectFieldOffset
                    (Node.class.getDeclaredField("waiter"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }
529 
    /**
     * head of the queue; null until first enqueue.  May lag behind the
     * first unmatched node by up to the slack threshold (see overview).
     */
    transient volatile Node head;

    /**
     * tail of the queue; null until first append.  May lag behind the
     * actual last node by up to the slack threshold (see overview).
     */
    private transient volatile Node tail;

    /**
     * The number of apparent failures to unsplice removed nodes;
     * a full sweep is triggered when this exceeds SWEEP_THRESHOLD.
     */
    private transient volatile int sweepVotes;
538 
    // CAS methods for fields

    /** Atomically advances tail from cmp to val; true on success. */
    private boolean casTail(Node cmp, Node val) {
        return U.compareAndSwapObject(this, TAIL, cmp, val);
    }

    /** Atomically advances head from cmp to val; true on success. */
    private boolean casHead(Node cmp, Node val) {
        return U.compareAndSwapObject(this, HEAD, cmp, val);
    }

    /** Atomically updates sweepVotes from cmp to val; true on success. */
    private boolean casSweepVotes(int cmp, int val) {
        return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
    }
551 
    /*
     * Possible values for "how" argument in xfer method.
     * NOW returns immediately without enqueuing; ASYNC enqueues and
     * returns; SYNC enqueues and blocks until matched; TIMED enqueues
     * and blocks up to the given timeout.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer
559 
    /**
     * Implements all queuing methods. See above for explanation.
     *
     * <p>Phase 1 (here): traverse from head looking for an unmatched
     * node of opposite mode to match via CAS on its item.  On a
     * successful match, opportunistically advance head by two to keep
     * slack bounded.  Phase 2 (tryAppend) and phase 3 (awaitMatch)
     * run only when no match is found and "how" permits waiting.
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return an item if matched, else e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        // Matched: try to keep slack < 2 by advancing
                        // head past the matched node (by two if possible).
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext(); // self-link old head for GC
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter); // wake matched waiter
                        @SuppressWarnings("unchecked") E itemE = (E) item;
                        return itemE;
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
616 
617     /**
618      * Tries to append node s as tail.
619      *
620      * @param s the node to append
621      * @param haveData true if appending in data mode
622      * @return null on failure due to losing race with append in
623      * different mode, else s's predecessor, or s itself if no
624      * predecessor
625      */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                // Empty list: make s the first node.
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    // NOTE: s is reused here as a traversal temp; the
                    // appended node is already linked, and p (its
                    // predecessor) is what we return.
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
651 
652     /**
653      * Spins/yields/blocks until node s is matched or caller gives up.
654      *
655      * @param s the waiting node
656      * @param pred the predecessor of s, or s itself if it has no
657      * predecessor, or null if unknown (the null case does not occur
658      * in any current calls but may in possible future extensions)
659      * @param e the comparison value for checking match
660      * @param timed if true, wait only until timeout elapses
661      * @param nanos timeout in nanosecs, used only if timed is true
662      * @return matched item, or e if unmatched on interrupt or timeout
663      */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                @SuppressWarnings("unchecked") E itemE = (E) item;
                return itemE;
            }
            else if (w.isInterrupted() || (timed && nanos <= 0L)) {
                // Giving up: cancel by CASing item to the node itself
                // (self-linked item marks the node matched/cancelled).
                unsplice(pred, s);           // try to unlink and cancel
                if (s.casItem(e, s))         // return normally if lost CAS
                    return e;
            }
            else if (spins < 0) {            // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            else {
                // Untimed: block until unparked (by a matching thread
                // in xfer) or interrupted.
                LockSupport.park(this);
            }
        }
    }
705 
706     /**
707      * Returns spin/yield value for a node with given predecessor and
708      * data mode. See above for explanation.
709      */
spinsFor(Node pred, boolean haveData)710     private static int spinsFor(Node pred, boolean haveData) {
711         if (MP && pred != null) {
712             if (pred.isData != haveData)      // phase change
713                 return FRONT_SPINS + CHAINED_SPINS;
714             if (pred.isMatched())             // probably at front
715                 return FRONT_SPINS;
716             if (pred.waiter == null)          // pred apparently spinning
717                 return CHAINED_SPINS;
718         }
719         return 0;
720     }
721 
722     /* -------------- Traversal methods -------------- */
723 
724     /**
725      * Returns the successor of p, or the head node if p.next has been
726      * linked to self, which will only be true if traversing with a
727      * stale pointer that is now off the list.
728      */
succ(Node p)729     final Node succ(Node p) {
730         Node next = p.next;
731         return (p == next) ? head : next;
732     }
733 
734     /**
735      * Returns the first unmatched data node, or null if none.
736      * Callers must recheck if the returned node's item field is null
737      * or self-linked before using.
738      */
    final Node firstDataNode() {
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    // Data node still holding its item (not matched,
                    // not self-linked): first live element.
                    if (item != null && item != p)
                        return p;
                }
                else if (item == null)
                    break;      // unmatched request node: no data nodes present
                if (p == (p = p.next))
                    continue restartFromHead; // stale; p went off-list
            }
            return null;
        }
    }
755 
756     /**
757      * Traverses and counts unmatched nodes of the given mode.
758      * Used by methods size and getWaitingConsumerCount.
759      */
    private int countOfMode(boolean data) {
        restartFromHead: for (;;) {
            int count = 0;
            for (Node p = head; p != null;) {
                if (!p.isMatched()) {
                    // First unmatched node of the other mode: the queue
                    // holds no unmatched nodes of the requested mode.
                    if (p.isData != data)
                        return 0;
                    if (++count == Integer.MAX_VALUE)
                        break;  // @see Collection.size()
                }
                if (p == (p = p.next))
                    continue restartFromHead; // stale; restart (count reset)
            }
            return count;
        }
    }
776 
    /**
     * Returns a string representation of the current data elements,
     * in traversal order, via Helpers.toString.
     */
    public String toString() {
        String[] a = null;
        restartFromHead: for (;;) {
            int charLength = 0;
            int size = 0;
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p) { // live element
                        if (a == null)
                            a = new String[4];
                        else if (size == a.length)
                            a = Arrays.copyOf(a, 2 * size);
                        String s = item.toString();
                        a[size++] = s;
                        charLength += s.length();
                    }
                } else if (item == null)
                    break;      // unmatched request node: no data nodes present
                if (p == (p = p.next))
                    continue restartFromHead; // stale; restart (size/length reset)
            }

            if (size == 0)
                return "[]";

            return Helpers.toString(a, size, charLength);
        }
    }
806 
    /**
     * Collects all live data items into an array. If {@code a} is
     * non-null and large enough, items are copied into it (with a
     * trailing null when there is spare room, per the
     * Collection.toArray(T[]) contract) and {@code a} is returned;
     * otherwise an exactly-sized array is returned.
     */
    private Object[] toArrayInternal(Object[] a) {
        Object[] x = a;
        restartFromHead: for (;;) {
            int size = 0;
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p) { // live element
                        if (x == null)
                            x = new Object[4];
                        else if (size == x.length)
                            x = Arrays.copyOf(x, 2 * (size + 4));
                        x[size++] = item;
                    }
                } else if (item == null)
                    break;      // unmatched request node: no data nodes present
                if (p == (p = p.next))
                    continue restartFromHead; // stale; restart (size reset)
            }
            if (x == null)
                return new Object[0];
            else if (a != null && size <= a.length) {
                if (a != x)
                    System.arraycopy(x, 0, a, 0, size);
                if (size < a.length)
                    a[size] = null; // null-terminate the caller's array
                return a;
            }
            return (size == x.length) ? x : Arrays.copyOf(x, size);
        }
    }
838 
839     /**
840      * Returns an array containing all of the elements in this queue, in
841      * proper sequence.
842      *
843      * <p>The returned array will be "safe" in that no references to it are
844      * maintained by this queue.  (In other words, this method must allocate
845      * a new array).  The caller is thus free to modify the returned array.
846      *
847      * <p>This method acts as bridge between array-based and collection-based
848      * APIs.
849      *
850      * @return an array containing all of the elements in this queue
851      */
toArray()852     public Object[] toArray() {
853         return toArrayInternal(null);
854     }
855 
856     /**
857      * Returns an array containing all of the elements in this queue, in
858      * proper sequence; the runtime type of the returned array is that of
859      * the specified array.  If the queue fits in the specified array, it
860      * is returned therein.  Otherwise, a new array is allocated with the
861      * runtime type of the specified array and the size of this queue.
862      *
863      * <p>If this queue fits in the specified array with room to spare
864      * (i.e., the array has more elements than this queue), the element in
865      * the array immediately following the end of the queue is set to
866      * {@code null}.
867      *
868      * <p>Like the {@link #toArray()} method, this method acts as bridge between
869      * array-based and collection-based APIs.  Further, this method allows
870      * precise control over the runtime type of the output array, and may,
871      * under certain circumstances, be used to save allocation costs.
872      *
873      * <p>Suppose {@code x} is a queue known to contain only strings.
874      * The following code can be used to dump the queue into a newly
875      * allocated array of {@code String}:
876      *
877      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
878      *
879      * Note that {@code toArray(new Object[0])} is identical in function to
880      * {@code toArray()}.
881      *
882      * @param a the array into which the elements of the queue are to
883      *          be stored, if it is big enough; otherwise, a new array of the
884      *          same runtime type is allocated for this purpose
885      * @return an array containing all of the elements in this queue
886      * @throws ArrayStoreException if the runtime type of the specified array
887      *         is not a supertype of the runtime type of every element in
888      *         this queue
889      * @throws NullPointerException if the specified array is null
890      */
891     @SuppressWarnings("unchecked")
toArray(T[] a)892     public <T> T[] toArray(T[] a) {
893         if (a == null) throw new NullPointerException();
894         return (T[]) toArrayInternal(a);
895     }
896 
    /**
     * Iterator over live data nodes. Weakly consistent (see the
     * iterator() javadoc); supports remove() of the last returned
     * element by matching (cancelling) its node and unsplicing it.
     */
    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched())
                lastPred = r;    // next lastPred is old lastRet
            else if ((b = lastPred) == null || b.isMatched())
                lastPred = null; // at start of list
            else {
                Node s, n;       // help with removal of lastPred.next
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s)
                    b.casNext(s, n);
            }

            this.lastRet = prev;

            // Find the next live data node, unlinking matched nodes
            // encountered along the way.
            for (Node p = prev, s, n;;) {
                s = (p == null) ? head : p.next;
                if (s == null)
                    break;
                else if (s == p) {
                    p = null;    // stale; restart from head
                    continue;
                }
                Object item = s.item;
                if (s.isData) {
                    if (item != null && item != s) {
                        @SuppressWarnings("unchecked") E itemE = (E) item;
                        nextItem = itemE;
                        nextNode = s;
                        return;
                    }
                }
                else if (item == null)
                    break;       // unmatched request node: no data follows
                // assert s.isMatched();
                if (p == null)
                    p = s;
                else if ((n = s.next) == null)
                    break;
                else if (s == n)
                    p = null;
                else
                    p.casNext(s, n); // unlink matched node s
            }
            nextNode = null;
            nextItem = null;
        }

        Itr() {
            advance(null);
        }

        public final boolean hasNext() {
            return nextNode != null;
        }

        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;
            advance(p);
            return e;
        }

        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null)
                throw new IllegalStateException();
            this.lastRet = null;
            // Only the thread that wins tryMatchData (cancels the
            // node) performs the unsplice.
            if (lastRet.tryMatchData())
                unsplice(lastPred, lastRet);
        }
    }
990 
991     /** A customized variant of Spliterators.IteratorSpliterator */
    final class LTQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        Node current;       // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        LTQSpliterator() {}

        /**
         * Splits off a prefix of elements into an array-backed
         * spliterator; the batch size grows by one per split, capped
         * at MAX_BATCH. Returns null when there is nothing to split.
         */
        public Spliterator<E> trySplit() {
            Node p;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null) &&
                p.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                do {
                    Object e = p.item;
                    if (e != p && (a[i] = e) != null)
                        ++i;
                    if (p == (p = p.next))
                        p = firstDataNode(); // stale; restart from head
                } while (p != null && i < n && p.isData);
                if ((current = p) == null)
                    exhausted = true;
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator
                        (a, 0, i, (Spliterator.ORDERED |
                                   Spliterator.NONNULL |
                                   Spliterator.CONCURRENT));
                }
            }
            return null;
        }

        /** Applies the action to each remaining element, then marks exhausted. */
        @SuppressWarnings("unchecked")
        public void forEachRemaining(Consumer<? super E> action) {
            Node p;
            if (action == null) throw new NullPointerException();
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null)) {
                exhausted = true;
                do {
                    Object e = p.item;
                    if (e != null && e != p)
                        action.accept((E)e);
                    if (p == (p = p.next))
                        p = firstDataNode(); // stale; restart from head
                } while (p != null && p.isData);
            }
        }

        /** Applies the action to the next element, if one exists. */
        @SuppressWarnings("unchecked")
        public boolean tryAdvance(Consumer<? super E> action) {
            Node p;
            if (action == null) throw new NullPointerException();
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null)) {
                Object e;
                do {
                    if ((e = p.item) == p)
                        e = null;       // matched/self-linked: skip
                    if (p == (p = p.next))
                        p = firstDataNode(); // stale; restart from head
                } while (e == null && p != null && p.isData);
                if ((current = p) == null)
                    exhausted = true;
                if (e != null) {
                    action.accept((E)e);
                    return true;
                }
            }
            return false;
        }

        public long estimateSize() { return Long.MAX_VALUE; }

        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                Spliterator.CONCURRENT;
        }
    }
1075 
1076     /**
1077      * Returns a {@link Spliterator} over the elements in this queue.
1078      *
1079      * <p>The returned spliterator is
1080      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1081      *
1082      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1083      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1084      *
1085      * @implNote
1086      * The {@code Spliterator} implements {@code trySplit} to permit limited
1087      * parallelism.
1088      *
1089      * @return a {@code Spliterator} over the elements in this queue
1090      * @since 1.8
1091      */
spliterator()1092     public Spliterator<E> spliterator() {
1093         return new LTQSpliterator<E>();
1094     }
1095 
1096     /* -------------- Removal methods -------------- */
1097 
1098     /**
1099      * Unsplices (now or later) the given deleted/cancelled node with
1100      * the given predecessor.
1101      *
1102      * @param pred a node that was at one time known to be the
1103      * predecessor of s, or null or s itself if s is/was at head
1104      * @param s the node to be unspliced
1105      */
    final void unsplice(Node pred, Node s) {
        s.waiter = null; // disable signals
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s.  If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
         * nor s are head or offlist, add to sweepVotes, and if enough
         * votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            // n == null: s is the trailing node and cannot be unlinked now.
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            sweep();     // won the right to sweep; votes reset
                            break;
                        }
                    }
                }
            }
        }
    }
1147 
1148     /**
1149      * Unlinks matched (typically cancelled) nodes encountered in a
1150      * traversal from head.
1151      */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n); // unlink matched node s (best-effort CAS)
        }
    }
1166 
1167     /**
1168      * Main implementation of remove(Object)
1169      */
    private boolean findAndRemove(Object e) {
        if (e != null) {
            for (Node pred = null, p = head; p != null; ) {
                Object item = p.item;
                if (p.isData) {
                    // Live data node whose item equals e: only the winner
                    // of tryMatchData removes and unsplices it.
                    if (item != null && item != p && e.equals(item) &&
                        p.tryMatchData()) {
                        unsplice(pred, p);
                        return true;
                    }
                }
                else if (item == null)
                    break;      // unmatched request node: no data follows
                pred = p;
                if ((p = p.next) == pred) { // stale
                    pred = null;
                    p = head;   // restart traversal from head
                }
            }
        }
        return false;
    }
1192 
1193     /**
1194      * Creates an initially empty {@code LinkedTransferQueue}.
1195      */
    public LinkedTransferQueue() {
        // Nothing to do: head and tail start null and are lazily
        // initialized by the first successful append (see tryAppend).
    }
1198 
1199     /**
1200      * Creates a {@code LinkedTransferQueue}
1201      * initially containing the elements of the given collection,
1202      * added in traversal order of the collection's iterator.
1203      *
1204      * @param c the collection of elements to initially contain
1205      * @throws NullPointerException if the specified collection or any
1206      *         of its elements are null
1207      */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c); // NullPointerException if c or any element is null
    }
1212 
1213     /**
1214      * Inserts the specified element at the tail of this queue.
1215      * As the queue is unbounded, this method will never block.
1216      *
1217      * @throws NullPointerException if the specified element is null
1218      */
put(E e)1219     public void put(E e) {
1220         xfer(e, true, ASYNC, 0);
1221     }
1222 
1223     /**
1224      * Inserts the specified element at the tail of this queue.
1225      * As the queue is unbounded, this method will never block or
1226      * return {@code false}.
1227      *
1228      * @return {@code true} (as specified by
1229      *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
1230      *  BlockingQueue.offer})
1231      * @throws NullPointerException if the specified element is null
1232      */
offer(E e, long timeout, TimeUnit unit)1233     public boolean offer(E e, long timeout, TimeUnit unit) {
1234         xfer(e, true, ASYNC, 0);
1235         return true;
1236     }
1237 
1238     /**
1239      * Inserts the specified element at the tail of this queue.
1240      * As the queue is unbounded, this method will never return {@code false}.
1241      *
1242      * @return {@code true} (as specified by {@link Queue#offer})
1243      * @throws NullPointerException if the specified element is null
1244      */
offer(E e)1245     public boolean offer(E e) {
1246         xfer(e, true, ASYNC, 0);
1247         return true;
1248     }
1249 
1250     /**
1251      * Inserts the specified element at the tail of this queue.
1252      * As the queue is unbounded, this method will never throw
1253      * {@link IllegalStateException} or return {@code false}.
1254      *
1255      * @return {@code true} (as specified by {@link Collection#add})
1256      * @throws NullPointerException if the specified element is null
1257      */
add(E e)1258     public boolean add(E e) {
1259         xfer(e, true, ASYNC, 0);
1260         return true;
1261     }
1262 
1263     /**
1264      * Transfers the element to a waiting consumer immediately, if possible.
1265      *
1266      * <p>More precisely, transfers the specified element immediately
1267      * if there exists a consumer already waiting to receive it (in
1268      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1269      * otherwise returning {@code false} without enqueuing the element.
1270      *
1271      * @throws NullPointerException if the specified element is null
1272      */
tryTransfer(E e)1273     public boolean tryTransfer(E e) {
1274         return xfer(e, true, NOW, 0) == null;
1275     }
1276 
1277     /**
1278      * Transfers the element to a consumer, waiting if necessary to do so.
1279      *
1280      * <p>More precisely, transfers the specified element immediately
1281      * if there exists a consumer already waiting to receive it (in
1282      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1283      * else inserts the specified element at the tail of this queue
1284      * and waits until the element is received by a consumer.
1285      *
1286      * @throws NullPointerException if the specified element is null
1287      */
transfer(E e)1288     public void transfer(E e) throws InterruptedException {
1289         if (xfer(e, true, SYNC, 0) != null) {
1290             Thread.interrupted(); // failure possible only due to interrupt
1291             throw new InterruptedException();
1292         }
1293     }
1294 
1295     /**
1296      * Transfers the element to a consumer if it is possible to do so
1297      * before the timeout elapses.
1298      *
1299      * <p>More precisely, transfers the specified element immediately
1300      * if there exists a consumer already waiting to receive it (in
1301      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1302      * else inserts the specified element at the tail of this queue
1303      * and waits until the element is received by a consumer,
1304      * returning {@code false} if the specified wait time elapses
1305      * before the element can be transferred.
1306      *
1307      * @throws NullPointerException if the specified element is null
1308      */
tryTransfer(E e, long timeout, TimeUnit unit)1309     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1310         throws InterruptedException {
1311         if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1312             return true;
1313         if (!Thread.interrupted())
1314             return false;
1315         throw new InterruptedException();
1316     }
1317 
take()1318     public E take() throws InterruptedException {
1319         E e = xfer(null, false, SYNC, 0);
1320         if (e != null)
1321             return e;
1322         Thread.interrupted();
1323         throw new InterruptedException();
1324     }
1325 
poll(long timeout, TimeUnit unit)1326     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1327         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1328         if (e != null || !Thread.interrupted())
1329             return e;
1330         throw new InterruptedException();
1331     }
1332 
poll()1333     public E poll() {
1334         return xfer(null, false, NOW, 0);
1335     }
1336 
1337     /**
1338      * @throws NullPointerException     {@inheritDoc}
1339      * @throws IllegalArgumentException {@inheritDoc}
1340      */
drainTo(Collection<? super E> c)1341     public int drainTo(Collection<? super E> c) {
1342         if (c == null)
1343             throw new NullPointerException();
1344         if (c == this)
1345             throw new IllegalArgumentException();
1346         int n = 0;
1347         for (E e; (e = poll()) != null;) {
1348             c.add(e);
1349             ++n;
1350         }
1351         return n;
1352     }
1353 
1354     /**
1355      * @throws NullPointerException     {@inheritDoc}
1356      * @throws IllegalArgumentException {@inheritDoc}
1357      */
drainTo(Collection<? super E> c, int maxElements)1358     public int drainTo(Collection<? super E> c, int maxElements) {
1359         if (c == null)
1360             throw new NullPointerException();
1361         if (c == this)
1362             throw new IllegalArgumentException();
1363         int n = 0;
1364         for (E e; n < maxElements && (e = poll()) != null;) {
1365             c.add(e);
1366             ++n;
1367         }
1368         return n;
1369     }
1370 
1371     /**
1372      * Returns an iterator over the elements in this queue in proper sequence.
1373      * The elements will be returned in order from first (head) to last (tail).
1374      *
1375      * <p>The returned iterator is
1376      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1377      *
1378      * @return an iterator over the elements in this queue in proper sequence
1379      */
iterator()1380     public Iterator<E> iterator() {
1381         return new Itr();
1382     }
1383 
    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue holds no (data) elements.
     */
    public E peek() {
        // Lock-free traversal from head looking for the first live data node.
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    // Live data node: item is non-null and not self-linked
                    // (item == p marks an already-matched node).
                    if (item != null && item != p) {
                        @SuppressWarnings("unchecked") E e = (E) item;
                        return e;
                    }
                }
                else if (item == null)
                    // Unmatched request (consumer) node: no data element can
                    // precede it, so the queue has no data to peek.
                    break;
                if (p == (p = p.next))
                    // p.next == p means p was unlinked from the list;
                    // restart the traversal from the current head.
                    continue restartFromHead;
            }
            return null;
        }
    }
1402 
1403     /**
1404      * Returns {@code true} if this queue contains no elements.
1405      *
1406      * @return {@code true} if this queue contains no elements
1407      */
isEmpty()1408     public boolean isEmpty() {
1409         return firstDataNode() == null;
1410     }
1411 
    /**
     * Returns {@code true} if at least one consumer is blocked waiting
     * to receive an element, i.e. the list contains an unmatched
     * request node.
     */
    public boolean hasWaitingConsumer() {
        // Lock-free traversal, mirroring peek(): scan from head until we can
        // classify the queue as holding data or holding waiting consumers.
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    // A live data node means any request nodes further on
                    // would already have been matched; stop scanning.
                    if (item != null && item != p)
                        break;
                }
                else if (item == null)
                    // Unmatched request node: a consumer is waiting.
                    return true;
                if (p == (p = p.next))
                    // Self-linked node was unlinked; restart from new head.
                    continue restartFromHead;
            }
            return false;
        }
    }
1428 
1429     /**
1430      * Returns the number of elements in this queue.  If this queue
1431      * contains more than {@code Integer.MAX_VALUE} elements, returns
1432      * {@code Integer.MAX_VALUE}.
1433      *
1434      * <p>Beware that, unlike in most collections, this method is
1435      * <em>NOT</em> a constant-time operation. Because of the
1436      * asynchronous nature of these queues, determining the current
1437      * number of elements requires an O(n) traversal.
1438      *
1439      * @return the number of elements in this queue
1440      */
size()1441     public int size() {
1442         return countOfMode(true);
1443     }
1444 
getWaitingConsumerCount()1445     public int getWaitingConsumerCount() {
1446         return countOfMode(false);
1447     }
1448 
1449     /**
1450      * Removes a single instance of the specified element from this queue,
1451      * if it is present.  More formally, removes an element {@code e} such
1452      * that {@code o.equals(e)}, if this queue contains one or more such
1453      * elements.
1454      * Returns {@code true} if this queue contained the specified element
1455      * (or equivalently, if this queue changed as a result of the call).
1456      *
1457      * @param o element to be removed from this queue, if present
1458      * @return {@code true} if this queue changed as a result of the call
1459      */
remove(Object o)1460     public boolean remove(Object o) {
1461         return findAndRemove(o);
1462     }
1463 
1464     /**
1465      * Returns {@code true} if this queue contains the specified element.
1466      * More formally, returns {@code true} if and only if this queue contains
1467      * at least one element {@code e} such that {@code o.equals(e)}.
1468      *
1469      * @param o object to be checked for containment in this queue
1470      * @return {@code true} if this queue contains the specified element
1471      */
contains(Object o)1472     public boolean contains(Object o) {
1473         if (o != null) {
1474             for (Node p = head; p != null; p = succ(p)) {
1475                 Object item = p.item;
1476                 if (p.isData) {
1477                     if (item != null && item != p && o.equals(item))
1478                         return true;
1479                 }
1480                 else if (item == null)
1481                     break;
1482             }
1483         }
1484         return false;
1485     }
1486 
    /**
     * Always returns {@code Integer.MAX_VALUE} because a
     * {@code LinkedTransferQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE} (as specified by
     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
     *         BlockingQueue.remainingCapacity})
     */
    public int remainingCapacity() {
        // Linked-node queue is unbounded; capacity never shrinks.
        return Integer.MAX_VALUE;
    }
1498 
1499     /**
1500      * Saves this queue to a stream (that is, serializes it).
1501      *
1502      * @param s the stream
1503      * @throws java.io.IOException if an I/O error occurs
1504      * @serialData All of the elements (each an {@code E}) in
1505      * the proper order, followed by a null
1506      */
writeObject(java.io.ObjectOutputStream s)1507     private void writeObject(java.io.ObjectOutputStream s)
1508         throws java.io.IOException {
1509         s.defaultWriteObject();
1510         for (E e : this)
1511             s.writeObject(e);
1512         // Use trailing null as sentinel
1513         s.writeObject(null);
1514     }
1515 
1516     /**
1517      * Reconstitutes this queue from a stream (that is, deserializes it).
1518      * @param s the stream
1519      * @throws ClassNotFoundException if the class of a serialized object
1520      *         could not be found
1521      * @throws java.io.IOException if an I/O error occurs
1522      */
readObject(java.io.ObjectInputStream s)1523     private void readObject(java.io.ObjectInputStream s)
1524         throws java.io.IOException, ClassNotFoundException {
1525         s.defaultReadObject();
1526         for (;;) {
1527             @SuppressWarnings("unchecked")
1528             E item = (E) s.readObject();
1529             if (item == null)
1530                 break;
1531             else
1532                 offer(item);
1533         }
1534     }
1535 
    // Unsafe mechanics

    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    // Raw memory offsets of the head, tail, and sweepVotes fields, used
    // for CAS updates elsewhere in the class.
    private static final long HEAD;
    private static final long TAIL;
    private static final long SWEEPVOTES;
    static {
        try {
            HEAD = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("head"));
            TAIL = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("tail"));
            SWEEPVOTES = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
        } catch (ReflectiveOperationException e) {
            // Reflection on this class's own declared fields can only fail
            // if the class file is inconsistent — treat as fatal.
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }
1558 }
1559