1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import java.util.AbstractQueue;
10 import java.util.Collection;
11 import java.util.Iterator;
12 import java.util.NoSuchElementException;
13 import java.util.Spliterator;
14 import java.util.Spliterators;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.locks.Condition;
17 import java.util.concurrent.locks.ReentrantLock;
18 import java.util.function.Consumer;
19 
20 // BEGIN android-note
21 // removed link to collections framework docs
22 // END android-note
23 
24 /**
25  * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
26  * linked nodes.
27  * This queue orders elements FIFO (first-in-first-out).
28  * The <em>head</em> of the queue is that element that has been on the
29  * queue the longest time.
30  * The <em>tail</em> of the queue is that element that has been on the
31  * queue the shortest time. New elements
32  * are inserted at the tail of the queue, and the queue retrieval
33  * operations obtain elements at the head of the queue.
34  * Linked queues typically have higher throughput than array-based queues but
35  * less predictable performance in most concurrent applications.
36  *
37  * <p>The optional capacity bound constructor argument serves as a
38  * way to prevent excessive queue expansion. The capacity, if unspecified,
39  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
40  * dynamically created upon each insertion unless this would bring the
41  * queue above capacity.
42  *
43  * <p>This class and its iterator implement all of the
44  * <em>optional</em> methods of the {@link Collection} and {@link
45  * Iterator} interfaces.
46  *
47  * @since 1.5
48  * @author Doug Lea
49  * @param <E> the type of elements held in this queue
50  */
51 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
52         implements BlockingQueue<E>, java.io.Serializable {
53     private static final long serialVersionUID = -6903933977591709194L;
54 
55     /*
56      * A variant of the "two lock queue" algorithm.  The putLock gates
57      * entry to put (and offer), and has an associated condition for
58      * waiting puts.  Similarly for the takeLock.  The "count" field
59      * that they both rely on is maintained as an atomic to avoid
60      * needing to get both locks in most cases. Also, to minimize need
61      * for puts to get takeLock and vice-versa, cascading notifies are
62      * used. When a put notices that it has enabled at least one take,
63      * it signals taker. That taker in turn signals others if more
64      * items have been entered since the signal. And symmetrically for
65      * takes signalling puts. Operations such as remove(Object) and
66      * iterators acquire both locks.
67      *
68      * Visibility between writers and readers is provided as follows:
69      *
70      * Whenever an element is enqueued, the putLock is acquired and
71      * count updated.  A subsequent reader guarantees visibility to the
72      * enqueued Node by either acquiring the putLock (via fullyLock)
73      * or by acquiring the takeLock, and then reading n = count.get();
74      * this gives visibility to the first n items.
75      *
76      * To implement weakly consistent iterators, it appears we need to
77      * keep all Nodes GC-reachable from a predecessor dequeued Node.
78      * That would cause two problems:
79      * - allow a rogue Iterator to cause unbounded memory retention
80      * - cause cross-generational linking of old Nodes to new Nodes if
81      *   a Node was tenured while live, which generational GCs have a
82      *   hard time dealing with, causing repeated major collections.
83      * However, only non-deleted Nodes need to be reachable from
84      * dequeued Nodes, and reachability does not necessarily have to
85      * be of the kind understood by the GC.  We use the trick of
86      * linking a Node that has just been dequeued to itself.  Such a
87      * self-link implicitly means to advance to head.next.
88      */
89 
90     /**
91      * Linked list node class.
92      */
93     static class Node<E> {
94         E item;
95 
96         /**
97          * One of:
98          * - the real successor Node
99          * - this Node, meaning the successor is head.next
100          * - null, meaning there is no successor (this is the last node)
101          */
102         Node<E> next;
103 
Node(E x)104         Node(E x) { item = x; }
105     }
106 
107     /** The capacity bound, or Integer.MAX_VALUE if none */
108     private final int capacity;
109 
110     /** Current number of elements */
111     private final AtomicInteger count = new AtomicInteger();
112 
113     /**
114      * Head of linked list.
115      * Invariant: head.item == null
116      */
117     transient Node<E> head;
118 
119     /**
120      * Tail of linked list.
121      * Invariant: last.next == null
122      */
123     private transient Node<E> last;
124 
125     /** Lock held by take, poll, etc */
126     private final ReentrantLock takeLock = new ReentrantLock();
127 
128     /** Wait queue for waiting takes */
129     private final Condition notEmpty = takeLock.newCondition();
130 
131     /** Lock held by put, offer, etc */
132     private final ReentrantLock putLock = new ReentrantLock();
133 
134     /** Wait queue for waiting puts */
135     private final Condition notFull = putLock.newCondition();
136 
137     /**
138      * Signals a waiting take. Called only from put/offer (which do not
139      * otherwise ordinarily lock takeLock.)
140      */
signalNotEmpty()141     private void signalNotEmpty() {
142         final ReentrantLock takeLock = this.takeLock;
143         takeLock.lock();
144         try {
145             notEmpty.signal();
146         } finally {
147             takeLock.unlock();
148         }
149     }
150 
151     /**
152      * Signals a waiting put. Called only from take/poll.
153      */
signalNotFull()154     private void signalNotFull() {
155         final ReentrantLock putLock = this.putLock;
156         putLock.lock();
157         try {
158             notFull.signal();
159         } finally {
160             putLock.unlock();
161         }
162     }
163 
164     /**
165      * Links node at end of queue.
166      *
167      * @param node the node
168      */
enqueue(Node<E> node)169     private void enqueue(Node<E> node) {
170         // assert putLock.isHeldByCurrentThread();
171         // assert last.next == null;
172         last = last.next = node;
173     }
174 
175     /**
176      * Removes a node from head of queue.
177      *
178      * @return the node
179      */
dequeue()180     private E dequeue() {
181         // assert takeLock.isHeldByCurrentThread();
182         // assert head.item == null;
183         Node<E> h = head;
184         Node<E> first = h.next;
185         h.next = h; // help GC
186         head = first;
187         E x = first.item;
188         first.item = null;
189         return x;
190     }
191 
192     /**
193      * Locks to prevent both puts and takes.
194      */
fullyLock()195     void fullyLock() {
196         putLock.lock();
197         takeLock.lock();
198     }
199 
200     /**
201      * Unlocks to allow both puts and takes.
202      */
fullyUnlock()203     void fullyUnlock() {
204         takeLock.unlock();
205         putLock.unlock();
206     }
207 
208 //     /**
209 //      * Tells whether both locks are held by current thread.
210 //      */
211 //     boolean isFullyLocked() {
212 //         return (putLock.isHeldByCurrentThread() &&
213 //                 takeLock.isHeldByCurrentThread());
214 //     }
215 
216     /**
217      * Creates a {@code LinkedBlockingQueue} with a capacity of
218      * {@link Integer#MAX_VALUE}.
219      */
LinkedBlockingQueue()220     public LinkedBlockingQueue() {
221         this(Integer.MAX_VALUE);
222     }
223 
224     /**
225      * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
226      *
227      * @param capacity the capacity of this queue
228      * @throws IllegalArgumentException if {@code capacity} is not greater
229      *         than zero
230      */
LinkedBlockingQueue(int capacity)231     public LinkedBlockingQueue(int capacity) {
232         if (capacity <= 0) throw new IllegalArgumentException();
233         this.capacity = capacity;
234         last = head = new Node<E>(null);
235     }
236 
237     /**
238      * Creates a {@code LinkedBlockingQueue} with a capacity of
239      * {@link Integer#MAX_VALUE}, initially containing the elements of the
240      * given collection,
241      * added in traversal order of the collection's iterator.
242      *
243      * @param c the collection of elements to initially contain
244      * @throws NullPointerException if the specified collection or any
245      *         of its elements are null
246      */
LinkedBlockingQueue(Collection<? extends E> c)247     public LinkedBlockingQueue(Collection<? extends E> c) {
248         this(Integer.MAX_VALUE);
249         final ReentrantLock putLock = this.putLock;
250         putLock.lock(); // Never contended, but necessary for visibility
251         try {
252             int n = 0;
253             for (E e : c) {
254                 if (e == null)
255                     throw new NullPointerException();
256                 if (n == capacity)
257                     throw new IllegalStateException("Queue full");
258                 enqueue(new Node<E>(e));
259                 ++n;
260             }
261             count.set(n);
262         } finally {
263             putLock.unlock();
264         }
265     }
266 
267     // this doc comment is overridden to remove the reference to collections
268     // greater in size than Integer.MAX_VALUE
269     /**
270      * Returns the number of elements in this queue.
271      *
272      * @return the number of elements in this queue
273      */
size()274     public int size() {
275         return count.get();
276     }
277 
278     // this doc comment is a modified copy of the inherited doc comment,
279     // without the reference to unlimited queues.
280     /**
281      * Returns the number of additional elements that this queue can ideally
282      * (in the absence of memory or resource constraints) accept without
283      * blocking. This is always equal to the initial capacity of this queue
284      * less the current {@code size} of this queue.
285      *
286      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
287      * an element will succeed by inspecting {@code remainingCapacity}
288      * because it may be the case that another thread is about to
289      * insert or remove an element.
290      */
remainingCapacity()291     public int remainingCapacity() {
292         return capacity - count.get();
293     }
294 
295     /**
296      * Inserts the specified element at the tail of this queue, waiting if
297      * necessary for space to become available.
298      *
299      * @throws InterruptedException {@inheritDoc}
300      * @throws NullPointerException {@inheritDoc}
301      */
put(E e)302     public void put(E e) throws InterruptedException {
303         if (e == null) throw new NullPointerException();
304         // Note: convention in all put/take/etc is to preset local var
305         // holding count negative to indicate failure unless set.
306         int c = -1;
307         Node<E> node = new Node<E>(e);
308         final ReentrantLock putLock = this.putLock;
309         final AtomicInteger count = this.count;
310         putLock.lockInterruptibly();
311         try {
312             /*
313              * Note that count is used in wait guard even though it is
314              * not protected by lock. This works because count can
315              * only decrease at this point (all other puts are shut
316              * out by lock), and we (or some other waiting put) are
317              * signalled if it ever changes from capacity. Similarly
318              * for all other uses of count in other wait guards.
319              */
320             while (count.get() == capacity) {
321                 notFull.await();
322             }
323             enqueue(node);
324             c = count.getAndIncrement();
325             if (c + 1 < capacity)
326                 notFull.signal();
327         } finally {
328             putLock.unlock();
329         }
330         if (c == 0)
331             signalNotEmpty();
332     }
333 
334     /**
335      * Inserts the specified element at the tail of this queue, waiting if
336      * necessary up to the specified wait time for space to become available.
337      *
338      * @return {@code true} if successful, or {@code false} if
339      *         the specified waiting time elapses before space is available
340      * @throws InterruptedException {@inheritDoc}
341      * @throws NullPointerException {@inheritDoc}
342      */
offer(E e, long timeout, TimeUnit unit)343     public boolean offer(E e, long timeout, TimeUnit unit)
344         throws InterruptedException {
345 
346         if (e == null) throw new NullPointerException();
347         long nanos = unit.toNanos(timeout);
348         int c = -1;
349         final ReentrantLock putLock = this.putLock;
350         final AtomicInteger count = this.count;
351         putLock.lockInterruptibly();
352         try {
353             while (count.get() == capacity) {
354                 if (nanos <= 0L)
355                     return false;
356                 nanos = notFull.awaitNanos(nanos);
357             }
358             enqueue(new Node<E>(e));
359             c = count.getAndIncrement();
360             if (c + 1 < capacity)
361                 notFull.signal();
362         } finally {
363             putLock.unlock();
364         }
365         if (c == 0)
366             signalNotEmpty();
367         return true;
368     }
369 
370     /**
371      * Inserts the specified element at the tail of this queue if it is
372      * possible to do so immediately without exceeding the queue's capacity,
373      * returning {@code true} upon success and {@code false} if this queue
374      * is full.
375      * When using a capacity-restricted queue, this method is generally
376      * preferable to method {@link BlockingQueue#add add}, which can fail to
377      * insert an element only by throwing an exception.
378      *
379      * @throws NullPointerException if the specified element is null
380      */
offer(E e)381     public boolean offer(E e) {
382         if (e == null) throw new NullPointerException();
383         final AtomicInteger count = this.count;
384         if (count.get() == capacity)
385             return false;
386         int c = -1;
387         Node<E> node = new Node<E>(e);
388         final ReentrantLock putLock = this.putLock;
389         putLock.lock();
390         try {
391             if (count.get() < capacity) {
392                 enqueue(node);
393                 c = count.getAndIncrement();
394                 if (c + 1 < capacity)
395                     notFull.signal();
396             }
397         } finally {
398             putLock.unlock();
399         }
400         if (c == 0)
401             signalNotEmpty();
402         return c >= 0;
403     }
404 
take()405     public E take() throws InterruptedException {
406         E x;
407         int c = -1;
408         final AtomicInteger count = this.count;
409         final ReentrantLock takeLock = this.takeLock;
410         takeLock.lockInterruptibly();
411         try {
412             while (count.get() == 0) {
413                 notEmpty.await();
414             }
415             x = dequeue();
416             c = count.getAndDecrement();
417             if (c > 1)
418                 notEmpty.signal();
419         } finally {
420             takeLock.unlock();
421         }
422         if (c == capacity)
423             signalNotFull();
424         return x;
425     }
426 
poll(long timeout, TimeUnit unit)427     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
428         E x = null;
429         int c = -1;
430         long nanos = unit.toNanos(timeout);
431         final AtomicInteger count = this.count;
432         final ReentrantLock takeLock = this.takeLock;
433         takeLock.lockInterruptibly();
434         try {
435             while (count.get() == 0) {
436                 if (nanos <= 0L)
437                     return null;
438                 nanos = notEmpty.awaitNanos(nanos);
439             }
440             x = dequeue();
441             c = count.getAndDecrement();
442             if (c > 1)
443                 notEmpty.signal();
444         } finally {
445             takeLock.unlock();
446         }
447         if (c == capacity)
448             signalNotFull();
449         return x;
450     }
451 
poll()452     public E poll() {
453         final AtomicInteger count = this.count;
454         if (count.get() == 0)
455             return null;
456         E x = null;
457         int c = -1;
458         final ReentrantLock takeLock = this.takeLock;
459         takeLock.lock();
460         try {
461             if (count.get() > 0) {
462                 x = dequeue();
463                 c = count.getAndDecrement();
464                 if (c > 1)
465                     notEmpty.signal();
466             }
467         } finally {
468             takeLock.unlock();
469         }
470         if (c == capacity)
471             signalNotFull();
472         return x;
473     }
474 
peek()475     public E peek() {
476         if (count.get() == 0)
477             return null;
478         final ReentrantLock takeLock = this.takeLock;
479         takeLock.lock();
480         try {
481             return (count.get() > 0) ? head.next.item : null;
482         } finally {
483             takeLock.unlock();
484         }
485     }
486 
487     /**
488      * Unlinks interior Node p with predecessor trail.
489      */
unlink(Node<E> p, Node<E> trail)490     void unlink(Node<E> p, Node<E> trail) {
491         // assert isFullyLocked();
492         // p.next is not changed, to allow iterators that are
493         // traversing p to maintain their weak-consistency guarantee.
494         p.item = null;
495         trail.next = p.next;
496         if (last == p)
497             last = trail;
498         if (count.getAndDecrement() == capacity)
499             notFull.signal();
500     }
501 
502     /**
503      * Removes a single instance of the specified element from this queue,
504      * if it is present.  More formally, removes an element {@code e} such
505      * that {@code o.equals(e)}, if this queue contains one or more such
506      * elements.
507      * Returns {@code true} if this queue contained the specified element
508      * (or equivalently, if this queue changed as a result of the call).
509      *
510      * @param o element to be removed from this queue, if present
511      * @return {@code true} if this queue changed as a result of the call
512      */
remove(Object o)513     public boolean remove(Object o) {
514         if (o == null) return false;
515         fullyLock();
516         try {
517             for (Node<E> trail = head, p = trail.next;
518                  p != null;
519                  trail = p, p = p.next) {
520                 if (o.equals(p.item)) {
521                     unlink(p, trail);
522                     return true;
523                 }
524             }
525             return false;
526         } finally {
527             fullyUnlock();
528         }
529     }
530 
531     /**
532      * Returns {@code true} if this queue contains the specified element.
533      * More formally, returns {@code true} if and only if this queue contains
534      * at least one element {@code e} such that {@code o.equals(e)}.
535      *
536      * @param o object to be checked for containment in this queue
537      * @return {@code true} if this queue contains the specified element
538      */
contains(Object o)539     public boolean contains(Object o) {
540         if (o == null) return false;
541         fullyLock();
542         try {
543             for (Node<E> p = head.next; p != null; p = p.next)
544                 if (o.equals(p.item))
545                     return true;
546             return false;
547         } finally {
548             fullyUnlock();
549         }
550     }
551 
552     /**
553      * Returns an array containing all of the elements in this queue, in
554      * proper sequence.
555      *
556      * <p>The returned array will be "safe" in that no references to it are
557      * maintained by this queue.  (In other words, this method must allocate
558      * a new array).  The caller is thus free to modify the returned array.
559      *
560      * <p>This method acts as bridge between array-based and collection-based
561      * APIs.
562      *
563      * @return an array containing all of the elements in this queue
564      */
toArray()565     public Object[] toArray() {
566         fullyLock();
567         try {
568             int size = count.get();
569             Object[] a = new Object[size];
570             int k = 0;
571             for (Node<E> p = head.next; p != null; p = p.next)
572                 a[k++] = p.item;
573             return a;
574         } finally {
575             fullyUnlock();
576         }
577     }
578 
579     /**
580      * Returns an array containing all of the elements in this queue, in
581      * proper sequence; the runtime type of the returned array is that of
582      * the specified array.  If the queue fits in the specified array, it
583      * is returned therein.  Otherwise, a new array is allocated with the
584      * runtime type of the specified array and the size of this queue.
585      *
586      * <p>If this queue fits in the specified array with room to spare
587      * (i.e., the array has more elements than this queue), the element in
588      * the array immediately following the end of the queue is set to
589      * {@code null}.
590      *
591      * <p>Like the {@link #toArray()} method, this method acts as bridge between
592      * array-based and collection-based APIs.  Further, this method allows
593      * precise control over the runtime type of the output array, and may,
594      * under certain circumstances, be used to save allocation costs.
595      *
596      * <p>Suppose {@code x} is a queue known to contain only strings.
597      * The following code can be used to dump the queue into a newly
598      * allocated array of {@code String}:
599      *
600      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
601      *
602      * Note that {@code toArray(new Object[0])} is identical in function to
603      * {@code toArray()}.
604      *
605      * @param a the array into which the elements of the queue are to
606      *          be stored, if it is big enough; otherwise, a new array of the
607      *          same runtime type is allocated for this purpose
608      * @return an array containing all of the elements in this queue
609      * @throws ArrayStoreException if the runtime type of the specified array
610      *         is not a supertype of the runtime type of every element in
611      *         this queue
612      * @throws NullPointerException if the specified array is null
613      */
614     @SuppressWarnings("unchecked")
toArray(T[] a)615     public <T> T[] toArray(T[] a) {
616         fullyLock();
617         try {
618             int size = count.get();
619             if (a.length < size)
620                 a = (T[])java.lang.reflect.Array.newInstance
621                     (a.getClass().getComponentType(), size);
622 
623             int k = 0;
624             for (Node<E> p = head.next; p != null; p = p.next)
625                 a[k++] = (T)p.item;
626             if (a.length > k)
627                 a[k] = null;
628             return a;
629         } finally {
630             fullyUnlock();
631         }
632     }
633 
toString()634     public String toString() {
635         return Helpers.collectionToString(this);
636     }
637 
638     /**
639      * Atomically removes all of the elements from this queue.
640      * The queue will be empty after this call returns.
641      */
clear()642     public void clear() {
643         fullyLock();
644         try {
645             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
646                 h.next = h;
647                 p.item = null;
648             }
649             head = last;
650             // assert head.item == null && head.next == null;
651             if (count.getAndSet(0) == capacity)
652                 notFull.signal();
653         } finally {
654             fullyUnlock();
655         }
656     }
657 
658     /**
659      * @throws UnsupportedOperationException {@inheritDoc}
660      * @throws ClassCastException            {@inheritDoc}
661      * @throws NullPointerException          {@inheritDoc}
662      * @throws IllegalArgumentException      {@inheritDoc}
663      */
drainTo(Collection<? super E> c)664     public int drainTo(Collection<? super E> c) {
665         return drainTo(c, Integer.MAX_VALUE);
666     }
667 
668     /**
669      * @throws UnsupportedOperationException {@inheritDoc}
670      * @throws ClassCastException            {@inheritDoc}
671      * @throws NullPointerException          {@inheritDoc}
672      * @throws IllegalArgumentException      {@inheritDoc}
673      */
drainTo(Collection<? super E> c, int maxElements)674     public int drainTo(Collection<? super E> c, int maxElements) {
675         if (c == null)
676             throw new NullPointerException();
677         if (c == this)
678             throw new IllegalArgumentException();
679         if (maxElements <= 0)
680             return 0;
681         boolean signalNotFull = false;
682         final ReentrantLock takeLock = this.takeLock;
683         takeLock.lock();
684         try {
685             int n = Math.min(maxElements, count.get());
686             // count.get provides visibility to first n Nodes
687             Node<E> h = head;
688             int i = 0;
689             try {
690                 while (i < n) {
691                     Node<E> p = h.next;
692                     c.add(p.item);
693                     p.item = null;
694                     h.next = h;
695                     h = p;
696                     ++i;
697                 }
698                 return n;
699             } finally {
700                 // Restore invariants even if c.add() threw
701                 if (i > 0) {
702                     // assert h.item == null;
703                     head = h;
704                     signalNotFull = (count.getAndAdd(-i) == capacity);
705                 }
706             }
707         } finally {
708             takeLock.unlock();
709             if (signalNotFull)
710                 signalNotFull();
711         }
712     }
713 
714     /**
715      * Returns an iterator over the elements in this queue in proper sequence.
716      * The elements will be returned in order from first (head) to last (tail).
717      *
718      * <p>The returned iterator is
719      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
720      *
721      * @return an iterator over the elements in this queue in proper sequence
722      */
iterator()723     public Iterator<E> iterator() {
724         return new Itr();
725     }
726 
727     private class Itr implements Iterator<E> {
728         /*
729          * Basic weakly-consistent iterator.  At all times hold the next
730          * item to hand out so that if hasNext() reports true, we will
731          * still have it to return even if lost race with a take etc.
732          */
733 
734         private Node<E> current;
735         private Node<E> lastRet;
736         private E currentElement;
737 
Itr()738         Itr() {
739             fullyLock();
740             try {
741                 current = head.next;
742                 if (current != null)
743                     currentElement = current.item;
744             } finally {
745                 fullyUnlock();
746             }
747         }
748 
hasNext()749         public boolean hasNext() {
750             return current != null;
751         }
752 
next()753         public E next() {
754             fullyLock();
755             try {
756                 if (current == null)
757                     throw new NoSuchElementException();
758                 lastRet = current;
759                 E item = null;
760                 // Unlike other traversal methods, iterators must handle both:
761                 // - dequeued nodes (p.next == p)
762                 // - (possibly multiple) interior removed nodes (p.item == null)
763                 for (Node<E> p = current, q;; p = q) {
764                     if ((q = p.next) == p)
765                         q = head.next;
766                     if (q == null || (item = q.item) != null) {
767                         current = q;
768                         E x = currentElement;
769                         currentElement = item;
770                         return x;
771                     }
772                 }
773             } finally {
774                 fullyUnlock();
775             }
776         }
777 
remove()778         public void remove() {
779             if (lastRet == null)
780                 throw new IllegalStateException();
781             fullyLock();
782             try {
783                 Node<E> node = lastRet;
784                 lastRet = null;
785                 for (Node<E> trail = head, p = trail.next;
786                      p != null;
787                      trail = p, p = p.next) {
788                     if (p == node) {
789                         unlink(p, trail);
790                         break;
791                     }
792                 }
793             } finally {
794                 fullyUnlock();
795             }
796         }
797     }
798 
799     /** A customized variant of Spliterators.IteratorSpliterator */
800     static final class LBQSpliterator<E> implements Spliterator<E> {
801         static final int MAX_BATCH = 1 << 25;  // max batch array size;
802         final LinkedBlockingQueue<E> queue;
803         Node<E> current;    // current node; null until initialized
804         int batch;          // batch size for splits
805         boolean exhausted;  // true when no more nodes
806         long est;           // size estimate
LBQSpliterator(LinkedBlockingQueue<E> queue)807         LBQSpliterator(LinkedBlockingQueue<E> queue) {
808             this.queue = queue;
809             this.est = queue.size();
810         }
811 
estimateSize()812         public long estimateSize() { return est; }
813 
trySplit()814         public Spliterator<E> trySplit() {
815             Node<E> h;
816             final LinkedBlockingQueue<E> q = this.queue;
817             int b = batch;
818             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
819             if (!exhausted &&
820                 ((h = current) != null || (h = q.head.next) != null) &&
821                 h.next != null) {
822                 Object[] a = new Object[n];
823                 int i = 0;
824                 Node<E> p = current;
825                 q.fullyLock();
826                 try {
827                     if (p != null || (p = q.head.next) != null) {
828                         do {
829                             if ((a[i] = p.item) != null)
830                                 ++i;
831                         } while ((p = p.next) != null && i < n);
832                     }
833                 } finally {
834                     q.fullyUnlock();
835                 }
836                 if ((current = p) == null) {
837                     est = 0L;
838                     exhausted = true;
839                 }
840                 else if ((est -= i) < 0L)
841                     est = 0L;
842                 if (i > 0) {
843                     batch = i;
844                     return Spliterators.spliterator
845                         (a, 0, i, (Spliterator.ORDERED |
846                                    Spliterator.NONNULL |
847                                    Spliterator.CONCURRENT));
848                 }
849             }
850             return null;
851         }
852 
forEachRemaining(Consumer<? super E> action)853         public void forEachRemaining(Consumer<? super E> action) {
854             if (action == null) throw new NullPointerException();
855             final LinkedBlockingQueue<E> q = this.queue;
856             if (!exhausted) {
857                 exhausted = true;
858                 Node<E> p = current;
859                 do {
860                     E e = null;
861                     q.fullyLock();
862                     try {
863                         if (p == null)
864                             p = q.head.next;
865                         while (p != null) {
866                             e = p.item;
867                             p = p.next;
868                             if (e != null)
869                                 break;
870                         }
871                     } finally {
872                         q.fullyUnlock();
873                     }
874                     if (e != null)
875                         action.accept(e);
876                 } while (p != null);
877             }
878         }
879 
tryAdvance(Consumer<? super E> action)880         public boolean tryAdvance(Consumer<? super E> action) {
881             if (action == null) throw new NullPointerException();
882             final LinkedBlockingQueue<E> q = this.queue;
883             if (!exhausted) {
884                 E e = null;
885                 q.fullyLock();
886                 try {
887                     if (current == null)
888                         current = q.head.next;
889                     while (current != null) {
890                         e = current.item;
891                         current = current.next;
892                         if (e != null)
893                             break;
894                     }
895                 } finally {
896                     q.fullyUnlock();
897                 }
898                 if (current == null)
899                     exhausted = true;
900                 if (e != null) {
901                     action.accept(e);
902                     return true;
903                 }
904             }
905             return false;
906         }
907 
characteristics()908         public int characteristics() {
909             return Spliterator.ORDERED | Spliterator.NONNULL |
910                 Spliterator.CONCURRENT;
911         }
912     }
913 
914     /**
915      * Returns a {@link Spliterator} over the elements in this queue.
916      *
917      * <p>The returned spliterator is
918      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
919      *
920      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
921      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
922      *
923      * @implNote
924      * The {@code Spliterator} implements {@code trySplit} to permit limited
925      * parallelism.
926      *
927      * @return a {@code Spliterator} over the elements in this queue
928      * @since 1.8
929      */
spliterator()930     public Spliterator<E> spliterator() {
931         return new LBQSpliterator<E>(this);
932     }
933 
934     /**
935      * Saves this queue to a stream (that is, serializes it).
936      *
937      * @param s the stream
938      * @throws java.io.IOException if an I/O error occurs
939      * @serialData The capacity is emitted (int), followed by all of
940      * its elements (each an {@code Object}) in the proper order,
941      * followed by a null
942      */
writeObject(java.io.ObjectOutputStream s)943     private void writeObject(java.io.ObjectOutputStream s)
944         throws java.io.IOException {
945 
946         fullyLock();
947         try {
948             // Write out any hidden stuff, plus capacity
949             s.defaultWriteObject();
950 
951             // Write out all elements in the proper order.
952             for (Node<E> p = head.next; p != null; p = p.next)
953                 s.writeObject(p.item);
954 
955             // Use trailing null as sentinel
956             s.writeObject(null);
957         } finally {
958             fullyUnlock();
959         }
960     }
961 
962     /**
963      * Reconstitutes this queue from a stream (that is, deserializes it).
964      * @param s the stream
965      * @throws ClassNotFoundException if the class of a serialized object
966      *         could not be found
967      * @throws java.io.IOException if an I/O error occurs
968      */
readObject(java.io.ObjectInputStream s)969     private void readObject(java.io.ObjectInputStream s)
970         throws java.io.IOException, ClassNotFoundException {
971         // Read in capacity, and any hidden stuff
972         s.defaultReadObject();
973 
974         count.set(0);
975         last = head = new Node<E>(null);
976 
977         // Read in all elements and place in queue
978         for (;;) {
979             @SuppressWarnings("unchecked")
980             E item = (E)s.readObject();
981             if (item == null)
982                 break;
983             add(item);
984         }
985     }
986 }
987