1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.AbstractQueue;
39 import java.util.Arrays;
40 import java.util.Collection;
41 import java.util.Iterator;
42 import java.util.NoSuchElementException;
43 import java.util.Queue;
44 import java.util.Spliterator;
45 import java.util.Spliterators;
46 import java.util.concurrent.locks.LockSupport;
47 import java.util.function.Consumer;
48 
49 // BEGIN android-note
50 // removed link to collections framework docs
51 // END android-note
52 
53 /**
54  * An unbounded {@link TransferQueue} based on linked nodes.
55  * This queue orders elements FIFO (first-in-first-out) with respect
56  * to any given producer.  The <em>head</em> of the queue is that
57  * element that has been on the queue the longest time for some
58  * producer.  The <em>tail</em> of the queue is that element that has
59  * been on the queue the shortest time for some producer.
60  *
61  * <p>Beware that, unlike in most collections, the {@code size} method
62  * is <em>NOT</em> a constant-time operation. Because of the
63  * asynchronous nature of these queues, determining the current number
64  * of elements requires a traversal of the elements, and so may report
65  * inaccurate results if this collection is modified during traversal.
66  * Additionally, the bulk operations {@code addAll},
67  * {@code removeAll}, {@code retainAll}, {@code containsAll},
68  * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
69  * to be performed atomically. For example, an iterator operating
70  * concurrently with an {@code addAll} operation might view only some
71  * of the added elements.
72  *
73  * <p>This class and its iterator implement all of the
74  * <em>optional</em> methods of the {@link Collection} and {@link
75  * Iterator} interfaces.
76  *
77  * <p>Memory consistency effects: As with other concurrent
78  * collections, actions in a thread prior to placing an object into a
79  * {@code LinkedTransferQueue}
80  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
81  * actions subsequent to the access or removal of that element from
82  * the {@code LinkedTransferQueue} in another thread.
83  *
84  * @since 1.7
85  * @author Doug Lea
86  * @param <E> the type of elements held in this queue
87  */
88 public class LinkedTransferQueue<E> extends AbstractQueue<E>
89     implements TransferQueue<E>, java.io.Serializable {
90     private static final long serialVersionUID = -3223113410248163686L;
91 
92     /*
93      * *** Overview of Dual Queues with Slack ***
94      *
95      * Dual Queues, introduced by Scherer and Scott
96      * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
97      * (linked) queues in which nodes may represent either data or
98      * requests.  When a thread tries to enqueue a data node, but
99      * encounters a request node, it instead "matches" and removes it;
100      * and vice versa for enqueuing requests. Blocking Dual Queues
101      * arrange that threads enqueuing unmatched requests block until
102      * other threads provide the match. Dual Synchronous Queues (see
103      * Scherer, Lea, & Scott
104      * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
105      * additionally arrange that threads enqueuing unmatched data also
106      * block.  Dual Transfer Queues support all of these modes, as
107      * dictated by callers.
108      *
109      * A FIFO dual queue may be implemented using a variation of the
110      * Michael & Scott (M&S) lock-free queue algorithm
111      * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
112      * It maintains two pointer fields, "head", pointing to a
113      * (matched) node that in turn points to the first actual
114      * (unmatched) queue node (or null if empty); and "tail" that
115      * points to the last node on the queue (or again null if
116      * empty). For example, here is a possible queue with four data
117      * elements:
118      *
119      *  head                tail
120      *    |                   |
121      *    v                   v
122      *    M -> U -> U -> U -> U
123      *
124      * The M&S queue algorithm is known to be prone to scalability and
125      * overhead limitations when maintaining (via CAS) these head and
126      * tail pointers. This has led to the development of
127      * contention-reducing variants such as elimination arrays (see
128      * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
129      * optimistic back pointers (see Ladan-Mozes & Shavit
130      * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
131      * However, the nature of dual queues enables a simpler tactic for
132      * improving M&S-style implementations when dual-ness is needed.
133      *
134      * In a dual queue, each node must atomically maintain its match
135      * status. While there are other possible variants, we implement
136      * this here as: for a data-mode node, matching entails CASing an
137      * "item" field from a non-null data value to null upon match, and
138      * vice-versa for request nodes, CASing from null to a data
139      * value. (Note that the linearization properties of this style of
140      * queue are easy to verify -- elements are made available by
141      * linking, and unavailable by matching.) Compared to plain M&S
142      * queues, this property of dual queues requires one additional
143      * successful atomic operation per enq/deq pair. But it also
144      * enables lower cost variants of queue maintenance mechanics. (A
145      * variation of this idea applies even for non-dual queues that
146      * support deletion of interior elements, such as
147      * j.u.c.ConcurrentLinkedQueue.)
148      *
149      * Once a node is matched, its match status can never again
150      * change.  We may thus arrange that the linked list of them
151      * contain a prefix of zero or more matched nodes, followed by a
152      * suffix of zero or more unmatched nodes. (Note that we allow
153      * both the prefix and suffix to be zero length, which in turn
154      * means that we do not use a dummy header.)  If we were not
155      * concerned with either time or space efficiency, we could
156      * correctly perform enqueue and dequeue operations by traversing
157      * from a pointer to the initial node; CASing the item of the
158      * first unmatched node on match and CASing the next field of the
159      * trailing node on appends. (Plus some special-casing when
160      * initially empty).  While this would be a terrible idea in
161      * itself, it does have the benefit of not requiring ANY atomic
162      * updates on head/tail fields.
163      *
164      * We introduce here an approach that lies between the extremes of
165      * never versus always updating queue (head and tail) pointers.
166      * This offers a tradeoff between sometimes requiring extra
167      * traversal steps to locate the first and/or last unmatched
168      * nodes, versus the reduced overhead and contention of fewer
169      * updates to queue pointers. For example, a possible snapshot of
170      * a queue is:
171      *
172      *  head           tail
173      *    |              |
174      *    v              v
175      *    M -> M -> U -> U -> U -> U
176      *
177      * The best value for this "slack" (the targeted maximum distance
178      * between the value of "head" and the first unmatched node, and
179      * similarly for "tail") is an empirical matter. We have found
     * that using very small constants in the range of 1-3 works best
181      * over a range of platforms. Larger values introduce increasing
182      * costs of cache misses and risks of long traversal chains, while
183      * smaller values increase CAS contention and overhead.
184      *
185      * Dual queues with slack differ from plain M&S dual queues by
186      * virtue of only sometimes updating head or tail pointers when
187      * matching, appending, or even traversing nodes; in order to
188      * maintain a targeted slack.  The idea of "sometimes" may be
189      * operationalized in several ways. The simplest is to use a
190      * per-operation counter incremented on each traversal step, and
191      * to try (via CAS) to update the associated queue pointer
192      * whenever the count exceeds a threshold. Another, that requires
193      * more overhead, is to use random number generators to update
194      * with a given probability per traversal step.
195      *
196      * In any strategy along these lines, because CASes updating
197      * fields may fail, the actual slack may exceed targeted
198      * slack. However, they may be retried at any time to maintain
199      * targets.  Even when using very small slack values, this
200      * approach works well for dual queues because it allows all
201      * operations up to the point of matching or appending an item
202      * (hence potentially allowing progress by another thread) to be
203      * read-only, thus not introducing any further contention. As
204      * described below, we implement this by performing slack
205      * maintenance retries only after these points.
206      *
207      * As an accompaniment to such techniques, traversal overhead can
208      * be further reduced without increasing contention of head
209      * pointer updates: Threads may sometimes shortcut the "next" link
210      * path from the current "head" node to be closer to the currently
211      * known first unmatched node, and similarly for tail. Again, this
     * may be triggered using thresholds or randomization.
213      *
214      * These ideas must be further extended to avoid unbounded amounts
215      * of costly-to-reclaim garbage caused by the sequential "next"
216      * links of nodes starting at old forgotten head nodes: As first
217      * described in detail by Boehm
218      * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
219      * delays noticing that any arbitrarily old node has become
220      * garbage, all newer dead nodes will also be unreclaimed.
221      * (Similar issues arise in non-GC environments.)  To cope with
222      * this in our implementation, upon CASing to advance the head
223      * pointer, we set the "next" link of the previous head to point
224      * only to itself; thus limiting the length of connected dead lists.
225      * (We also take similar care to wipe out possibly garbage
226      * retaining values held in other Node fields.)  However, doing so
227      * adds some further complexity to traversal: If any "next"
228      * pointer links to itself, it indicates that the current thread
229      * has lagged behind a head-update, and so the traversal must
230      * continue from the "head".  Traversals trying to find the
231      * current tail starting from "tail" may also encounter
232      * self-links, in which case they also continue at "head".
233      *
     * It is tempting in a slack-based scheme to not even use CAS for
235      * updates (similarly to Ladan-Mozes & Shavit). However, this
236      * cannot be done for head updates under the above link-forgetting
237      * mechanics because an update may leave head at a detached node.
238      * And while direct writes are possible for tail updates, they
239      * increase the risk of long retraversals, and hence long garbage
240      * chains, which can be much more costly than is worthwhile
241      * considering that the cost difference of performing a CAS vs
242      * write is smaller when they are not triggered on each operation
243      * (especially considering that writes and CASes equally require
244      * additional GC bookkeeping ("write barriers") that are sometimes
245      * more costly than the writes themselves because of contention).
246      *
247      * *** Overview of implementation ***
248      *
249      * We use a threshold-based approach to updates, with a slack
250      * threshold of two -- that is, we update head/tail when the
251      * current pointer appears to be two or more steps away from the
252      * first/last node. The slack value is hard-wired: a path greater
253      * than one is naturally implemented by checking equality of
254      * traversal pointers except when the list has only one element,
255      * in which case we keep slack threshold at one. Avoiding tracking
256      * explicit counts across method calls slightly simplifies an
257      * already-messy implementation. Using randomization would
258      * probably work better if there were a low-quality dirt-cheap
259      * per-thread one available, but even ThreadLocalRandom is too
260      * heavy for these purposes.
261      *
262      * With such a small slack threshold value, it is not worthwhile
263      * to augment this with path short-circuiting (i.e., unsplicing
264      * interior nodes) except in the case of cancellation/removal (see
265      * below).
266      *
267      * We allow both the head and tail fields to be null before any
268      * nodes are enqueued; initializing upon first append.  This
269      * simplifies some other logic, as well as providing more
270      * efficient explicit control paths instead of letting JVMs insert
271      * implicit NullPointerExceptions when they are null.  While not
272      * currently fully implemented, we also leave open the possibility
273      * of re-nulling these fields when empty (which is complicated to
274      * arrange, for little benefit.)
275      *
276      * All enqueue/dequeue operations are handled by the single method
277      * "xfer" with parameters indicating whether to act as some form
278      * of offer, put, poll, take, or transfer (each possibly with
     * timeout). The relative complexity of using one monolithic
     * method is outweighed by the code bulk and maintenance problems
     * of using separate methods for each case.
282      *
283      * Operation consists of up to three phases. The first is
284      * implemented within method xfer, the second in tryAppend, and
285      * the third in method awaitMatch.
286      *
287      * 1. Try to match an existing node
288      *
289      *    Starting at head, skip already-matched nodes until finding
290      *    an unmatched node of opposite mode, if one exists, in which
291      *    case matching it and returning, also if necessary updating
292      *    head to one past the matched node (or the node itself if the
293      *    list has no other unmatched nodes). If the CAS misses, then
294      *    a loop retries advancing head by two steps until either
295      *    success or the slack is at most two. By requiring that each
296      *    attempt advances head by two (if applicable), we ensure that
297      *    the slack does not grow without bound. Traversals also check
298      *    if the initial head is now off-list, in which case they
299      *    start at the new head.
300      *
     *    If no candidates are found and the call was an untimed poll
     *    or tryTransfer (argument "how" is NOW), return.
303      *
304      * 2. Try to append a new node (method tryAppend)
305      *
306      *    Starting at current tail pointer, find the actual last node
307      *    and try to append a new node (or if head was null, establish
308      *    the first node). Nodes can be appended only if their
309      *    predecessors are either already matched or are of the same
310      *    mode. If we detect otherwise, then a new node with opposite
311      *    mode must have been appended during traversal, so we must
312      *    restart at phase 1. The traversal and update steps are
313      *    otherwise similar to phase 1: Retrying upon CAS misses and
314      *    checking for staleness.  In particular, if a self-link is
315      *    encountered, then we can safely jump to a node on the list
316      *    by continuing the traversal at current head.
317      *
318      *    On successful append, if the call was ASYNC, return.
319      *
320      * 3. Await match or cancellation (method awaitMatch)
321      *
322      *    Wait for another thread to match node; instead cancelling if
323      *    the current thread was interrupted or the wait timed out. On
324      *    multiprocessors, we use front-of-queue spinning: If a node
325      *    appears to be the first unmatched node in the queue, it
326      *    spins a bit before blocking. In either case, before blocking
327      *    it tries to unsplice any nodes between the current "head"
328      *    and the first unmatched node.
329      *
330      *    Front-of-queue spinning vastly improves performance of
331      *    heavily contended queues. And so long as it is relatively
332      *    brief and "quiet", spinning does not much impact performance
333      *    of less-contended queues.  During spins threads check their
334      *    interrupt status and generate a thread-local random number
335      *    to decide to occasionally perform a Thread.yield. While
336      *    yield has underdefined specs, we assume that it might help,
337      *    and will not hurt, in limiting impact of spinning on busy
338      *    systems.  We also use smaller (1/2) spins for nodes that are
339      *    not known to be front but whose predecessors have not
340      *    blocked -- these "chained" spins avoid artifacts of
341      *    front-of-queue rules which otherwise lead to alternating
342      *    nodes spinning vs blocking. Further, front threads that
343      *    represent phase changes (from data to request node or vice
344      *    versa) compared to their predecessors receive additional
345      *    chained spins, reflecting longer paths typically required to
346      *    unblock threads during phase changes.
347      *
348      *
349      * ** Unlinking removed interior nodes **
350      *
351      * In addition to minimizing garbage retention via self-linking
352      * described above, we also unlink removed interior nodes. These
353      * may arise due to timed out or interrupted waits, or calls to
354      * remove(x) or Iterator.remove.  Normally, given a node that was
355      * at one time known to be the predecessor of some node s that is
356      * to be removed, we can unsplice s by CASing the next field of
357      * its predecessor if it still points to s (otherwise s must
358      * already have been removed or is now offlist). But there are two
359      * situations in which we cannot guarantee to make node s
360      * unreachable in this way: (1) If s is the trailing node of list
361      * (i.e., with null next), then it is pinned as the target node
362      * for appends, so can only be removed later after other nodes are
363      * appended. (2) We cannot necessarily unlink s given a
364      * predecessor node that is matched (including the case of being
365      * cancelled): the predecessor may already be unspliced, in which
366      * case some previous reachable node may still point to s.
367      * (For further explanation see Herlihy & Shavit "The Art of
368      * Multiprocessor Programming" chapter 9).  Although, in both
369      * cases, we can rule out the need for further action if either s
370      * or its predecessor are (or can be made to be) at, or fall off
371      * from, the head of list.
372      *
373      * Without taking these into account, it would be possible for an
374      * unbounded number of supposedly removed nodes to remain
375      * reachable.  Situations leading to such buildup are uncommon but
376      * can occur in practice; for example when a series of short timed
377      * calls to poll repeatedly time out but never otherwise fall off
378      * the list because of an untimed call to take at the front of the
379      * queue.
380      *
381      * When these cases arise, rather than always retraversing the
382      * entire list to find an actual predecessor to unlink (which
383      * won't help for case (1) anyway), we record a conservative
384      * estimate of possible unsplice failures (in "sweepVotes").
385      * We trigger a full sweep when the estimate exceeds a threshold
386      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
387      * removal failures to tolerate before sweeping through, unlinking
388      * cancelled nodes that were not unlinked upon initial removal.
389      * We perform sweeps by the thread hitting threshold (rather than
390      * background threads or by spreading work to other threads)
391      * because in the main contexts in which removal occurs, the
392      * caller is already timed-out, cancelled, or performing a
393      * potentially O(n) operation (e.g. remove(x)), none of which are
394      * time-critical enough to warrant the overhead that alternatives
395      * would impose on other threads.
396      *
397      * Because the sweepVotes estimate is conservative, and because
398      * nodes become unlinked "naturally" as they fall off the head of
399      * the queue, and because we allow votes to accumulate even while
400      * sweeps are in progress, there are typically significantly fewer
401      * such nodes than estimated.  Choice of a threshold value
402      * balances the likelihood of wasted effort and contention, versus
403      * providing a worst-case bound on retention of interior nodes in
404      * quiescent queues. The value defined below was chosen
405      * empirically to balance these under various timeout scenarios.
406      *
407      * Note that we cannot self-link unlinked interior nodes during
408      * sweeps. However, the associated garbage chains terminate when
409      * some successor ultimately falls off the head of the list and is
410      * self-linked.
411      */
412 
    /**
     * True if on multiprocessor, i.e. if
     * {@code Runtime.availableProcessors()} reported more than one
     * processor when this class was initialized.
     */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue.  See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.  Currently 128.
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning.  Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.  Defined as half of FRONT_SPINS (currently 64).
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     * Unlike the spin constants, this one is package-private.
     */
    static final int SWEEP_THRESHOLD = 32;
444 
445     /**
446      * Queue nodes. Uses Object, not E, for items to allow forgetting
447      * them after use.  Relies heavily on Unsafe mechanics to minimize
448      * unnecessary ordering constraints: Writes that are intrinsically
449      * ordered wrt other accesses or CASes use simple relaxed forms.
450      */
451     static final class Node {
452         final boolean isData;   // false if this is a request node
453         volatile Object item;   // initially non-null if isData; CASed to match
454         volatile Node next;
455         volatile Thread waiter; // null until waiting
456 
457         // CAS methods for fields
casNext(Node cmp, Node val)458         final boolean casNext(Node cmp, Node val) {
459             return U.compareAndSwapObject(this, NEXT, cmp, val);
460         }
461 
casItem(Object cmp, Object val)462         final boolean casItem(Object cmp, Object val) {
463             // assert cmp == null || cmp.getClass() != Node.class;
464             return U.compareAndSwapObject(this, ITEM, cmp, val);
465         }
466 
467         /**
468          * Constructs a new node.  Uses relaxed write because item can
469          * only be seen after publication via casNext.
470          */
Node(Object item, boolean isData)471         Node(Object item, boolean isData) {
472             U.putObject(this, ITEM, item); // relaxed write
473             this.isData = isData;
474         }
475 
476         /**
477          * Links node to itself to avoid garbage retention.  Called
478          * only after CASing head field, so uses relaxed write.
479          */
forgetNext()480         final void forgetNext() {
481             U.putObject(this, NEXT, this);
482         }
483 
484         /**
485          * Sets item to self and waiter to null, to avoid garbage
486          * retention after matching or cancelling. Uses relaxed writes
487          * because order is already constrained in the only calling
488          * contexts: item is forgotten only after volatile/atomic
489          * mechanics that extract items.  Similarly, clearing waiter
490          * follows either CAS or return from park (if ever parked;
491          * else we don't care).
492          */
forgetContents()493         final void forgetContents() {
494             U.putObject(this, ITEM, this);
495             U.putObject(this, WAITER, null);
496         }
497 
498         /**
499          * Returns true if this node has been matched, including the
500          * case of artificial matches due to cancellation.
501          */
isMatched()502         final boolean isMatched() {
503             Object x = item;
504             return (x == this) || ((x == null) == isData);
505         }
506 
507         /**
508          * Returns true if this is an unmatched request node.
509          */
isUnmatchedRequest()510         final boolean isUnmatchedRequest() {
511             return !isData && item == null;
512         }
513 
514         /**
515          * Returns true if a node with the given mode cannot be
516          * appended to this node because this node is unmatched and
517          * has opposite data mode.
518          */
cannotPrecede(boolean haveData)519         final boolean cannotPrecede(boolean haveData) {
520             boolean d = isData;
521             Object x;
522             return d != haveData && (x = item) != this && (x != null) == d;
523         }
524 
525         /**
526          * Tries to artificially match a data node -- used by remove.
527          */
tryMatchData()528         final boolean tryMatchData() {
529             // assert isData;
530             Object x = item;
531             if (x != null && x != this && casItem(x, null)) {
532                 LockSupport.unpark(waiter);
533                 return true;
534             }
535             return false;
536         }
537 
538         private static final long serialVersionUID = -3375979862319811754L;
539 
540         // Unsafe mechanics
541         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
542         private static final long ITEM;
543         private static final long NEXT;
544         private static final long WAITER;
545         static {
546             try {
547                 ITEM = U.objectFieldOffset
548                     (Node.class.getDeclaredField("item"));
549                 NEXT = U.objectFieldOffset
550                     (Node.class.getDeclaredField("next"));
551                 WAITER = U.objectFieldOffset
552                     (Node.class.getDeclaredField("waiter"));
553             } catch (ReflectiveOperationException e) {
554                 throw new Error(e);
555             }
556         }
557     }
558 
    /**
     * head of the queue; null until first enqueue.  Matching
     * traversals in xfer start from this field.  Declared without an
     * access modifier, unlike tail.
     */
    transient volatile Node head;

    /** tail of the queue; null until first append */
    private transient volatile Node tail;

    /**
     * The number of apparent failures to unsplice removed nodes.
     * This is the conservative estimate that is compared against
     * SWEEP_THRESHOLD, as described in the overview above.
     */
    private transient volatile int sweepVotes;
567 
568     // CAS methods for fields
casTail(Node cmp, Node val)569     private boolean casTail(Node cmp, Node val) {
570         return U.compareAndSwapObject(this, TAIL, cmp, val);
571     }
572 
casHead(Node cmp, Node val)573     private boolean casHead(Node cmp, Node val) {
574         return U.compareAndSwapObject(this, HEAD, cmp, val);
575     }
576 
casSweepVotes(int cmp, int val)577     private boolean casSweepVotes(int cmp, int val) {
578         return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
579     }
580 
    /*
     * Possible values for "how" argument in xfer method.
     *
     * NOW:   xfer only tries to match; if nothing matches it returns
     *        without appending a node.
     * ASYNC: xfer appends a node if nothing matches and returns
     *        without waiting.
     * SYNC:  xfer appends a node and calls awaitMatch with its timed
     *        flag false.
     * TIMED: xfer appends a node and calls awaitMatch with its timed
     *        flag true, passing along the nanos argument.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer
588 
    /**
     * Implements all queuing methods. See above for explanation.
     *
     * <p>First walks the list from head looking for an unmatched node
     * of the opposite mode and tries to match it by CASing its item.
     * If no such node is found and {@code how} is not NOW, appends a
     * new node via tryAppend and, unless {@code how} is ASYNC, waits
     * for it in awaitMatch.
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return the previous item of the node that was matched (null
     *         when that node was a request node); the result of
     *         awaitMatch if this call appended a node and waited;
     *         otherwise e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            // Phase 1: h remembers the head this traversal started
            // from, p is the node currently being examined.
            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                // item is read once; this snapshot serves as the
                // unmatched test, the CAS expected value, and the
                // value returned on a successful match.
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        // Head maintenance: skipped entirely when the
                        // matched node is the remembered head (q == h).
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            // New head is q's successor, or q itself
                            // when q has no successor.
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext(); // self-link the old head
                                break;
                            }                 // advance and retry
                            // CAS missed or head moved: re-read head
                            // and continue only while the node after
                            // it is already matched.
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        // waiter is null until a thread waits on p;
                        // unpark accepts a null argument.
                        LockSupport.unpark(p.waiter);
                        @SuppressWarnings("unchecked") E itemE = (E) item;
                        return itemE;
                    }
                }
                // p.next == p means p has been self-linked (it is no
                // longer on the list), so restart from the current head.
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            // Phase 2/3: nothing to match against.
            if (how != NOW) {                 // No matches available
                if (s == null)                // created once, reused across retries
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)             // SYNC or TIMED: wait for a match
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
645 
    /**
     * Tries to append node s as tail.
     *
     * <p>Starts from the current {@code tail} (or {@code head} if tail
     * is null), walks to the last node, and CASes its {@code next} from
     * null to {@code s}. The {@code tail} field is only updated when
     * the node appended to was not the tail snapshot.
     *
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                // Both tail and head are null: install s as the first node.
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                // s is now linked after p.
                if (p != t) {                 // update if slack now >= 2
                    // Try to swing tail to s; on failure re-read tail and
                    // continue only while tail still lags by two or more.
                    // Note that s is reused as a scratch variable here.
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }
680 
    /**
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * <p>Each loop iteration takes exactly one of these steps, in
     * priority order: return if matched; cancel on interrupt or
     * timeout; compute the spin budget; spin; publish this thread as
     * the node's waiter; park. The interrupt status is only tested
     * here, not cleared.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                @SuppressWarnings("unchecked") E itemE = (E) item;
                return itemE;
            }
            else if (w.isInterrupted() || (timed && nanos <= 0L)) {
                unsplice(pred, s);           // try to unlink and cancel
                // Cancel by self-linking the item. If this CAS fails the
                // item changed concurrently, and the next iteration takes
                // the "matched" branch above.
                if (s.casItem(e, s))         // return normally if lost CAS
                    return e;
            }
            else if (spins < 0) {            // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                // Recompute the remaining time; a non-positive value is
                // handled by the cancel branch on the next iteration.
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            else {
                LockSupport.park(this);
            }
        }
    }
734 
735     /**
736      * Returns spin/yield value for a node with given predecessor and
737      * data mode. See above for explanation.
738      */
spinsFor(Node pred, boolean haveData)739     private static int spinsFor(Node pred, boolean haveData) {
740         if (MP && pred != null) {
741             if (pred.isData != haveData)      // phase change
742                 return FRONT_SPINS + CHAINED_SPINS;
743             if (pred.isMatched())             // probably at front
744                 return FRONT_SPINS;
745             if (pred.waiter == null)          // pred apparently spinning
746                 return CHAINED_SPINS;
747         }
748         return 0;
749     }
750 
751     /* -------------- Traversal methods -------------- */
752 
753     /**
754      * Returns the successor of p, or the head node if p.next has been
755      * linked to self, which will only be true if traversing with a
756      * stale pointer that is now off the list.
757      */
succ(Node p)758     final Node succ(Node p) {
759         Node next = p.next;
760         return (p == next) ? head : next;
761     }
762 
763     /**
764      * Returns the first unmatched data node, or null if none.
765      * Callers must recheck if the returned node's item field is null
766      * or self-linked before using.
767      */
firstDataNode()768     final Node firstDataNode() {
769         restartFromHead: for (;;) {
770             for (Node p = head; p != null;) {
771                 Object item = p.item;
772                 if (p.isData) {
773                     if (item != null && item != p)
774                         return p;
775                 }
776                 else if (item == null)
777                     break;
778                 if (p == (p = p.next))
779                     continue restartFromHead;
780             }
781             return null;
782         }
783     }
784 
785     /**
786      * Traverses and counts unmatched nodes of the given mode.
787      * Used by methods size and getWaitingConsumerCount.
788      */
countOfMode(boolean data)789     private int countOfMode(boolean data) {
790         restartFromHead: for (;;) {
791             int count = 0;
792             for (Node p = head; p != null;) {
793                 if (!p.isMatched()) {
794                     if (p.isData != data)
795                         return 0;
796                     if (++count == Integer.MAX_VALUE)
797                         break;  // @see Collection.size()
798                 }
799                 if (p == (p = p.next))
800                     continue restartFromHead;
801             }
802             return count;
803         }
804     }
805 
toString()806     public String toString() {
807         String[] a = null;
808         restartFromHead: for (;;) {
809             int charLength = 0;
810             int size = 0;
811             for (Node p = head; p != null;) {
812                 Object item = p.item;
813                 if (p.isData) {
814                     if (item != null && item != p) {
815                         if (a == null)
816                             a = new String[4];
817                         else if (size == a.length)
818                             a = Arrays.copyOf(a, 2 * size);
819                         String s = item.toString();
820                         a[size++] = s;
821                         charLength += s.length();
822                     }
823                 } else if (item == null)
824                     break;
825                 if (p == (p = p.next))
826                     continue restartFromHead;
827             }
828 
829             if (size == 0)
830                 return "[]";
831 
832             return Helpers.toString(a, size, charLength);
833         }
834     }
835 
toArrayInternal(Object[] a)836     private Object[] toArrayInternal(Object[] a) {
837         Object[] x = a;
838         restartFromHead: for (;;) {
839             int size = 0;
840             for (Node p = head; p != null;) {
841                 Object item = p.item;
842                 if (p.isData) {
843                     if (item != null && item != p) {
844                         if (x == null)
845                             x = new Object[4];
846                         else if (size == x.length)
847                             x = Arrays.copyOf(x, 2 * (size + 4));
848                         x[size++] = item;
849                     }
850                 } else if (item == null)
851                     break;
852                 if (p == (p = p.next))
853                     continue restartFromHead;
854             }
855             if (x == null)
856                 return new Object[0];
857             else if (a != null && size <= a.length) {
858                 if (a != x)
859                     System.arraycopy(x, 0, a, 0, size);
860                 if (size < a.length)
861                     a[size] = null;
862                 return a;
863             }
864             return (size == x.length) ? x : Arrays.copyOf(x, size);
865         }
866     }
867 
868     /**
869      * Returns an array containing all of the elements in this queue, in
870      * proper sequence.
871      *
872      * <p>The returned array will be "safe" in that no references to it are
873      * maintained by this queue.  (In other words, this method must allocate
874      * a new array).  The caller is thus free to modify the returned array.
875      *
876      * <p>This method acts as bridge between array-based and collection-based
877      * APIs.
878      *
879      * @return an array containing all of the elements in this queue
880      */
    public Object[] toArray() {
        // Passing null makes the shared helper allocate its own Object[],
        // so the result is never an array supplied by a caller.
        return toArrayInternal(null);
    }
884 
885     /**
886      * Returns an array containing all of the elements in this queue, in
887      * proper sequence; the runtime type of the returned array is that of
888      * the specified array.  If the queue fits in the specified array, it
889      * is returned therein.  Otherwise, a new array is allocated with the
890      * runtime type of the specified array and the size of this queue.
891      *
892      * <p>If this queue fits in the specified array with room to spare
893      * (i.e., the array has more elements than this queue), the element in
894      * the array immediately following the end of the queue is set to
895      * {@code null}.
896      *
897      * <p>Like the {@link #toArray()} method, this method acts as bridge between
898      * array-based and collection-based APIs.  Further, this method allows
899      * precise control over the runtime type of the output array, and may,
900      * under certain circumstances, be used to save allocation costs.
901      *
902      * <p>Suppose {@code x} is a queue known to contain only strings.
903      * The following code can be used to dump the queue into a newly
904      * allocated array of {@code String}:
905      *
906      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
907      *
908      * Note that {@code toArray(new Object[0])} is identical in function to
909      * {@code toArray()}.
910      *
911      * @param a the array into which the elements of the queue are to
912      *          be stored, if it is big enough; otherwise, a new array of the
913      *          same runtime type is allocated for this purpose
914      * @return an array containing all of the elements in this queue
915      * @throws ArrayStoreException if the runtime type of the specified array
916      *         is not a supertype of the runtime type of every element in
917      *         this queue
918      * @throws NullPointerException if the specified array is null
919      */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        // Reject null explicitly: the shared helper interprets a null
        // argument as "allocate an Object[]", which is the no-arg variant.
        if (a == null) throw new NullPointerException();
        // The helper either returns a itself or an Arrays.copyOf-derived
        // array that started from a, which is what this cast relies on.
        return (T[]) toArrayInternal(a);
    }
925 
    /**
     * Iterator over the live data items of the enclosing queue. The
     * next element is located eagerly: construction and each call to
     * {@code next} run {@code advance}, which caches both the next node
     * and its item, so {@code hasNext} is a plain field check.
     */
    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         * Also records prev as lastRet and, while scanning, tries to
         * unlink matched nodes it passes over.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched())
                lastPred = r;    // next lastPred is old lastRet
            else if ((b = lastPred) == null || b.isMatched())
                lastPred = null; // at start of list
            else {
                Node s, n;       // help with removal of lastPred.next
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s)
                    b.casNext(s, n);
            }

            this.lastRet = prev;

            // Scan forward from prev (or from head when p is null).
            for (Node p = prev, s, n;;) {
                s = (p == null) ? head : p.next;
                if (s == null)
                    break;               // reached the end of the list
                else if (s == p) {
                    p = null;            // p is self-linked: restart at head
                    continue;
                }
                Object item = s.item;
                if (s.isData) {
                    if (item != null && item != s) {
                        // Live data node: cache it as the next element.
                        @SuppressWarnings("unchecked") E itemE = (E) item;
                        nextItem = itemE;
                        nextNode = s;
                        return;
                    }
                }
                else if (item == null)
                    break;               // unmatched request node: stop
                // assert s.isMatched();
                if (p == null)
                    p = s;               // continue from s without unlinking
                else if ((n = s.next) == null)
                    break;               // s is the last node; leave it
                else if (s == n)
                    p = null;            // s is self-linked: restart at head
                else
                    p.casNext(s, n);     // try to unlink the matched node s
            }
            // Nothing further to return.
            nextNode = null;
            nextItem = null;
        }

        /** Positions the iterator on the first live data item, if any. */
        Itr() {
            advance(null);
        }

        /** Returns true if a next element has been cached by advance. */
        public final boolean hasNext() {
            return nextNode != null;
        }

        /**
         * Returns the cached item and advances past its node.
         *
         * @throws NoSuchElementException if no next node is cached
         */
        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;              // read before advance overwrites it
            advance(p);
            return e;
        }

        /**
         * Removes the element most recently returned by next.
         *
         * @throws IllegalStateException if there is no such element, or
         *         remove was already called for it
         */
        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null)
                throw new IllegalStateException();
            this.lastRet = null;         // a second remove() must fail
            // Unsplice only when this call is the one that matched the node.
            if (lastRet.tryMatchData())
                unsplice(lastPred, lastRet);
        }
    }
1019 
    /**
     * A customized variant of Spliterators.IteratorSpliterator.
     * It walks the node list directly; trySplit copies a batch of items
     * into an array and hands that off as an array-based spliterator.
     *
     * NOTE(review): the type parameter E declared on this inner class
     * appears to shadow the E of the enclosing class - confirm whether
     * that is intended.
     */
    final class LTQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        Node current;       // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        LTQSpliterator() {}

        /**
         * Splits off up to one element more than the previous batch
         * (capped at MAX_BATCH) into an array-backed spliterator.
         * Returns null if the traversal is exhausted, the current node
         * has no successor, or no item could be collected.
         */
        public Spliterator<E> trySplit() {
            Node p;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null) &&
                p.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                do {
                    Object e = p.item;
                    // Keep only items that are neither self-linked nor null.
                    if (e != p && (a[i] = e) != null)
                        ++i;
                    // A self-linked next means p is off the list; restart.
                    if (p == (p = p.next))
                        p = firstDataNode();
                } while (p != null && i < n && p.isData);
                if ((current = p) == null)
                    exhausted = true;
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator
                        (a, 0, i, (Spliterator.ORDERED |
                                   Spliterator.NONNULL |
                                   Spliterator.CONCURRENT));
                }
            }
            return null;
        }

        /**
         * Passes every remaining live item to the action and marks this
         * spliterator exhausted.
         *
         * @throws NullPointerException if action is null
         */
        @SuppressWarnings("unchecked")
        public void forEachRemaining(Consumer<? super E> action) {
            Node p;
            if (action == null) throw new NullPointerException();
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null)) {
                exhausted = true;        // set before running the action
                do {
                    Object e = p.item;
                    if (e != null && e != p)
                        action.accept((E)e);
                    if (p == (p = p.next))
                        p = firstDataNode();
                } while (p != null && p.isData);
            }
        }

        /**
         * Passes the next live item, if any, to the action.
         *
         * @return true if an item was passed to the action
         * @throws NullPointerException if action is null
         */
        @SuppressWarnings("unchecked")
        public boolean tryAdvance(Consumer<? super E> action) {
            Node p;
            if (action == null) throw new NullPointerException();
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null)) {
                Object e;
                do {
                    // Treat a self-linked item the same as no item.
                    if ((e = p.item) == p)
                        e = null;
                    if (p == (p = p.next))
                        p = firstDataNode();
                } while (e == null && p != null && p.isData);
                if ((current = p) == null)
                    exhausted = true;
                if (e != null) {
                    action.accept((E)e);
                    return true;
                }
            }
            return false;
        }

        /** Always reports Long.MAX_VALUE; no size is computed. */
        public long estimateSize() { return Long.MAX_VALUE; }

        /** Reports ORDERED, NONNULL and CONCURRENT. */
        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                Spliterator.CONCURRENT;
        }
    }
1104 
1105     /**
1106      * Returns a {@link Spliterator} over the elements in this queue.
1107      *
1108      * <p>The returned spliterator is
1109      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1110      *
1111      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1112      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1113      *
1114      * @implNote
1115      * The {@code Spliterator} implements {@code trySplit} to permit limited
1116      * parallelism.
1117      *
1118      * @return a {@code Spliterator} over the elements in this queue
1119      * @since 1.8
1120      */
    public Spliterator<E> spliterator() {
        // The new spliterator holds no position yet; it looks up the
        // first data node the first time it is split or advanced.
        return new LTQSpliterator<E>();
    }
1124 
1125     /* -------------- Removal methods -------------- */
1126 
    /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * <p>Always clears {@code s.waiter}. Does nothing further unless
     * {@code pred} is a distinct node whose {@code next} is still
     * {@code s}.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.waiter = null; // disable signals
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s.  If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
         * nor s are head or offlist, add to sweepVotes, and if enough
         * votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            // Continue when s is the trailing node (n == null), or when
            // the unlink CAS succeeded but pred itself is already matched.
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            // Only the thread that resets the counter sweeps.
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }
1176 
    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head. The last node is never unlinked, and the
     * traversal restarts from head when it meets a self-linked node.
     */
    private void sweep() {
        // p is the node being kept; s is the candidate after it.
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n);   // unlink s; p stays so the loop re-reads p.next
        }
    }
1195 
1196     /**
1197      * Main implementation of remove(Object)
1198      */
findAndRemove(Object e)1199     private boolean findAndRemove(Object e) {
1200         if (e != null) {
1201             for (Node pred = null, p = head; p != null; ) {
1202                 Object item = p.item;
1203                 if (p.isData) {
1204                     if (item != null && item != p && e.equals(item) &&
1205                         p.tryMatchData()) {
1206                         unsplice(pred, p);
1207                         return true;
1208                     }
1209                 }
1210                 else if (item == null)
1211                     break;
1212                 pred = p;
1213                 if ((p = p.next) == pred) { // stale
1214                     pred = null;
1215                     p = head;
1216                 }
1217             }
1218         }
1219         return false;
1220     }
1221 
1222     /**
1223      * Creates an initially empty {@code LinkedTransferQueue}.
1224      */
    public LinkedTransferQueue() {
        // Intentionally empty. Presumably head and tail start out null
        // (their declarations are outside this section); tryAppend
        // installs the first node as head in that case.
    }
1227 
1228     /**
1229      * Creates a {@code LinkedTransferQueue}
1230      * initially containing the elements of the given collection,
1231      * added in traversal order of the collection's iterator.
1232      *
1233      * @param c the collection of elements to initially contain
1234      * @throws NullPointerException if the specified collection or any
1235      *         of its elements are null
1236      */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        // addAll is not defined in this section (presumably inherited);
        // a null collection or null element is expected to surface as
        // the NullPointerException described in the Javadoc.
        addAll(c);
    }
1241 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block.
     *
     * @param e the element to insert
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        // ASYNC: match a waiting consumer or enqueue, but never wait.
        xfer(e, true, ASYNC, 0);
    }
1251 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @param e the element to insert
     * @param timeout ignored by this implementation
     * @param unit ignored by this implementation
     * @return {@code true} (as specified by
     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
     *  BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        // timeout and unit are never read: ASYNC mode does not wait.
        xfer(e, true, ASYNC, 0);
        return true;
    }
1266 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @param e the element to insert
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        // ASYNC: match a waiting consumer or enqueue, but never wait.
        xfer(e, true, ASYNC, 0);
        return true;
    }
1278 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @param e the element to insert
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        // ASYNC: match a waiting consumer or enqueue, but never wait.
        xfer(e, true, ASYNC, 0);
        return true;
    }
1291 
    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @param e the element to transfer
     * @return {@code true} if the element was transferred, else
     *         {@code false}
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e) {
        // In NOW mode xfer returns null only when a waiting consumer was
        // matched; otherwise it hands back e, which is non-null here.
        return xfer(e, true, NOW, 0) == null;
    }
1305 
1306     /**
1307      * Transfers the element to a consumer, waiting if necessary to do so.
1308      *
1309      * <p>More precisely, transfers the specified element immediately
1310      * if there exists a consumer already waiting to receive it (in
1311      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1312      * else inserts the specified element at the tail of this queue
1313      * and waits until the element is received by a consumer.
1314      *
1315      * @throws NullPointerException if the specified element is null
1316      */
transfer(E e)1317     public void transfer(E e) throws InterruptedException {
1318         if (xfer(e, true, SYNC, 0) != null) {
1319             Thread.interrupted(); // failure possible only due to interrupt
1320             throw new InterruptedException();
1321         }
1322     }
1323 
1324     /**
1325      * Transfers the element to a consumer if it is possible to do so
1326      * before the timeout elapses.
1327      *
1328      * <p>More precisely, transfers the specified element immediately
1329      * if there exists a consumer already waiting to receive it (in
1330      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1331      * else inserts the specified element at the tail of this queue
1332      * and waits until the element is received by a consumer,
1333      * returning {@code false} if the specified wait time elapses
1334      * before the element can be transferred.
1335      *
1336      * @throws NullPointerException if the specified element is null
1337      */
tryTransfer(E e, long timeout, TimeUnit unit)1338     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1339         throws InterruptedException {
1340         if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
1341             return true;
1342         if (!Thread.interrupted())
1343             return false;
1344         throw new InterruptedException();
1345     }
1346 
take()1347     public E take() throws InterruptedException {
1348         E e = xfer(null, false, SYNC, 0);
1349         if (e != null)
1350             return e;
1351         Thread.interrupted();
1352         throw new InterruptedException();
1353     }
1354 
poll(long timeout, TimeUnit unit)1355     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1356         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1357         if (e != null || !Thread.interrupted())
1358             return e;
1359         throw new InterruptedException();
1360     }
1361 
poll()1362     public E poll() {
1363         return xfer(null, false, NOW, 0);
1364     }
1365 
1366     /**
1367      * @throws NullPointerException     {@inheritDoc}
1368      * @throws IllegalArgumentException {@inheritDoc}
1369      */
drainTo(Collection<? super E> c)1370     public int drainTo(Collection<? super E> c) {
1371         if (c == null)
1372             throw new NullPointerException();
1373         if (c == this)
1374             throw new IllegalArgumentException();
1375         int n = 0;
1376         for (E e; (e = poll()) != null;) {
1377             c.add(e);
1378             ++n;
1379         }
1380         return n;
1381     }
1382 
1383     /**
1384      * @throws NullPointerException     {@inheritDoc}
1385      * @throws IllegalArgumentException {@inheritDoc}
1386      */
drainTo(Collection<? super E> c, int maxElements)1387     public int drainTo(Collection<? super E> c, int maxElements) {
1388         if (c == null)
1389             throw new NullPointerException();
1390         if (c == this)
1391             throw new IllegalArgumentException();
1392         int n = 0;
1393         for (E e; n < maxElements && (e = poll()) != null;) {
1394             c.add(e);
1395             ++n;
1396         }
1397         return n;
1398     }
1399 
1400     /**
1401      * Returns an iterator over the elements in this queue in proper sequence.
1402      * The elements will be returned in order from first (head) to last (tail).
1403      *
1404      * <p>The returned iterator is
1405      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1406      *
1407      * @return an iterator over the elements in this queue in proper sequence
1408      */
iterator()1409     public Iterator<E> iterator() {
1410         return new Itr();
1411     }
1412 
    // Walks the node list from head and returns, without removing it, the
    // item of the first data node whose item is neither null nor the node
    // itself; returns null if the walk ends without finding such a node.
    public E peek() {
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    // A data node qualifies only while its item is non-null
                    // and is not a reference to the node itself.
                    if (item != null && item != p) {
                        @SuppressWarnings("unchecked") E e = (E) item;
                        return e;
                    }
                }
                // A non-data node whose item is still null ends the scan.
                else if (item == null)
                    break;
                // Advance to the successor; if the node's next is the node
                // itself, start over from head.
                // NOTE(review): presumably a self-linked node has been
                // unlinked from the list - confirm against the Node docs.
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return null;
        }
    }
1431 
1432     /**
1433      * Returns {@code true} if this queue contains no elements.
1434      *
1435      * @return {@code true} if this queue contains no elements
1436      */
isEmpty()1437     public boolean isEmpty() {
1438         return firstDataNode() == null;
1439     }
1440 
    // Walks the node list from head and returns true on reaching a non-data
    // node whose item is still null; returns false if a data node with a
    // usable item is reached first or the list is exhausted.
    public boolean hasWaitingConsumer() {
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    // A data node whose item is non-null and not the node
                    // itself stops the scan; the result is then false.
                    if (item != null && item != p)
                        break;
                }
                // A non-data node with a null item is the positive case.
                else if (item == null)
                    return true;
                // Advance to the successor; if the node's next is the node
                // itself, start over from head.
                // NOTE(review): presumably a self-linked node has been
                // unlinked from the list - confirm against the Node docs.
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return false;
        }
    }
1457 
1458     /**
1459      * Returns the number of elements in this queue.  If this queue
1460      * contains more than {@code Integer.MAX_VALUE} elements, returns
1461      * {@code Integer.MAX_VALUE}.
1462      *
1463      * <p>Beware that, unlike in most collections, this method is
1464      * <em>NOT</em> a constant-time operation. Because of the
1465      * asynchronous nature of these queues, determining the current
1466      * number of elements requires an O(n) traversal.
1467      *
1468      * @return the number of elements in this queue
1469      */
size()1470     public int size() {
1471         return countOfMode(true);
1472     }
1473 
getWaitingConsumerCount()1474     public int getWaitingConsumerCount() {
1475         return countOfMode(false);
1476     }
1477 
1478     /**
1479      * Removes a single instance of the specified element from this queue,
1480      * if it is present.  More formally, removes an element {@code e} such
1481      * that {@code o.equals(e)}, if this queue contains one or more such
1482      * elements.
1483      * Returns {@code true} if this queue contained the specified element
1484      * (or equivalently, if this queue changed as a result of the call).
1485      *
1486      * @param o element to be removed from this queue, if present
1487      * @return {@code true} if this queue changed as a result of the call
1488      */
remove(Object o)1489     public boolean remove(Object o) {
1490         return findAndRemove(o);
1491     }
1492 
1493     /**
1494      * Returns {@code true} if this queue contains the specified element.
1495      * More formally, returns {@code true} if and only if this queue contains
1496      * at least one element {@code e} such that {@code o.equals(e)}.
1497      *
1498      * @param o object to be checked for containment in this queue
1499      * @return {@code true} if this queue contains the specified element
1500      */
contains(Object o)1501     public boolean contains(Object o) {
1502         if (o != null) {
1503             for (Node p = head; p != null; p = succ(p)) {
1504                 Object item = p.item;
1505                 if (p.isData) {
1506                     if (item != null && item != p && o.equals(item))
1507                         return true;
1508                 }
1509                 else if (item == null)
1510                     break;
1511             }
1512         }
1513         return false;
1514     }
1515 
1516     /**
1517      * Always returns {@code Integer.MAX_VALUE} because a
1518      * {@code LinkedTransferQueue} is not capacity constrained.
1519      *
1520      * @return {@code Integer.MAX_VALUE} (as specified by
1521      *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
1522      *         BlockingQueue.remainingCapacity})
1523      */
remainingCapacity()1524     public int remainingCapacity() {
1525         return Integer.MAX_VALUE;
1526     }
1527 
1528     /**
1529      * Saves this queue to a stream (that is, serializes it).
1530      *
1531      * @param s the stream
1532      * @throws java.io.IOException if an I/O error occurs
1533      * @serialData All of the elements (each an {@code E}) in
1534      * the proper order, followed by a null
1535      */
writeObject(java.io.ObjectOutputStream s)1536     private void writeObject(java.io.ObjectOutputStream s)
1537         throws java.io.IOException {
1538         s.defaultWriteObject();
1539         for (E e : this)
1540             s.writeObject(e);
1541         // Use trailing null as sentinel
1542         s.writeObject(null);
1543     }
1544 
1545     /**
1546      * Reconstitutes this queue from a stream (that is, deserializes it).
1547      * @param s the stream
1548      * @throws ClassNotFoundException if the class of a serialized object
1549      *         could not be found
1550      * @throws java.io.IOException if an I/O error occurs
1551      */
readObject(java.io.ObjectInputStream s)1552     private void readObject(java.io.ObjectInputStream s)
1553         throws java.io.IOException, ClassNotFoundException {
1554         s.defaultReadObject();
1555         for (;;) {
1556             @SuppressWarnings("unchecked")
1557             E item = (E) s.readObject();
1558             if (item == null)
1559                 break;
1560             else
1561                 offer(item);
1562         }
1563     }
1564 
    // Unsafe mechanics

    // Unsafe instance used to resolve the field offsets below.
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    // Offsets of this class's "head", "tail" and "sweepVotes" fields,
    // assigned once in the static initializer that follows.
    private static final long HEAD;
    private static final long TAIL;
    private static final long SWEEPVOTES;
    static {
        try {
            // Look up each field reflectively by name and record its offset.
            HEAD = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("head"));
            TAIL = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("tail"));
            SWEEPVOTES = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
        } catch (ReflectiveOperationException e) {
            // A reflective lookup failure is rethrown as an Error that
            // keeps the original exception as its cause.
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        // The local is intentionally unused; it exists only so that
        // LockSupport.class is referenced here.
        Class<?> ensureLoaded = LockSupport.class;
    }
1587 }
1588