1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.AbstractQueue;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.NoSuchElementException;
42 import java.util.Spliterator;
43 import java.util.Spliterators;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import java.util.concurrent.locks.Condition;
46 import java.util.concurrent.locks.ReentrantLock;
47 import java.util.function.Consumer;
48 
49 // BEGIN android-note
50 // removed link to collections framework docs
51 // END android-note
52 
53 /**
54  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
55  * linked nodes.
56  * This queue orders elements FIFO (first-in-first-out).
57  * The <em>head</em> of the queue is that element that has been on the
58  * queue the longest time.
59  * The <em>tail</em> of the queue is that element that has been on the
60  * queue the shortest time. New elements
61  * are inserted at the tail of the queue, and the queue retrieval
62  * operations obtain elements at the head of the queue.
63  * Linked queues typically have higher throughput than array-based queues but
64  * less predictable performance in most concurrent applications.
65  *
66  * <p>The optional capacity bound constructor argument serves as a
67  * way to prevent excessive queue expansion. The capacity, if unspecified,
68  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
69  * dynamically created upon each insertion unless this would bring the
70  * queue above capacity.
71  *
72  * <p>This class and its iterator implement all of the
73  * <em>optional</em> methods of the {@link Collection} and {@link
74  * Iterator} interfaces.
75  *
76  * @since 1.5
77  * @author Doug Lea
78  * @param <E> the type of elements held in this queue
79  */
80 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
81         implements BlockingQueue<E>, java.io.Serializable {
82     private static final long serialVersionUID = -6903933977591709194L;
83 
84     /*
85      * A variant of the "two lock queue" algorithm.  The putLock gates
86      * entry to put (and offer), and has an associated condition for
87      * waiting puts.  Similarly for the takeLock.  The "count" field
88      * that they both rely on is maintained as an atomic to avoid
89      * needing to get both locks in most cases. Also, to minimize need
90      * for puts to get takeLock and vice-versa, cascading notifies are
91      * used. When a put notices that it has enabled at least one take,
92      * it signals taker. That taker in turn signals others if more
93      * items have been entered since the signal. And symmetrically for
94      * takes signalling puts. Operations such as remove(Object) and
95      * iterators acquire both locks.
96      *
97      * Visibility between writers and readers is provided as follows:
98      *
99      * Whenever an element is enqueued, the putLock is acquired and
100      * count updated.  A subsequent reader guarantees visibility to the
101      * enqueued Node by either acquiring the putLock (via fullyLock)
102      * or by acquiring the takeLock, and then reading n = count.get();
103      * this gives visibility to the first n items.
104      *
105      * To implement weakly consistent iterators, it appears we need to
106      * keep all Nodes GC-reachable from a predecessor dequeued Node.
107      * That would cause two problems:
108      * - allow a rogue Iterator to cause unbounded memory retention
109      * - cause cross-generational linking of old Nodes to new Nodes if
110      *   a Node was tenured while live, which generational GCs have a
111      *   hard time dealing with, causing repeated major collections.
112      * However, only non-deleted Nodes need to be reachable from
113      * dequeued Nodes, and reachability does not necessarily have to
114      * be of the kind understood by the GC.  We use the trick of
115      * linking a Node that has just been dequeued to itself.  Such a
116      * self-link implicitly means to advance to head.next.
117      */
118 
119     /**
120      * Linked list node class.
121      */
122     static class Node<E> {
123         E item;
124 
125         /**
126          * One of:
127          * - the real successor Node
128          * - this Node, meaning the successor is head.next
129          * - null, meaning there is no successor (this is the last node)
130          */
131         Node<E> next;
132 
Node(E x)133         Node(E x) { item = x; }
134     }
135 
136     /** The capacity bound, or Integer.MAX_VALUE if none */
137     private final int capacity;
138 
139     /** Current number of elements */
140     private final AtomicInteger count = new AtomicInteger();
141 
142     /**
143      * Head of linked list.
144      * Invariant: head.item == null
145      */
146     transient Node<E> head;
147 
148     /**
149      * Tail of linked list.
150      * Invariant: last.next == null
151      */
152     private transient Node<E> last;
153 
154     /** Lock held by take, poll, etc */
155     private final ReentrantLock takeLock = new ReentrantLock();
156 
157     /** Wait queue for waiting takes */
158     private final Condition notEmpty = takeLock.newCondition();
159 
160     /** Lock held by put, offer, etc */
161     private final ReentrantLock putLock = new ReentrantLock();
162 
163     /** Wait queue for waiting puts */
164     private final Condition notFull = putLock.newCondition();
165 
166     /**
167      * Signals a waiting take. Called only from put/offer (which do not
168      * otherwise ordinarily lock takeLock.)
169      */
signalNotEmpty()170     private void signalNotEmpty() {
171         final ReentrantLock takeLock = this.takeLock;
172         takeLock.lock();
173         try {
174             notEmpty.signal();
175         } finally {
176             takeLock.unlock();
177         }
178     }
179 
180     /**
181      * Signals a waiting put. Called only from take/poll.
182      */
signalNotFull()183     private void signalNotFull() {
184         final ReentrantLock putLock = this.putLock;
185         putLock.lock();
186         try {
187             notFull.signal();
188         } finally {
189             putLock.unlock();
190         }
191     }
192 
193     /**
194      * Links node at end of queue.
195      *
196      * @param node the node
197      */
enqueue(Node<E> node)198     private void enqueue(Node<E> node) {
199         // assert putLock.isHeldByCurrentThread();
200         // assert last.next == null;
201         last = last.next = node;
202     }
203 
204     /**
205      * Removes a node from head of queue.
206      *
207      * @return the node
208      */
dequeue()209     private E dequeue() {
210         // assert takeLock.isHeldByCurrentThread();
211         // assert head.item == null;
212         Node<E> h = head;
213         Node<E> first = h.next;
214         h.next = h; // help GC
215         head = first;
216         E x = first.item;
217         first.item = null;
218         return x;
219     }
220 
221     /**
222      * Locks to prevent both puts and takes.
223      */
fullyLock()224     void fullyLock() {
225         putLock.lock();
226         takeLock.lock();
227     }
228 
229     /**
230      * Unlocks to allow both puts and takes.
231      */
fullyUnlock()232     void fullyUnlock() {
233         takeLock.unlock();
234         putLock.unlock();
235     }
236 
237 //     /**
238 //      * Tells whether both locks are held by current thread.
239 //      */
240 //     boolean isFullyLocked() {
241 //         return (putLock.isHeldByCurrentThread() &&
242 //                 takeLock.isHeldByCurrentThread());
243 //     }
244 
245     /**
246      * Creates a {@code LinkedBlockingQueue} with a capacity of
247      * {@link Integer#MAX_VALUE}.
248      */
LinkedBlockingQueue()249     public LinkedBlockingQueue() {
250         this(Integer.MAX_VALUE);
251     }
252 
253     /**
254      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
255      *
256      * @param capacity the capacity of this queue
257      * @throws IllegalArgumentException if {@code capacity} is not greater
258      *         than zero
259      */
LinkedBlockingQueue(int capacity)260     public LinkedBlockingQueue(int capacity) {
261         if (capacity <= 0) throw new IllegalArgumentException();
262         this.capacity = capacity;
263         last = head = new Node<E>(null);
264     }
265 
266     /**
267      * Creates a {@code LinkedBlockingQueue} with a capacity of
268      * {@link Integer#MAX_VALUE}, initially containing the elements of the
269      * given collection,
270      * added in traversal order of the collection's iterator.
271      *
272      * @param c the collection of elements to initially contain
273      * @throws NullPointerException if the specified collection or any
274      *         of its elements are null
275      */
LinkedBlockingQueue(Collection<? extends E> c)276     public LinkedBlockingQueue(Collection<? extends E> c) {
277         this(Integer.MAX_VALUE);
278         final ReentrantLock putLock = this.putLock;
279         putLock.lock(); // Never contended, but necessary for visibility
280         try {
281             int n = 0;
282             for (E e : c) {
283                 if (e == null)
284                     throw new NullPointerException();
285                 if (n == capacity)
286                     throw new IllegalStateException("Queue full");
287                 enqueue(new Node<E>(e));
288                 ++n;
289             }
290             count.set(n);
291         } finally {
292             putLock.unlock();
293         }
294     }
295 
296     // this doc comment is overridden to remove the reference to collections
297     // greater in size than Integer.MAX_VALUE
298     /**
299      * Returns the number of elements in this queue.
300      *
301      * @return the number of elements in this queue
302      */
size()303     public int size() {
304         return count.get();
305     }
306 
307     // this doc comment is a modified copy of the inherited doc comment,
308     // without the reference to unlimited queues.
309     /**
310      * Returns the number of additional elements that this queue can ideally
311      * (in the absence of memory or resource constraints) accept without
312      * blocking. This is always equal to the initial capacity of this queue
313      * less the current {@code size} of this queue.
314      *
315      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
316      * an element will succeed by inspecting {@code remainingCapacity}
317      * because it may be the case that another thread is about to
318      * insert or remove an element.
319      */
remainingCapacity()320     public int remainingCapacity() {
321         return capacity - count.get();
322     }
323 
324     /**
325      * Inserts the specified element at the tail of this queue, waiting if
326      * necessary for space to become available.
327      *
328      * @throws InterruptedException {@inheritDoc}
329      * @throws NullPointerException {@inheritDoc}
330      */
put(E e)331     public void put(E e) throws InterruptedException {
332         if (e == null) throw new NullPointerException();
333         // Note: convention in all put/take/etc is to preset local var
334         // holding count negative to indicate failure unless set.
335         int c = -1;
336         Node<E> node = new Node<E>(e);
337         final ReentrantLock putLock = this.putLock;
338         final AtomicInteger count = this.count;
339         putLock.lockInterruptibly();
340         try {
341             /*
342              * Note that count is used in wait guard even though it is
343              * not protected by lock. This works because count can
344              * only decrease at this point (all other puts are shut
345              * out by lock), and we (or some other waiting put) are
346              * signalled if it ever changes from capacity. Similarly
347              * for all other uses of count in other wait guards.
348              */
349             while (count.get() == capacity) {
350                 notFull.await();
351             }
352             enqueue(node);
353             c = count.getAndIncrement();
354             if (c + 1 < capacity)
355                 notFull.signal();
356         } finally {
357             putLock.unlock();
358         }
359         if (c == 0)
360             signalNotEmpty();
361     }
362 
363     /**
364      * Inserts the specified element at the tail of this queue, waiting if
365      * necessary up to the specified wait time for space to become available.
366      *
367      * @return {@code true} if successful, or {@code false} if
368      *         the specified waiting time elapses before space is available
369      * @throws InterruptedException {@inheritDoc}
370      * @throws NullPointerException {@inheritDoc}
371      */
offer(E e, long timeout, TimeUnit unit)372     public boolean offer(E e, long timeout, TimeUnit unit)
373         throws InterruptedException {
374 
375         if (e == null) throw new NullPointerException();
376         long nanos = unit.toNanos(timeout);
377         int c = -1;
378         final ReentrantLock putLock = this.putLock;
379         final AtomicInteger count = this.count;
380         putLock.lockInterruptibly();
381         try {
382             while (count.get() == capacity) {
383                 if (nanos <= 0L)
384                     return false;
385                 nanos = notFull.awaitNanos(nanos);
386             }
387             enqueue(new Node<E>(e));
388             c = count.getAndIncrement();
389             if (c + 1 < capacity)
390                 notFull.signal();
391         } finally {
392             putLock.unlock();
393         }
394         if (c == 0)
395             signalNotEmpty();
396         return true;
397     }
398 
399     /**
400      * Inserts the specified element at the tail of this queue if it is
401      * possible to do so immediately without exceeding the queue's capacity,
402      * returning {@code true} upon success and {@code false} if this queue
403      * is full.
404      * When using a capacity-restricted queue, this method is generally
405      * preferable to method {@link BlockingQueue#add add}, which can fail to
406      * insert an element only by throwing an exception.
407      *
408      * @throws NullPointerException if the specified element is null
409      */
offer(E e)410     public boolean offer(E e) {
411         if (e == null) throw new NullPointerException();
412         final AtomicInteger count = this.count;
413         if (count.get() == capacity)
414             return false;
415         int c = -1;
416         Node<E> node = new Node<E>(e);
417         final ReentrantLock putLock = this.putLock;
418         putLock.lock();
419         try {
420             if (count.get() < capacity) {
421                 enqueue(node);
422                 c = count.getAndIncrement();
423                 if (c + 1 < capacity)
424                     notFull.signal();
425             }
426         } finally {
427             putLock.unlock();
428         }
429         if (c == 0)
430             signalNotEmpty();
431         return c >= 0;
432     }
433 
take()434     public E take() throws InterruptedException {
435         E x;
436         int c = -1;
437         final AtomicInteger count = this.count;
438         final ReentrantLock takeLock = this.takeLock;
439         takeLock.lockInterruptibly();
440         try {
441             while (count.get() == 0) {
442                 notEmpty.await();
443             }
444             x = dequeue();
445             c = count.getAndDecrement();
446             if (c > 1)
447                 notEmpty.signal();
448         } finally {
449             takeLock.unlock();
450         }
451         if (c == capacity)
452             signalNotFull();
453         return x;
454     }
455 
poll(long timeout, TimeUnit unit)456     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
457         E x = null;
458         int c = -1;
459         long nanos = unit.toNanos(timeout);
460         final AtomicInteger count = this.count;
461         final ReentrantLock takeLock = this.takeLock;
462         takeLock.lockInterruptibly();
463         try {
464             while (count.get() == 0) {
465                 if (nanos <= 0L)
466                     return null;
467                 nanos = notEmpty.awaitNanos(nanos);
468             }
469             x = dequeue();
470             c = count.getAndDecrement();
471             if (c > 1)
472                 notEmpty.signal();
473         } finally {
474             takeLock.unlock();
475         }
476         if (c == capacity)
477             signalNotFull();
478         return x;
479     }
480 
poll()481     public E poll() {
482         final AtomicInteger count = this.count;
483         if (count.get() == 0)
484             return null;
485         E x = null;
486         int c = -1;
487         final ReentrantLock takeLock = this.takeLock;
488         takeLock.lock();
489         try {
490             if (count.get() > 0) {
491                 x = dequeue();
492                 c = count.getAndDecrement();
493                 if (c > 1)
494                     notEmpty.signal();
495             }
496         } finally {
497             takeLock.unlock();
498         }
499         if (c == capacity)
500             signalNotFull();
501         return x;
502     }
503 
peek()504     public E peek() {
505         if (count.get() == 0)
506             return null;
507         final ReentrantLock takeLock = this.takeLock;
508         takeLock.lock();
509         try {
510             return (count.get() > 0) ? head.next.item : null;
511         } finally {
512             takeLock.unlock();
513         }
514     }
515 
516     /**
517      * Unlinks interior Node p with predecessor trail.
518      */
unlink(Node<E> p, Node<E> trail)519     void unlink(Node<E> p, Node<E> trail) {
520         // assert isFullyLocked();
521         // p.next is not changed, to allow iterators that are
522         // traversing p to maintain their weak-consistency guarantee.
523         p.item = null;
524         trail.next = p.next;
525         if (last == p)
526             last = trail;
527         if (count.getAndDecrement() == capacity)
528             notFull.signal();
529     }
530 
531     /**
532      * Removes a single instance of the specified element from this queue,
533      * if it is present.  More formally, removes an element {@code e} such
534      * that {@code o.equals(e)}, if this queue contains one or more such
535      * elements.
536      * Returns {@code true} if this queue contained the specified element
537      * (or equivalently, if this queue changed as a result of the call).
538      *
539      * @param o element to be removed from this queue, if present
540      * @return {@code true} if this queue changed as a result of the call
541      */
remove(Object o)542     public boolean remove(Object o) {
543         if (o == null) return false;
544         fullyLock();
545         try {
546             for (Node<E> trail = head, p = trail.next;
547                  p != null;
548                  trail = p, p = p.next) {
549                 if (o.equals(p.item)) {
550                     unlink(p, trail);
551                     return true;
552                 }
553             }
554             return false;
555         } finally {
556             fullyUnlock();
557         }
558     }
559 
560     /**
561      * Returns {@code true} if this queue contains the specified element.
562      * More formally, returns {@code true} if and only if this queue contains
563      * at least one element {@code e} such that {@code o.equals(e)}.
564      *
565      * @param o object to be checked for containment in this queue
566      * @return {@code true} if this queue contains the specified element
567      */
contains(Object o)568     public boolean contains(Object o) {
569         if (o == null) return false;
570         fullyLock();
571         try {
572             for (Node<E> p = head.next; p != null; p = p.next)
573                 if (o.equals(p.item))
574                     return true;
575             return false;
576         } finally {
577             fullyUnlock();
578         }
579     }
580 
581     /**
582      * Returns an array containing all of the elements in this queue, in
583      * proper sequence.
584      *
585      * <p>The returned array will be "safe" in that no references to it are
586      * maintained by this queue.  (In other words, this method must allocate
587      * a new array).  The caller is thus free to modify the returned array.
588      *
589      * <p>This method acts as bridge between array-based and collection-based
590      * APIs.
591      *
592      * @return an array containing all of the elements in this queue
593      */
toArray()594     public Object[] toArray() {
595         fullyLock();
596         try {
597             int size = count.get();
598             Object[] a = new Object[size];
599             int k = 0;
600             for (Node<E> p = head.next; p != null; p = p.next)
601                 a[k++] = p.item;
602             return a;
603         } finally {
604             fullyUnlock();
605         }
606     }
607 
608     /**
609      * Returns an array containing all of the elements in this queue, in
610      * proper sequence; the runtime type of the returned array is that of
611      * the specified array.  If the queue fits in the specified array, it
612      * is returned therein.  Otherwise, a new array is allocated with the
613      * runtime type of the specified array and the size of this queue.
614      *
615      * <p>If this queue fits in the specified array with room to spare
616      * (i.e., the array has more elements than this queue), the element in
617      * the array immediately following the end of the queue is set to
618      * {@code null}.
619      *
620      * <p>Like the {@link #toArray()} method, this method acts as bridge between
621      * array-based and collection-based APIs.  Further, this method allows
622      * precise control over the runtime type of the output array, and may,
623      * under certain circumstances, be used to save allocation costs.
624      *
625      * <p>Suppose {@code x} is a queue known to contain only strings.
626      * The following code can be used to dump the queue into a newly
627      * allocated array of {@code String}:
628      *
629      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
630      *
631      * Note that {@code toArray(new Object[0])} is identical in function to
632      * {@code toArray()}.
633      *
634      * @param a the array into which the elements of the queue are to
635      *          be stored, if it is big enough; otherwise, a new array of the
636      *          same runtime type is allocated for this purpose
637      * @return an array containing all of the elements in this queue
638      * @throws ArrayStoreException if the runtime type of the specified array
639      *         is not a supertype of the runtime type of every element in
640      *         this queue
641      * @throws NullPointerException if the specified array is null
642      */
643     @SuppressWarnings("unchecked")
toArray(T[] a)644     public <T> T[] toArray(T[] a) {
645         fullyLock();
646         try {
647             int size = count.get();
648             if (a.length < size)
649                 a = (T[])java.lang.reflect.Array.newInstance
650                     (a.getClass().getComponentType(), size);
651 
652             int k = 0;
653             for (Node<E> p = head.next; p != null; p = p.next)
654                 a[k++] = (T)p.item;
655             if (a.length > k)
656                 a[k] = null;
657             return a;
658         } finally {
659             fullyUnlock();
660         }
661     }
662 
toString()663     public String toString() {
664         return Helpers.collectionToString(this);
665     }
666 
667     /**
668      * Atomically removes all of the elements from this queue.
669      * The queue will be empty after this call returns.
670      */
clear()671     public void clear() {
672         fullyLock();
673         try {
674             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
675                 h.next = h;
676                 p.item = null;
677             }
678             head = last;
679             // assert head.item == null && head.next == null;
680             if (count.getAndSet(0) == capacity)
681                 notFull.signal();
682         } finally {
683             fullyUnlock();
684         }
685     }
686 
687     /**
688      * @throws UnsupportedOperationException {@inheritDoc}
689      * @throws ClassCastException            {@inheritDoc}
690      * @throws NullPointerException          {@inheritDoc}
691      * @throws IllegalArgumentException      {@inheritDoc}
692      */
drainTo(Collection<? super E> c)693     public int drainTo(Collection<? super E> c) {
694         return drainTo(c, Integer.MAX_VALUE);
695     }
696 
697     /**
698      * @throws UnsupportedOperationException {@inheritDoc}
699      * @throws ClassCastException            {@inheritDoc}
700      * @throws NullPointerException          {@inheritDoc}
701      * @throws IllegalArgumentException      {@inheritDoc}
702      */
drainTo(Collection<? super E> c, int maxElements)703     public int drainTo(Collection<? super E> c, int maxElements) {
704         if (c == null)
705             throw new NullPointerException();
706         if (c == this)
707             throw new IllegalArgumentException();
708         if (maxElements <= 0)
709             return 0;
710         boolean signalNotFull = false;
711         final ReentrantLock takeLock = this.takeLock;
712         takeLock.lock();
713         try {
714             int n = Math.min(maxElements, count.get());
715             // count.get provides visibility to first n Nodes
716             Node<E> h = head;
717             int i = 0;
718             try {
719                 while (i < n) {
720                     Node<E> p = h.next;
721                     c.add(p.item);
722                     p.item = null;
723                     h.next = h;
724                     h = p;
725                     ++i;
726                 }
727                 return n;
728             } finally {
729                 // Restore invariants even if c.add() threw
730                 if (i > 0) {
731                     // assert h.item == null;
732                     head = h;
733                     signalNotFull = (count.getAndAdd(-i) == capacity);
734                 }
735             }
736         } finally {
737             takeLock.unlock();
738             if (signalNotFull)
739                 signalNotFull();
740         }
741     }
742 
743     /**
744      * Returns an iterator over the elements in this queue in proper sequence.
745      * The elements will be returned in order from first (head) to last (tail).
746      *
747      * <p>The returned iterator is
748      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
749      *
750      * @return an iterator over the elements in this queue in proper sequence
751      */
iterator()752     public Iterator<E> iterator() {
753         return new Itr();
754     }
755 
756     private class Itr implements Iterator<E> {
757         /*
758          * Basic weakly-consistent iterator.  At all times hold the next
759          * item to hand out so that if hasNext() reports true, we will
760          * still have it to return even if lost race with a take etc.
761          */
762 
763         private Node<E> current;
764         private Node<E> lastRet;
765         private E currentElement;
766 
Itr()767         Itr() {
768             fullyLock();
769             try {
770                 current = head.next;
771                 if (current != null)
772                     currentElement = current.item;
773             } finally {
774                 fullyUnlock();
775             }
776         }
777 
hasNext()778         public boolean hasNext() {
779             return current != null;
780         }
781 
next()782         public E next() {
783             fullyLock();
784             try {
785                 if (current == null)
786                     throw new NoSuchElementException();
787                 lastRet = current;
788                 E item = null;
789                 // Unlike other traversal methods, iterators must handle both:
790                 // - dequeued nodes (p.next == p)
791                 // - (possibly multiple) interior removed nodes (p.item == null)
792                 for (Node<E> p = current, q;; p = q) {
793                     if ((q = p.next) == p)
794                         q = head.next;
795                     if (q == null || (item = q.item) != null) {
796                         current = q;
797                         E x = currentElement;
798                         currentElement = item;
799                         return x;
800                     }
801                 }
802             } finally {
803                 fullyUnlock();
804             }
805         }
806 
remove()807         public void remove() {
808             if (lastRet == null)
809                 throw new IllegalStateException();
810             fullyLock();
811             try {
812                 Node<E> node = lastRet;
813                 lastRet = null;
814                 for (Node<E> trail = head, p = trail.next;
815                      p != null;
816                      trail = p, p = p.next) {
817                     if (p == node) {
818                         unlink(p, trail);
819                         break;
820                     }
821                 }
822             } finally {
823                 fullyUnlock();
824             }
825         }
826     }
827 
828     /** A customized variant of Spliterators.IteratorSpliterator */
829     static final class LBQSpliterator<E> implements Spliterator<E> {
830         static final int MAX_BATCH = 1 << 25;  // max batch array size;
831         final LinkedBlockingQueue<E> queue;
832         Node<E> current;    // current node; null until initialized
833         int batch;          // batch size for splits
834         boolean exhausted;  // true when no more nodes
835         long est;           // size estimate
LBQSpliterator(LinkedBlockingQueue<E> queue)836         LBQSpliterator(LinkedBlockingQueue<E> queue) {
837             this.queue = queue;
838             this.est = queue.size();
839         }
840 
estimateSize()841         public long estimateSize() { return est; }
842 
trySplit()843         public Spliterator<E> trySplit() {
844             Node<E> h;
845             final LinkedBlockingQueue<E> q = this.queue;
846             int b = batch;
847             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
848             if (!exhausted &&
849                 ((h = current) != null || (h = q.head.next) != null) &&
850                 h.next != null) {
851                 Object[] a = new Object[n];
852                 int i = 0;
853                 Node<E> p = current;
854                 q.fullyLock();
855                 try {
856                     if (p != null || (p = q.head.next) != null) {
857                         do {
858                             if ((a[i] = p.item) != null)
859                                 ++i;
860                         } while ((p = p.next) != null && i < n);
861                     }
862                 } finally {
863                     q.fullyUnlock();
864                 }
865                 if ((current = p) == null) {
866                     est = 0L;
867                     exhausted = true;
868                 }
869                 else if ((est -= i) < 0L)
870                     est = 0L;
871                 if (i > 0) {
872                     batch = i;
873                     return Spliterators.spliterator
874                         (a, 0, i, (Spliterator.ORDERED |
875                                    Spliterator.NONNULL |
876                                    Spliterator.CONCURRENT));
877                 }
878             }
879             return null;
880         }
881 
forEachRemaining(Consumer<? super E> action)882         public void forEachRemaining(Consumer<? super E> action) {
883             if (action == null) throw new NullPointerException();
884             final LinkedBlockingQueue<E> q = this.queue;
885             if (!exhausted) {
886                 exhausted = true;
887                 Node<E> p = current;
888                 do {
889                     E e = null;
890                     q.fullyLock();
891                     try {
892                         if (p == null)
893                             p = q.head.next;
894                         while (p != null) {
895                             e = p.item;
896                             p = p.next;
897                             if (e != null)
898                                 break;
899                         }
900                     } finally {
901                         q.fullyUnlock();
902                     }
903                     if (e != null)
904                         action.accept(e);
905                 } while (p != null);
906             }
907         }
908 
tryAdvance(Consumer<? super E> action)909         public boolean tryAdvance(Consumer<? super E> action) {
910             if (action == null) throw new NullPointerException();
911             final LinkedBlockingQueue<E> q = this.queue;
912             if (!exhausted) {
913                 E e = null;
914                 q.fullyLock();
915                 try {
916                     if (current == null)
917                         current = q.head.next;
918                     while (current != null) {
919                         e = current.item;
920                         current = current.next;
921                         if (e != null)
922                             break;
923                     }
924                 } finally {
925                     q.fullyUnlock();
926                 }
927                 if (current == null)
928                     exhausted = true;
929                 if (e != null) {
930                     action.accept(e);
931                     return true;
932                 }
933             }
934             return false;
935         }
936 
characteristics()937         public int characteristics() {
938             return Spliterator.ORDERED | Spliterator.NONNULL |
939                 Spliterator.CONCURRENT;
940         }
941     }
942 
943     /**
944      * Returns a {@link Spliterator} over the elements in this queue.
945      *
946      * <p>The returned spliterator is
947      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
948      *
949      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
950      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
951      *
952      * @implNote
953      * The {@code Spliterator} implements {@code trySplit} to permit limited
954      * parallelism.
955      *
956      * @return a {@code Spliterator} over the elements in this queue
957      * @since 1.8
958      */
spliterator()959     public Spliterator<E> spliterator() {
960         return new LBQSpliterator<E>(this);
961     }
962 
963     /**
964      * Saves this queue to a stream (that is, serializes it).
965      *
966      * @param s the stream
967      * @throws java.io.IOException if an I/O error occurs
968      * @serialData The capacity is emitted (int), followed by all of
969      * its elements (each an {@code Object}) in the proper order,
970      * followed by a null
971      */
writeObject(java.io.ObjectOutputStream s)972     private void writeObject(java.io.ObjectOutputStream s)
973         throws java.io.IOException {
974 
975         fullyLock();
976         try {
977             // Write out any hidden stuff, plus capacity
978             s.defaultWriteObject();
979 
980             // Write out all elements in the proper order.
981             for (Node<E> p = head.next; p != null; p = p.next)
982                 s.writeObject(p.item);
983 
984             // Use trailing null as sentinel
985             s.writeObject(null);
986         } finally {
987             fullyUnlock();
988         }
989     }
990 
991     /**
992      * Reconstitutes this queue from a stream (that is, deserializes it).
993      * @param s the stream
994      * @throws ClassNotFoundException if the class of a serialized object
995      *         could not be found
996      * @throws java.io.IOException if an I/O error occurs
997      */
readObject(java.io.ObjectInputStream s)998     private void readObject(java.io.ObjectInputStream s)
999         throws java.io.IOException, ClassNotFoundException {
1000         // Read in capacity, and any hidden stuff
1001         s.defaultReadObject();
1002 
1003         count.set(0);
1004         last = head = new Node<E>(null);
1005 
1006         // Read in all elements and place in queue
1007         for (;;) {
1008             @SuppressWarnings("unchecked")
1009             E item = (E)s.readObject();
1010             if (item == null)
1011                 break;
1012             add(item);
1013         }
1014     }
1015 }
1016