1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.ref.WeakReference;
39 import java.util.AbstractQueue;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.Iterator;
43 import java.util.NoSuchElementException;
44 import java.util.Objects;
45 import java.util.Spliterator;
46 import java.util.Spliterators;
47 import java.util.concurrent.locks.Condition;
48 import java.util.concurrent.locks.ReentrantLock;
49 import java.util.function.Consumer;
50 import java.util.function.Predicate;
51 
52 /**
53  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
54  * array.  This queue orders elements FIFO (first-in-first-out).  The
55  * <em>head</em> of the queue is that element that has been on the
56  * queue the longest time.  The <em>tail</em> of the queue is that
57  * element that has been on the queue the shortest time. New elements
58  * are inserted at the tail of the queue, and the queue retrieval
59  * operations obtain elements at the head of the queue.
60  *
61  * <p>This is a classic &quot;bounded buffer&quot;, in which a
62  * fixed-sized array holds elements inserted by producers and
63  * extracted by consumers.  Once created, the capacity cannot be
64  * changed.  Attempts to {@code put} an element into a full queue
65  * will result in the operation blocking; attempts to {@code take} an
66  * element from an empty queue will similarly block.
67  *
68  * <p>This class supports an optional fairness policy for ordering
69  * waiting producer and consumer threads.  By default, this ordering
70  * is not guaranteed. However, a queue constructed with fairness set
71  * to {@code true} grants threads access in FIFO order. Fairness
72  * generally decreases throughput but reduces variability and avoids
73  * starvation.
74  *
75  * <p>This class and its iterator implement all of the <em>optional</em>
76  * methods of the {@link Collection} and {@link Iterator} interfaces.
77  *
78  * <p>This class is a member of the
79  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
80  * Java Collections Framework</a>.
81  *
82  * @since 1.5
83  * @author Doug Lea
84  * @param <E> the type of elements held in this queue
85  */
86 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
87         implements BlockingQueue<E>, java.io.Serializable {
88 
89     /*
90      * Much of the implementation mechanics, especially the unusual
91      * nested loops, are shared and co-maintained with ArrayDeque.
92      */
93 
94     /**
95      * Serialization ID. This class relies on default serialization
96      * even for the items array, which is default-serialized, even if
97      * it is empty. Otherwise it could not be declared final, which is
98      * necessary here.
99      */
100     private static final long serialVersionUID = -817911632652898426L;
101 
102     /** The queued items */
103     @SuppressWarnings("serial") // Conditionally serializable
104     final Object[] items;
105 
106     /** items index for next take, poll, peek or remove */
107     int takeIndex;
108 
109     /** items index for next put, offer, or add */
110     int putIndex;
111 
112     /** Number of elements in the queue */
113     int count;
114 
115     /*
116      * Concurrency control uses the classic two-condition algorithm
117      * found in any textbook.
118      */
119 
120     /** Main lock guarding all access */
121     final ReentrantLock lock;
122 
123     /** Condition for waiting takes */
124     @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
125     private final Condition notEmpty;
126 
127     /** Condition for waiting puts */
128     @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
129     private final Condition notFull;
130 
131     /**
132      * Shared state for currently active iterators, or null if there
133      * are known not to be any.  Allows queue operations to update
134      * iterator state.
135      */
136     transient Itrs itrs;
137 
138     // Internal helper methods
139 
140     /**
141      * Increments i, mod modulus.
142      * Precondition and postcondition: 0 <= i < modulus.
143      */
inc(int i, int modulus)144     static final int inc(int i, int modulus) {
145         if (++i >= modulus) i = 0;
146         return i;
147     }
148 
149     /**
150      * Decrements i, mod modulus.
151      * Precondition and postcondition: 0 <= i < modulus.
152      */
dec(int i, int modulus)153     static final int dec(int i, int modulus) {
154         if (--i < 0) i = modulus - 1;
155         return i;
156     }
157 
158     /**
159      * Returns item at index i.
160      */
161     @SuppressWarnings("unchecked")
itemAt(int i)162     final E itemAt(int i) {
163         return (E) items[i];
164     }
165 
166     /**
167      * Returns element at array index i.
168      * This is a slight abuse of generics, accepted by javac.
169      */
170     @SuppressWarnings("unchecked")
itemAt(Object[] items, int i)171     static <E> E itemAt(Object[] items, int i) {
172         return (E) items[i];
173     }
174 
175     /**
176      * Inserts element at current put position, advances, and signals.
177      * Call only when holding lock.
178      */
enqueue(E e)179     private void enqueue(E e) {
180         // assert lock.isHeldByCurrentThread();
181         // assert lock.getHoldCount() == 1;
182         // assert items[putIndex] == null;
183         final Object[] items = this.items;
184         items[putIndex] = e;
185         if (++putIndex == items.length) putIndex = 0;
186         count++;
187         notEmpty.signal();
188     }
189 
190     /**
191      * Extracts element at current take position, advances, and signals.
192      * Call only when holding lock.
193      */
dequeue()194     private E dequeue() {
195         // assert lock.isHeldByCurrentThread();
196         // assert lock.getHoldCount() == 1;
197         // assert items[takeIndex] != null;
198         final Object[] items = this.items;
199         @SuppressWarnings("unchecked")
200         E e = (E) items[takeIndex];
201         items[takeIndex] = null;
202         if (++takeIndex == items.length) takeIndex = 0;
203         count--;
204         if (itrs != null)
205             itrs.elementDequeued();
206         notFull.signal();
207         return e;
208     }
209 
210     /**
211      * Deletes item at array index removeIndex.
212      * Utility for remove(Object) and iterator.remove.
213      * Call only when holding lock.
214      */
removeAt(final int removeIndex)215     void removeAt(final int removeIndex) {
216         // assert lock.isHeldByCurrentThread();
217         // assert lock.getHoldCount() == 1;
218         // assert items[removeIndex] != null;
219         // assert removeIndex >= 0 && removeIndex < items.length;
220         final Object[] items = this.items;
221         if (removeIndex == takeIndex) {
222             // removing front item; just advance
223             items[takeIndex] = null;
224             if (++takeIndex == items.length) takeIndex = 0;
225             count--;
226             if (itrs != null)
227                 itrs.elementDequeued();
228         } else {
229             // an "interior" remove
230 
231             // slide over all others up through putIndex.
232             for (int i = removeIndex, putIndex = this.putIndex;;) {
233                 int pred = i;
234                 if (++i == items.length) i = 0;
235                 if (i == putIndex) {
236                     items[pred] = null;
237                     this.putIndex = pred;
238                     break;
239                 }
240                 items[pred] = items[i];
241             }
242             count--;
243             if (itrs != null)
244                 itrs.removedAt(removeIndex);
245         }
246         notFull.signal();
247     }
248 
249     /**
250      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
251      * capacity and default access policy.
252      *
253      * @param capacity the capacity of this queue
254      * @throws IllegalArgumentException if {@code capacity < 1}
255      */
ArrayBlockingQueue(int capacity)256     public ArrayBlockingQueue(int capacity) {
257         this(capacity, false);
258     }
259 
260     /**
261      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
262      * capacity and the specified access policy.
263      *
264      * @param capacity the capacity of this queue
265      * @param fair if {@code true} then queue accesses for threads blocked
266      *        on insertion or removal, are processed in FIFO order;
267      *        if {@code false} the access order is unspecified.
268      * @throws IllegalArgumentException if {@code capacity < 1}
269      */
ArrayBlockingQueue(int capacity, boolean fair)270     public ArrayBlockingQueue(int capacity, boolean fair) {
271         if (capacity <= 0)
272             throw new IllegalArgumentException();
273         this.items = new Object[capacity];
274         lock = new ReentrantLock(fair);
275         notEmpty = lock.newCondition();
276         notFull =  lock.newCondition();
277     }
278 
279     /**
280      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
281      * capacity, the specified access policy and initially containing the
282      * elements of the given collection,
283      * added in traversal order of the collection's iterator.
284      *
285      * @param capacity the capacity of this queue
286      * @param fair if {@code true} then queue accesses for threads blocked
287      *        on insertion or removal, are processed in FIFO order;
288      *        if {@code false} the access order is unspecified.
289      * @param c the collection of elements to initially contain
290      * @throws IllegalArgumentException if {@code capacity} is less than
291      *         {@code c.size()}, or less than 1.
292      * @throws NullPointerException if the specified collection or any
293      *         of its elements are null
294      */
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)295     public ArrayBlockingQueue(int capacity, boolean fair,
296                               Collection<? extends E> c) {
297         this(capacity, fair);
298 
299         final ReentrantLock lock = this.lock;
300         lock.lock(); // Lock only for visibility, not mutual exclusion
301         try {
302             final Object[] items = this.items;
303             int i = 0;
304             try {
305                 for (E e : c)
306                     items[i++] = Objects.requireNonNull(e);
307             } catch (ArrayIndexOutOfBoundsException ex) {
308                 throw new IllegalArgumentException();
309             }
310             count = i;
311             putIndex = (i == capacity) ? 0 : i;
312         } finally {
313             lock.unlock();
314         }
315     }
316 
317     /**
318      * Inserts the specified element at the tail of this queue if it is
319      * possible to do so immediately without exceeding the queue's capacity,
320      * returning {@code true} upon success and throwing an
321      * {@code IllegalStateException} if this queue is full.
322      *
323      * @param e the element to add
324      * @return {@code true} (as specified by {@link Collection#add})
325      * @throws IllegalStateException if this queue is full
326      * @throws NullPointerException if the specified element is null
327      */
add(E e)328     public boolean add(E e) {
329         return super.add(e);
330     }
331 
332     /**
333      * Inserts the specified element at the tail of this queue if it is
334      * possible to do so immediately without exceeding the queue's capacity,
335      * returning {@code true} upon success and {@code false} if this queue
336      * is full.  This method is generally preferable to method {@link #add},
337      * which can fail to insert an element only by throwing an exception.
338      *
339      * @throws NullPointerException if the specified element is null
340      */
offer(E e)341     public boolean offer(E e) {
342         Objects.requireNonNull(e);
343         final ReentrantLock lock = this.lock;
344         lock.lock();
345         try {
346             if (count == items.length)
347                 return false;
348             else {
349                 enqueue(e);
350                 return true;
351             }
352         } finally {
353             lock.unlock();
354         }
355     }
356 
357     /**
358      * Inserts the specified element at the tail of this queue, waiting
359      * for space to become available if the queue is full.
360      *
361      * @throws InterruptedException {@inheritDoc}
362      * @throws NullPointerException {@inheritDoc}
363      */
put(E e)364     public void put(E e) throws InterruptedException {
365         Objects.requireNonNull(e);
366         final ReentrantLock lock = this.lock;
367         lock.lockInterruptibly();
368         try {
369             while (count == items.length)
370                 notFull.await();
371             enqueue(e);
372         } finally {
373             lock.unlock();
374         }
375     }
376 
377     /**
378      * Inserts the specified element at the tail of this queue, waiting
379      * up to the specified wait time for space to become available if
380      * the queue is full.
381      *
382      * @throws InterruptedException {@inheritDoc}
383      * @throws NullPointerException {@inheritDoc}
384      */
offer(E e, long timeout, TimeUnit unit)385     public boolean offer(E e, long timeout, TimeUnit unit)
386         throws InterruptedException {
387 
388         Objects.requireNonNull(e);
389         long nanos = unit.toNanos(timeout);
390         final ReentrantLock lock = this.lock;
391         lock.lockInterruptibly();
392         try {
393             while (count == items.length) {
394                 if (nanos <= 0L)
395                     return false;
396                 nanos = notFull.awaitNanos(nanos);
397             }
398             enqueue(e);
399             return true;
400         } finally {
401             lock.unlock();
402         }
403     }
404 
poll()405     public E poll() {
406         final ReentrantLock lock = this.lock;
407         lock.lock();
408         try {
409             return (count == 0) ? null : dequeue();
410         } finally {
411             lock.unlock();
412         }
413     }
414 
take()415     public E take() throws InterruptedException {
416         final ReentrantLock lock = this.lock;
417         lock.lockInterruptibly();
418         try {
419             while (count == 0)
420                 notEmpty.await();
421             return dequeue();
422         } finally {
423             lock.unlock();
424         }
425     }
426 
poll(long timeout, TimeUnit unit)427     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
428         long nanos = unit.toNanos(timeout);
429         final ReentrantLock lock = this.lock;
430         lock.lockInterruptibly();
431         try {
432             while (count == 0) {
433                 if (nanos <= 0L)
434                     return null;
435                 nanos = notEmpty.awaitNanos(nanos);
436             }
437             return dequeue();
438         } finally {
439             lock.unlock();
440         }
441     }
442 
peek()443     public E peek() {
444         final ReentrantLock lock = this.lock;
445         lock.lock();
446         try {
447             return itemAt(takeIndex); // null when queue is empty
448         } finally {
449             lock.unlock();
450         }
451     }
452 
453     // this doc comment is overridden to remove the reference to collections
454     // greater in size than Integer.MAX_VALUE
455     /**
456      * Returns the number of elements in this queue.
457      *
458      * @return the number of elements in this queue
459      */
size()460     public int size() {
461         final ReentrantLock lock = this.lock;
462         lock.lock();
463         try {
464             return count;
465         } finally {
466             lock.unlock();
467         }
468     }
469 
470     // this doc comment is a modified copy of the inherited doc comment,
471     // without the reference to unlimited queues.
472     /**
473      * Returns the number of additional elements that this queue can ideally
474      * (in the absence of memory or resource constraints) accept without
475      * blocking. This is always equal to the initial capacity of this queue
476      * less the current {@code size} of this queue.
477      *
478      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
479      * an element will succeed by inspecting {@code remainingCapacity}
480      * because it may be the case that another thread is about to
481      * insert or remove an element.
482      */
remainingCapacity()483     public int remainingCapacity() {
484         final ReentrantLock lock = this.lock;
485         lock.lock();
486         try {
487             return items.length - count;
488         } finally {
489             lock.unlock();
490         }
491     }
492 
493     /**
494      * Removes a single instance of the specified element from this queue,
495      * if it is present.  More formally, removes an element {@code e} such
496      * that {@code o.equals(e)}, if this queue contains one or more such
497      * elements.
498      * Returns {@code true} if this queue contained the specified element
499      * (or equivalently, if this queue changed as a result of the call).
500      *
501      * <p>Removal of interior elements in circular array based queues
502      * is an intrinsically slow and disruptive operation, so should
503      * be undertaken only in exceptional circumstances, ideally
504      * only when the queue is known not to be accessible by other
505      * threads.
506      *
507      * @param o element to be removed from this queue, if present
508      * @return {@code true} if this queue changed as a result of the call
509      */
remove(Object o)510     public boolean remove(Object o) {
511         if (o == null) return false;
512         final ReentrantLock lock = this.lock;
513         lock.lock();
514         try {
515             if (count > 0) {
516                 final Object[] items = this.items;
517                 for (int i = takeIndex, end = putIndex,
518                          to = (i < end) ? end : items.length;
519                      ; i = 0, to = end) {
520                     for (; i < to; i++)
521                         if (o.equals(items[i])) {
522                             removeAt(i);
523                             return true;
524                         }
525                     if (to == end) break;
526                 }
527             }
528             return false;
529         } finally {
530             lock.unlock();
531         }
532     }
533 
534     /**
535      * Returns {@code true} if this queue contains the specified element.
536      * More formally, returns {@code true} if and only if this queue contains
537      * at least one element {@code e} such that {@code o.equals(e)}.
538      *
539      * @param o object to be checked for containment in this queue
540      * @return {@code true} if this queue contains the specified element
541      */
contains(Object o)542     public boolean contains(Object o) {
543         if (o == null) return false;
544         final ReentrantLock lock = this.lock;
545         lock.lock();
546         try {
547             if (count > 0) {
548                 final Object[] items = this.items;
549                 for (int i = takeIndex, end = putIndex,
550                          to = (i < end) ? end : items.length;
551                      ; i = 0, to = end) {
552                     for (; i < to; i++)
553                         if (o.equals(items[i]))
554                             return true;
555                     if (to == end) break;
556                 }
557             }
558             return false;
559         } finally {
560             lock.unlock();
561         }
562     }
563 
564     /**
565      * Returns an array containing all of the elements in this queue, in
566      * proper sequence.
567      *
568      * <p>The returned array will be "safe" in that no references to it are
569      * maintained by this queue.  (In other words, this method must allocate
570      * a new array).  The caller is thus free to modify the returned array.
571      *
572      * <p>This method acts as bridge between array-based and collection-based
573      * APIs.
574      *
575      * @return an array containing all of the elements in this queue
576      */
toArray()577     public Object[] toArray() {
578         final ReentrantLock lock = this.lock;
579         lock.lock();
580         try {
581             final Object[] items = this.items;
582             final int end = takeIndex + count;
583             final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
584             if (end != putIndex)
585                 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
586             return a;
587         } finally {
588             lock.unlock();
589         }
590     }
591 
592     /**
593      * Returns an array containing all of the elements in this queue, in
594      * proper sequence; the runtime type of the returned array is that of
595      * the specified array.  If the queue fits in the specified array, it
596      * is returned therein.  Otherwise, a new array is allocated with the
597      * runtime type of the specified array and the size of this queue.
598      *
599      * <p>If this queue fits in the specified array with room to spare
600      * (i.e., the array has more elements than this queue), the element in
601      * the array immediately following the end of the queue is set to
602      * {@code null}.
603      *
604      * <p>Like the {@link #toArray()} method, this method acts as bridge between
605      * array-based and collection-based APIs.  Further, this method allows
606      * precise control over the runtime type of the output array, and may,
607      * under certain circumstances, be used to save allocation costs.
608      *
609      * <p>Suppose {@code x} is a queue known to contain only strings.
610      * The following code can be used to dump the queue into a newly
611      * allocated array of {@code String}:
612      *
613      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
614      *
615      * Note that {@code toArray(new Object[0])} is identical in function to
616      * {@code toArray()}.
617      *
618      * @param a the array into which the elements of the queue are to
619      *          be stored, if it is big enough; otherwise, a new array of the
620      *          same runtime type is allocated for this purpose
621      * @return an array containing all of the elements in this queue
622      * @throws ArrayStoreException if the runtime type of the specified array
623      *         is not a supertype of the runtime type of every element in
624      *         this queue
625      * @throws NullPointerException if the specified array is null
626      */
627     @SuppressWarnings("unchecked")
toArray(T[] a)628     public <T> T[] toArray(T[] a) {
629         final ReentrantLock lock = this.lock;
630         lock.lock();
631         try {
632             final Object[] items = this.items;
633             final int count = this.count;
634             final int firstLeg = Math.min(items.length - takeIndex, count);
635             if (a.length < count) {
636                 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
637                                              a.getClass());
638             } else {
639                 System.arraycopy(items, takeIndex, a, 0, firstLeg);
640                 if (a.length > count)
641                     a[count] = null;
642             }
643             if (firstLeg < count)
644                 System.arraycopy(items, 0, a, firstLeg, putIndex);
645             return a;
646         } finally {
647             lock.unlock();
648         }
649     }
650 
toString()651     public String toString() {
652         return Helpers.collectionToString(this);
653     }
654 
655     /**
656      * Atomically removes all of the elements from this queue.
657      * The queue will be empty after this call returns.
658      */
clear()659     public void clear() {
660         final ReentrantLock lock = this.lock;
661         lock.lock();
662         try {
663             int k;
664             if ((k = count) > 0) {
665                 circularClear(items, takeIndex, putIndex);
666                 takeIndex = putIndex;
667                 count = 0;
668                 if (itrs != null)
669                     itrs.queueIsEmpty();
670                 for (; k > 0 && lock.hasWaiters(notFull); k--)
671                     notFull.signal();
672             }
673         } finally {
674             lock.unlock();
675         }
676     }
677 
678     /**
679      * Nulls out slots starting at array index i, upto index end.
680      * Condition i == end means "full" - the entire array is cleared.
681      */
circularClear(Object[] items, int i, int end)682     private static void circularClear(Object[] items, int i, int end) {
683         // assert 0 <= i && i < items.length;
684         // assert 0 <= end && end < items.length;
685         for (int to = (i < end) ? end : items.length;
686              ; i = 0, to = end) {
687             for (; i < to; i++) items[i] = null;
688             if (to == end) break;
689         }
690     }
691 
692     /**
693      * @throws UnsupportedOperationException {@inheritDoc}
694      * @throws ClassCastException            {@inheritDoc}
695      * @throws NullPointerException          {@inheritDoc}
696      * @throws IllegalArgumentException      {@inheritDoc}
697      */
drainTo(Collection<? super E> c)698     public int drainTo(Collection<? super E> c) {
699         return drainTo(c, Integer.MAX_VALUE);
700     }
701 
702     /**
703      * @throws UnsupportedOperationException {@inheritDoc}
704      * @throws ClassCastException            {@inheritDoc}
705      * @throws NullPointerException          {@inheritDoc}
706      * @throws IllegalArgumentException      {@inheritDoc}
707      */
drainTo(Collection<? super E> c, int maxElements)708     public int drainTo(Collection<? super E> c, int maxElements) {
709         Objects.requireNonNull(c);
710         if (c == this)
711             throw new IllegalArgumentException();
712         if (maxElements <= 0)
713             return 0;
714         final Object[] items = this.items;
715         final ReentrantLock lock = this.lock;
716         lock.lock();
717         try {
718             int n = Math.min(maxElements, count);
719             int take = takeIndex;
720             int i = 0;
721             try {
722                 while (i < n) {
723                     @SuppressWarnings("unchecked")
724                     E e = (E) items[take];
725                     c.add(e);
726                     items[take] = null;
727                     if (++take == items.length) take = 0;
728                     i++;
729                 }
730                 return n;
731             } finally {
732                 // Restore invariants even if c.add() threw
733                 if (i > 0) {
734                     count -= i;
735                     takeIndex = take;
736                     if (itrs != null) {
737                         if (count == 0)
738                             itrs.queueIsEmpty();
739                         else if (i > take)
740                             itrs.takeIndexWrapped();
741                     }
742                     for (; i > 0 && lock.hasWaiters(notFull); i--)
743                         notFull.signal();
744                 }
745             }
746         } finally {
747             lock.unlock();
748         }
749     }
750 
751     /**
752      * Returns an iterator over the elements in this queue in proper sequence.
753      * The elements will be returned in order from first (head) to last (tail).
754      *
755      * <p>The returned iterator is
756      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
757      *
758      * @return an iterator over the elements in this queue in proper sequence
759      */
iterator()760     public Iterator<E> iterator() {
761         return new Itr();
762     }
763 
764     /**
765      * Shared data between iterators and their queue, allowing queue
766      * modifications to update iterators when elements are removed.
767      *
768      * This adds a lot of complexity for the sake of correctly
769      * handling some uncommon operations, but the combination of
770      * circular-arrays and supporting interior removes (i.e., those
771      * not at head) would cause iterators to sometimes lose their
772      * places and/or (re)report elements they shouldn't.  To avoid
773      * this, when a queue has one or more iterators, it keeps iterator
774      * state consistent by:
775      *
776      * (1) keeping track of the number of "cycles", that is, the
777      *     number of times takeIndex has wrapped around to 0.
778      * (2) notifying all iterators via the callback removedAt whenever
779      *     an interior element is removed (and thus other elements may
780      *     be shifted).
781      *
782      * These suffice to eliminate iterator inconsistencies, but
783      * unfortunately add the secondary responsibility of maintaining
784      * the list of iterators.  We track all active iterators in a
785      * simple linked list (accessed only when the queue's lock is
786      * held) of weak references to Itr.  The list is cleaned up using
787      * 3 different mechanisms:
788      *
789      * (1) Whenever a new iterator is created, do some O(1) checking for
790      *     stale list elements.
791      *
792      * (2) Whenever takeIndex wraps around to 0, check for iterators
793      *     that have been unused for more than one wrap-around cycle.
794      *
795      * (3) Whenever the queue becomes empty, all iterators are notified
796      *     and this entire data structure is discarded.
797      *
798      * So in addition to the removedAt callback that is necessary for
799      * correctness, iterators have the shutdown and takeIndexWrapped
800      * callbacks that help remove stale iterators from the list.
801      *
802      * Whenever a list element is examined, it is expunged if either
803      * the GC has determined that the iterator is discarded, or if the
804      * iterator reports that it is "detached" (does not need any
805      * further state updates).  Overhead is maximal when takeIndex
806      * never advances, iterators are discarded before they are
807      * exhausted, and all removals are interior removes, in which case
808      * all stale iterators are discovered by the GC.  But even in this
809      * case we don't increase the amortized complexity.
810      *
811      * Care must be taken to keep list sweeping methods from
812      * reentrantly invoking another such method, causing subtle
813      * corruption bugs.
814      */
815     class Itrs {
816 
817         /**
818          * Node in a linked list of weak iterator references.
819          */
820         private class Node extends WeakReference<Itr> {
821             Node next;
822 
Node(Itr iterator, Node next)823             Node(Itr iterator, Node next) {
824                 super(iterator);
825                 this.next = next;
826             }
827         }
828 
829         /** Incremented whenever takeIndex wraps around to 0 */
830         int cycles;
831 
832         /** Linked list of weak iterator references */
833         private Node head;
834 
835         /** Used to expunge stale iterators */
836         private Node sweeper;
837 
838         private static final int SHORT_SWEEP_PROBES = 4;
839         private static final int LONG_SWEEP_PROBES = 16;
840 
Itrs(Itr initial)841         Itrs(Itr initial) {
842             register(initial);
843         }
844 
845         /**
846          * Sweeps itrs, looking for and expunging stale iterators.
847          * If at least one was found, tries harder to find more.
848          * Called only from iterating thread.
849          *
850          * @param tryHarder whether to start in try-harder mode, because
851          * there is known to be at least one iterator to collect
852          */
doSomeSweeping(boolean tryHarder)853         void doSomeSweeping(boolean tryHarder) {
854             // assert lock.isHeldByCurrentThread();
855             // assert head != null;
856             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
857             Node o, p;
858             final Node sweeper = this.sweeper;
859             boolean passedGo;   // to limit search to one full sweep
860 
861             if (sweeper == null) {
862                 o = null;
863                 p = head;
864                 passedGo = true;
865             } else {
866                 o = sweeper;
867                 p = o.next;
868                 passedGo = false;
869             }
870 
871             for (; probes > 0; probes--) {
872                 if (p == null) {
873                     if (passedGo)
874                         break;
875                     o = null;
876                     p = head;
877                     passedGo = true;
878                 }
879                 final Itr it = p.get();
880                 final Node next = p.next;
881                 if (it == null || it.isDetached()) {
882                     // found a discarded/exhausted iterator
883                     probes = LONG_SWEEP_PROBES; // "try harder"
884                     // unlink p
885                     p.clear();
886                     p.next = null;
887                     if (o == null) {
888                         head = next;
889                         if (next == null) {
890                             // We've run out of iterators to track; retire
891                             itrs = null;
892                             return;
893                         }
894                     }
895                     else
896                         o.next = next;
897                 } else {
898                     o = p;
899                 }
900                 p = next;
901             }
902 
903             this.sweeper = (p == null) ? null : o;
904         }
905 
906         /**
907          * Adds a new iterator to the linked list of tracked iterators.
908          */
register(Itr itr)909         void register(Itr itr) {
910             // assert lock.isHeldByCurrentThread();
911             head = new Node(itr, head);
912         }
913 
914         /**
915          * Called whenever takeIndex wraps around to 0.
916          *
917          * Notifies all iterators, and expunges any that are now stale.
918          */
takeIndexWrapped()919         void takeIndexWrapped() {
920             // assert lock.isHeldByCurrentThread();
921             cycles++;
922             for (Node o = null, p = head; p != null;) {
923                 final Itr it = p.get();
924                 final Node next = p.next;
925                 if (it == null || it.takeIndexWrapped()) {
926                     // unlink p
927                     // assert it == null || it.isDetached();
928                     p.clear();
929                     p.next = null;
930                     if (o == null)
931                         head = next;
932                     else
933                         o.next = next;
934                 } else {
935                     o = p;
936                 }
937                 p = next;
938             }
939             if (head == null)   // no more iterators to track
940                 itrs = null;
941         }
942 
943         /**
944          * Called whenever an interior remove (not at takeIndex) occurred.
945          *
946          * Notifies all iterators, and expunges any that are now stale.
947          */
removedAt(int removedIndex)948         void removedAt(int removedIndex) {
949             for (Node o = null, p = head; p != null;) {
950                 final Itr it = p.get();
951                 final Node next = p.next;
952                 if (it == null || it.removedAt(removedIndex)) {
953                     // unlink p
954                     // assert it == null || it.isDetached();
955                     p.clear();
956                     p.next = null;
957                     if (o == null)
958                         head = next;
959                     else
960                         o.next = next;
961                 } else {
962                     o = p;
963                 }
964                 p = next;
965             }
966             if (head == null)   // no more iterators to track
967                 itrs = null;
968         }
969 
970         /**
971          * Called whenever the queue becomes empty.
972          *
973          * Notifies all active iterators that the queue is empty,
974          * clears all weak refs, and unlinks the itrs datastructure.
975          */
queueIsEmpty()976         void queueIsEmpty() {
977             // assert lock.isHeldByCurrentThread();
978             for (Node p = head; p != null; p = p.next) {
979                 Itr it = p.get();
980                 if (it != null) {
981                     p.clear();
982                     it.shutdown();
983                 }
984             }
985             head = null;
986             itrs = null;
987         }
988 
989         /**
990          * Called whenever an element has been dequeued (at takeIndex).
991          */
elementDequeued()992         void elementDequeued() {
993             // assert lock.isHeldByCurrentThread();
994             if (count == 0)
995                 queueIsEmpty();
996             else if (takeIndex == 0)
997                 takeIndexWrapped();
998         }
999     }
1000 
1001     /**
1002      * Iterator for ArrayBlockingQueue.
1003      *
1004      * To maintain weak consistency with respect to puts and takes, we
1005      * read ahead one slot, so as to not report hasNext true but then
1006      * not have an element to return.
1007      *
1008      * We switch into "detached" mode (allowing prompt unlinking from
1009      * itrs without help from the GC) when all indices are negative, or
1010      * when hasNext returns false for the first time.  This allows the
1011      * iterator to track concurrent updates completely accurately,
1012      * except for the corner case of the user calling Iterator.remove()
1013      * after hasNext() returned false.  Even in this case, we ensure
1014      * that we don't remove the wrong element by keeping track of the
1015      * expected element to remove, in lastItem.  Yes, we may fail to
1016      * remove lastItem from the queue if it moved due to an interleaved
1017      * interior remove while in detached mode.
1018      *
1019      * Method forEachRemaining, added in Java 8, is treated similarly
1020      * to hasNext returning false, in that we switch to detached mode,
1021      * but we regard it as an even stronger request to "close" this
1022      * iteration, and don't bother supporting subsequent remove().
1023      */
1024     private class Itr implements Iterator<E> {
1025         /** Index to look for new nextItem; NONE at end */
1026         private int cursor;
1027 
1028         /** Element to be returned by next call to next(); null if none */
1029         private E nextItem;
1030 
1031         /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1032         private int nextIndex;
1033 
1034         /** Last element returned; null if none or not detached. */
1035         private E lastItem;
1036 
1037         /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1038         private int lastRet;
1039 
1040         /** Previous value of takeIndex, or DETACHED when detached */
1041         private int prevTakeIndex;
1042 
1043         /** Previous value of iters.cycles */
1044         private int prevCycles;
1045 
1046         /** Special index value indicating "not available" or "undefined" */
1047         private static final int NONE = -1;
1048 
1049         /**
1050          * Special index value indicating "removed elsewhere", that is,
1051          * removed by some operation other than a call to this.remove().
1052          */
1053         private static final int REMOVED = -2;
1054 
1055         /** Special value for prevTakeIndex indicating "detached mode" */
1056         private static final int DETACHED = -3;
1057 
Itr()1058         Itr() {
1059             lastRet = NONE;
1060             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1061             lock.lock();
1062             try {
1063                 if (count == 0) {
1064                     // assert itrs == null;
1065                     cursor = NONE;
1066                     nextIndex = NONE;
1067                     prevTakeIndex = DETACHED;
1068                 } else {
1069                     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1070                     prevTakeIndex = takeIndex;
1071                     nextItem = itemAt(nextIndex = takeIndex);
1072                     cursor = incCursor(takeIndex);
1073                     if (itrs == null) {
1074                         itrs = new Itrs(this);
1075                     } else {
1076                         itrs.register(this); // in this order
1077                         itrs.doSomeSweeping(false);
1078                     }
1079                     prevCycles = itrs.cycles;
1080                     // assert takeIndex >= 0;
1081                     // assert prevTakeIndex == takeIndex;
1082                     // assert nextIndex >= 0;
1083                     // assert nextItem != null;
1084                 }
1085             } finally {
1086                 lock.unlock();
1087             }
1088         }
1089 
isDetached()1090         boolean isDetached() {
1091             // assert lock.isHeldByCurrentThread();
1092             return prevTakeIndex < 0;
1093         }
1094 
incCursor(int index)1095         private int incCursor(int index) {
1096             // assert lock.isHeldByCurrentThread();
1097             if (++index == items.length) index = 0;
1098             if (index == putIndex) index = NONE;
1099             return index;
1100         }
1101 
1102         /**
1103          * Returns true if index is invalidated by the given number of
1104          * dequeues, starting from prevTakeIndex.
1105          */
invalidated(int index, int prevTakeIndex, long dequeues, int length)1106         private boolean invalidated(int index, int prevTakeIndex,
1107                                     long dequeues, int length) {
1108             if (index < 0)
1109                 return false;
1110             int distance = index - prevTakeIndex;
1111             if (distance < 0)
1112                 distance += length;
1113             return dequeues > distance;
1114         }
1115 
1116         /**
1117          * Adjusts indices to incorporate all dequeues since the last
1118          * operation on this iterator.  Call only from iterating thread.
1119          */
incorporateDequeues()1120         private void incorporateDequeues() {
1121             // assert lock.isHeldByCurrentThread();
1122             // assert itrs != null;
1123             // assert !isDetached();
1124             // assert count > 0;
1125 
1126             final int cycles = itrs.cycles;
1127             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1128             final int prevCycles = this.prevCycles;
1129             final int prevTakeIndex = this.prevTakeIndex;
1130 
1131             if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1132                 final int len = items.length;
1133                 // how far takeIndex has advanced since the previous
1134                 // operation of this iterator
1135                 long dequeues = (long) (cycles - prevCycles) * len
1136                     + (takeIndex - prevTakeIndex);
1137 
1138                 // Check indices for invalidation
1139                 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1140                     lastRet = REMOVED;
1141                 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1142                     nextIndex = REMOVED;
1143                 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1144                     cursor = takeIndex;
1145 
1146                 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1147                     detach();
1148                 else {
1149                     this.prevCycles = cycles;
1150                     this.prevTakeIndex = takeIndex;
1151                 }
1152             }
1153         }
1154 
1155         /**
1156          * Called when itrs should stop tracking this iterator, either
1157          * because there are no more indices to update (cursor < 0 &&
1158          * nextIndex < 0 && lastRet < 0) or as a special exception, when
1159          * lastRet >= 0, because hasNext() is about to return false for the
1160          * first time.  Call only from iterating thread.
1161          */
detach()1162         private void detach() {
1163             // Switch to detached mode
1164             // assert lock.isHeldByCurrentThread();
1165             // assert cursor == NONE;
1166             // assert nextIndex < 0;
1167             // assert lastRet < 0 || nextItem == null;
1168             // assert lastRet < 0 ^ lastItem != null;
1169             if (prevTakeIndex >= 0) {
1170                 // assert itrs != null;
1171                 prevTakeIndex = DETACHED;
1172                 // try to unlink from itrs (but not too hard)
1173                 itrs.doSomeSweeping(true);
1174             }
1175         }
1176 
1177         /**
1178          * For performance reasons, we would like not to acquire a lock in
1179          * hasNext in the common case.  To allow for this, we only access
1180          * fields (i.e. nextItem) that are not modified by update operations
1181          * triggered by queue modifications.
1182          */
hasNext()1183         public boolean hasNext() {
1184             if (nextItem != null)
1185                 return true;
1186             noNext();
1187             return false;
1188         }
1189 
noNext()1190         private void noNext() {
1191             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1192             lock.lock();
1193             try {
1194                 // assert cursor == NONE;
1195                 // assert nextIndex == NONE;
1196                 if (!isDetached()) {
1197                     // assert lastRet >= 0;
1198                     incorporateDequeues(); // might update lastRet
1199                     if (lastRet >= 0) {
1200                         lastItem = itemAt(lastRet);
1201                         // assert lastItem != null;
1202                         detach();
1203                     }
1204                 }
1205                 // assert isDetached();
1206                 // assert lastRet < 0 ^ lastItem != null;
1207             } finally {
1208                 lock.unlock();
1209             }
1210         }
1211 
next()1212         public E next() {
1213             final E e = nextItem;
1214             if (e == null)
1215                 throw new NoSuchElementException();
1216             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1217             lock.lock();
1218             try {
1219                 if (!isDetached())
1220                     incorporateDequeues();
1221                 // assert nextIndex != NONE;
1222                 // assert lastItem == null;
1223                 lastRet = nextIndex;
1224                 final int cursor = this.cursor;
1225                 if (cursor >= 0) {
1226                     nextItem = itemAt(nextIndex = cursor);
1227                     // assert nextItem != null;
1228                     this.cursor = incCursor(cursor);
1229                 } else {
1230                     nextIndex = NONE;
1231                     nextItem = null;
1232                     if (lastRet == REMOVED) detach();
1233                 }
1234             } finally {
1235                 lock.unlock();
1236             }
1237             return e;
1238         }
1239 
forEachRemaining(Consumer<? super E> action)1240         public void forEachRemaining(Consumer<? super E> action) {
1241             Objects.requireNonNull(action);
1242             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1243             lock.lock();
1244             try {
1245                 final E e = nextItem;
1246                 if (e == null) return;
1247                 if (!isDetached())
1248                     incorporateDequeues();
1249                 action.accept(e);
1250                 if (isDetached() || cursor < 0) return;
1251                 final Object[] items = ArrayBlockingQueue.this.items;
1252                 for (int i = cursor, end = putIndex,
1253                          to = (i < end) ? end : items.length;
1254                      ; i = 0, to = end) {
1255                     for (; i < to; i++)
1256                         action.accept(itemAt(items, i));
1257                     if (to == end) break;
1258                 }
1259             } finally {
1260                 // Calling forEachRemaining is a strong hint that this
1261                 // iteration is surely over; supporting remove() after
1262                 // forEachRemaining() is more trouble than it's worth
1263                 cursor = nextIndex = lastRet = NONE;
1264                 nextItem = lastItem = null;
1265                 detach();
1266                 lock.unlock();
1267             }
1268         }
1269 
remove()1270         public void remove() {
1271             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1272             lock.lock();
1273             // assert lock.getHoldCount() == 1;
1274             try {
1275                 if (!isDetached())
1276                     incorporateDequeues(); // might update lastRet or detach
1277                 final int lastRet = this.lastRet;
1278                 this.lastRet = NONE;
1279                 if (lastRet >= 0) {
1280                     if (!isDetached())
1281                         removeAt(lastRet);
1282                     else {
1283                         final E lastItem = this.lastItem;
1284                         // assert lastItem != null;
1285                         this.lastItem = null;
1286                         if (itemAt(lastRet) == lastItem)
1287                             removeAt(lastRet);
1288                     }
1289                 } else if (lastRet == NONE)
1290                     throw new IllegalStateException();
1291                 // else lastRet == REMOVED and the last returned element was
1292                 // previously asynchronously removed via an operation other
1293                 // than this.remove(), so nothing to do.
1294 
1295                 if (cursor < 0 && nextIndex < 0)
1296                     detach();
1297             } finally {
1298                 lock.unlock();
1299                 // assert lastRet == NONE;
1300                 // assert lastItem == null;
1301             }
1302         }
1303 
1304         /**
1305          * Called to notify the iterator that the queue is empty, or that it
1306          * has fallen hopelessly behind, so that it should abandon any
1307          * further iteration, except possibly to return one more element
1308          * from next(), as promised by returning true from hasNext().
1309          */
shutdown()1310         void shutdown() {
1311             // assert lock.isHeldByCurrentThread();
1312             cursor = NONE;
1313             if (nextIndex >= 0)
1314                 nextIndex = REMOVED;
1315             if (lastRet >= 0) {
1316                 lastRet = REMOVED;
1317                 lastItem = null;
1318             }
1319             prevTakeIndex = DETACHED;
1320             // Don't set nextItem to null because we must continue to be
1321             // able to return it on next().
1322             //
1323             // Caller will unlink from itrs when convenient.
1324         }
1325 
distance(int index, int prevTakeIndex, int length)1326         private int distance(int index, int prevTakeIndex, int length) {
1327             int distance = index - prevTakeIndex;
1328             if (distance < 0)
1329                 distance += length;
1330             return distance;
1331         }
1332 
1333         /**
1334          * Called whenever an interior remove (not at takeIndex) occurred.
1335          *
1336          * @return true if this iterator should be unlinked from itrs
1337          */
removedAt(int removedIndex)1338         boolean removedAt(int removedIndex) {
1339             // assert lock.isHeldByCurrentThread();
1340             if (isDetached())
1341                 return true;
1342 
1343             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1344             final int prevTakeIndex = this.prevTakeIndex;
1345             final int len = items.length;
1346             // distance from prevTakeIndex to removedIndex
1347             final int removedDistance =
1348                 len * (itrs.cycles - this.prevCycles
1349                        + ((removedIndex < takeIndex) ? 1 : 0))
1350                 + (removedIndex - prevTakeIndex);
1351             // assert itrs.cycles - this.prevCycles >= 0;
1352             // assert itrs.cycles - this.prevCycles <= 1;
1353             // assert removedDistance > 0;
1354             // assert removedIndex != takeIndex;
1355             int cursor = this.cursor;
1356             if (cursor >= 0) {
1357                 int x = distance(cursor, prevTakeIndex, len);
1358                 if (x == removedDistance) {
1359                     if (cursor == putIndex)
1360                         this.cursor = cursor = NONE;
1361                 }
1362                 else if (x > removedDistance) {
1363                     // assert cursor != prevTakeIndex;
1364                     this.cursor = cursor = dec(cursor, len);
1365                 }
1366             }
1367             int lastRet = this.lastRet;
1368             if (lastRet >= 0) {
1369                 int x = distance(lastRet, prevTakeIndex, len);
1370                 if (x == removedDistance)
1371                     this.lastRet = lastRet = REMOVED;
1372                 else if (x > removedDistance)
1373                     this.lastRet = lastRet = dec(lastRet, len);
1374             }
1375             int nextIndex = this.nextIndex;
1376             if (nextIndex >= 0) {
1377                 int x = distance(nextIndex, prevTakeIndex, len);
1378                 if (x == removedDistance)
1379                     this.nextIndex = nextIndex = REMOVED;
1380                 else if (x > removedDistance)
1381                     this.nextIndex = nextIndex = dec(nextIndex, len);
1382             }
1383             if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1384                 this.prevTakeIndex = DETACHED;
1385                 return true;
1386             }
1387             return false;
1388         }
1389 
1390         /**
1391          * Called whenever takeIndex wraps around to zero.
1392          *
1393          * @return true if this iterator should be unlinked from itrs
1394          */
takeIndexWrapped()1395         boolean takeIndexWrapped() {
1396             // assert lock.isHeldByCurrentThread();
1397             if (isDetached())
1398                 return true;
1399             if (itrs.cycles - prevCycles > 1) {
1400                 // All the elements that existed at the time of the last
1401                 // operation are gone, so abandon further iteration.
1402                 shutdown();
1403                 return true;
1404             }
1405             return false;
1406         }
1407 
1408 //         /** Uncomment for debugging. */
1409 //         public String toString() {
1410 //             return ("cursor=" + cursor + " " +
1411 //                     "nextIndex=" + nextIndex + " " +
1412 //                     "lastRet=" + lastRet + " " +
1413 //                     "nextItem=" + nextItem + " " +
1414 //                     "lastItem=" + lastItem + " " +
1415 //                     "prevCycles=" + prevCycles + " " +
1416 //                     "prevTakeIndex=" + prevTakeIndex + " " +
1417 //                     "size()=" + size() + " " +
1418 //                     "remainingCapacity()=" + remainingCapacity());
1419 //         }
1420     }
1421 
1422     /**
1423      * Returns a {@link Spliterator} over the elements in this queue.
1424      *
1425      * <p>The returned spliterator is
1426      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1427      *
1428      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1429      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1430      *
1431      * @implNote
1432      * The {@code Spliterator} implements {@code trySplit} to permit limited
1433      * parallelism.
1434      *
1435      * @return a {@code Spliterator} over the elements in this queue
1436      * @since 1.8
1437      */
spliterator()1438     public Spliterator<E> spliterator() {
1439         return Spliterators.spliterator
1440             (this, (Spliterator.ORDERED |
1441                     Spliterator.NONNULL |
1442                     Spliterator.CONCURRENT));
1443     }
1444 
1445     /**
1446      * @throws NullPointerException {@inheritDoc}
1447      */
forEach(Consumer<? super E> action)1448     public void forEach(Consumer<? super E> action) {
1449         Objects.requireNonNull(action);
1450         final ReentrantLock lock = this.lock;
1451         lock.lock();
1452         try {
1453             if (count > 0) {
1454                 final Object[] items = this.items;
1455                 for (int i = takeIndex, end = putIndex,
1456                          to = (i < end) ? end : items.length;
1457                      ; i = 0, to = end) {
1458                     for (; i < to; i++)
1459                         action.accept(itemAt(items, i));
1460                     if (to == end) break;
1461                 }
1462             }
1463         } finally {
1464             lock.unlock();
1465         }
1466     }
1467 
1468     /**
1469      * @throws NullPointerException {@inheritDoc}
1470      */
removeIf(Predicate<? super E> filter)1471     public boolean removeIf(Predicate<? super E> filter) {
1472         Objects.requireNonNull(filter);
1473         return bulkRemove(filter);
1474     }
1475 
1476     /**
1477      * @throws NullPointerException {@inheritDoc}
1478      */
removeAll(Collection<?> c)1479     public boolean removeAll(Collection<?> c) {
1480         Objects.requireNonNull(c);
1481         return bulkRemove(e -> c.contains(e));
1482     }
1483 
1484     /**
1485      * @throws NullPointerException {@inheritDoc}
1486      */
retainAll(Collection<?> c)1487     public boolean retainAll(Collection<?> c) {
1488         Objects.requireNonNull(c);
1489         return bulkRemove(e -> !c.contains(e));
1490     }
1491 
1492     /** Implementation of bulk remove methods. */
bulkRemove(Predicate<? super E> filter)1493     private boolean bulkRemove(Predicate<? super E> filter) {
1494         final ReentrantLock lock = this.lock;
1495         lock.lock();
1496         try {
1497             if (itrs == null) { // check for active iterators
1498                 if (count > 0) {
1499                     final Object[] items = this.items;
1500                     // Optimize for initial run of survivors
1501                     for (int i = takeIndex, end = putIndex,
1502                              to = (i < end) ? end : items.length;
1503                          ; i = 0, to = end) {
1504                         for (; i < to; i++)
1505                             if (filter.test(itemAt(items, i)))
1506                                 return bulkRemoveModified(filter, i);
1507                         if (to == end) break;
1508                     }
1509                 }
1510                 return false;
1511             }
1512         } finally {
1513             lock.unlock();
1514         }
1515         // Active iterators are too hairy!
1516         // Punting (for now) to the slow n^2 algorithm ...
1517         return super.removeIf(filter);
1518     }
1519 
1520     // A tiny bit set implementation
1521 
nBits(int n)1522     private static long[] nBits(int n) {
1523         return new long[((n - 1) >> 6) + 1];
1524     }
setBit(long[] bits, int i)1525     private static void setBit(long[] bits, int i) {
1526         bits[i >> 6] |= 1L << i;
1527     }
isClear(long[] bits, int i)1528     private static boolean isClear(long[] bits, int i) {
1529         return (bits[i >> 6] & (1L << i)) == 0;
1530     }
1531 
1532     /**
1533      * Returns circular distance from i to j, disambiguating i == j to
1534      * items.length; never returns 0.
1535      */
distanceNonEmpty(int i, int j)1536     private int distanceNonEmpty(int i, int j) {
1537         if ((j -= i) <= 0) j += items.length;
1538         return j;
1539     }
1540 
1541     /**
1542      * Helper for bulkRemove, in case of at least one deletion.
1543      * Tolerate predicates that reentrantly access the collection for
1544      * read (but not write), so traverse once to find elements to
1545      * delete, a second pass to physically expunge.
1546      *
1547      * @param beg valid index of first element to be deleted
1548      */
bulkRemoveModified( Predicate<? super E> filter, final int beg)1549     private boolean bulkRemoveModified(
1550         Predicate<? super E> filter, final int beg) {
1551         final Object[] es = items;
1552         final int capacity = items.length;
1553         final int end = putIndex;
1554         final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1555         deathRow[0] = 1L;   // set bit 0
1556         for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1557              ; i = 0, to = end, k -= capacity) {
1558             for (; i < to; i++)
1559                 if (filter.test(itemAt(es, i)))
1560                     setBit(deathRow, i - k);
1561             if (to == end) break;
1562         }
1563         // a two-finger traversal, with hare i reading, tortoise w writing
1564         int w = beg;
1565         for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1566              ; w = 0) { // w rejoins i on second leg
1567             // In this loop, i and w are on the same leg, with i > w
1568             for (; i < to; i++)
1569                 if (isClear(deathRow, i - k))
1570                     es[w++] = es[i];
1571             if (to == end) break;
1572             // In this loop, w is on the first leg, i on the second
1573             for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1574                 if (isClear(deathRow, i - k))
1575                     es[w++] = es[i];
1576             if (i >= to) {
1577                 if (w == capacity) w = 0; // "corner" case
1578                 break;
1579             }
1580         }
1581         count -= distanceNonEmpty(w, end);
1582         circularClear(es, putIndex = w, end);
1583         return true;
1584     }
1585 
1586     /** debugging */
checkInvariants()1587     void checkInvariants() {
1588         // meta-assertions
1589         // assert lock.isHeldByCurrentThread();
1590         if (!invariantsSatisfied()) {
1591             String detail = String.format(
1592                 "takeIndex=%d putIndex=%d count=%d capacity=%d items=%s",
1593                 takeIndex, putIndex, count, items.length,
1594                 Arrays.toString(items));
1595             System.err.println(detail);
1596             throw new AssertionError(detail);
1597         }
1598     }
1599 
invariantsSatisfied()1600     private boolean invariantsSatisfied() {
1601         // Unlike ArrayDeque, we have a count field but no spare slot.
1602         // We prefer ArrayDeque's strategy (and the names of its fields!),
1603         // but our field layout is baked into the serial form, and so is
1604         // too annoying to change.
1605         //
1606         // putIndex == takeIndex must be disambiguated by checking count.
1607         int capacity = items.length;
1608         return capacity > 0
1609             && items.getClass() == Object[].class
1610             && (takeIndex | putIndex | count) >= 0
1611             && takeIndex <  capacity
1612             && putIndex  <  capacity
1613             && count     <= capacity
1614             && (putIndex - takeIndex - count) % capacity == 0
1615             && (count == 0 || items[takeIndex] != null)
1616             && (count == capacity || items[putIndex] == null)
1617             && (count == 0 || items[dec(putIndex, capacity)] != null);
1618     }
1619 
1620     /**
1621      * Reconstitutes this queue from a stream (that is, deserializes it).
1622      *
1623      * @param s the stream
1624      * @throws ClassNotFoundException if the class of a serialized object
1625      *         could not be found
1626      * @throws java.io.InvalidObjectException if invariants are violated
1627      * @throws java.io.IOException if an I/O error occurs
1628      */
readObject(java.io.ObjectInputStream s)1629     private void readObject(java.io.ObjectInputStream s)
1630         throws java.io.IOException, ClassNotFoundException {
1631 
1632         // Read in items array and various fields
1633         s.defaultReadObject();
1634 
1635         if (!invariantsSatisfied())
1636             throw new java.io.InvalidObjectException("invariants violated");
1637     }
1638 }
1639