1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.ref.WeakReference;
39 import java.util.AbstractQueue;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.Iterator;
43 import java.util.NoSuchElementException;
44 import java.util.Objects;
45 import java.util.Spliterator;
46 import java.util.Spliterators;
47 import java.util.concurrent.locks.Condition;
48 import java.util.concurrent.locks.ReentrantLock;
49 
50 // BEGIN android-note
51 // removed link to collections framework docs
52 // END android-note
53 
54 /**
55  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
56  * array.  This queue orders elements FIFO (first-in-first-out).  The
57  * <em>head</em> of the queue is that element that has been on the
58  * queue the longest time.  The <em>tail</em> of the queue is that
59  * element that has been on the queue the shortest time. New elements
60  * are inserted at the tail of the queue, and the queue retrieval
61  * operations obtain elements at the head of the queue.
62  *
63  * <p>This is a classic &quot;bounded buffer&quot;, in which a
64  * fixed-sized array holds elements inserted by producers and
65  * extracted by consumers.  Once created, the capacity cannot be
66  * changed.  Attempts to {@code put} an element into a full queue
67  * will result in the operation blocking; attempts to {@code take} an
68  * element from an empty queue will similarly block.
69  *
70  * <p>This class supports an optional fairness policy for ordering
71  * waiting producer and consumer threads.  By default, this ordering
72  * is not guaranteed. However, a queue constructed with fairness set
73  * to {@code true} grants threads access in FIFO order. Fairness
74  * generally decreases throughput but reduces variability and avoids
75  * starvation.
76  *
77  * <p>This class and its iterator implement all of the
78  * <em>optional</em> methods of the {@link Collection} and {@link
79  * Iterator} interfaces.
80  *
81  * @since 1.5
82  * @author Doug Lea
83  * @param <E> the type of elements held in this queue
84  */
85 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
86         implements BlockingQueue<E>, java.io.Serializable {
87 
88     /**
89      * Serialization ID. This class relies on default serialization
90      * even for the items array, which is default-serialized, even if
91      * it is empty. Otherwise it could not be declared final, which is
92      * necessary here.
93      */
94     private static final long serialVersionUID = -817911632652898426L;
95 
96     /** The queued items */
97     final Object[] items;
98 
99     /** items index for next take, poll, peek or remove */
100     int takeIndex;
101 
102     /** items index for next put, offer, or add */
103     int putIndex;
104 
105     /** Number of elements in the queue */
106     int count;
107 
108     /*
109      * Concurrency control uses the classic two-condition algorithm
110      * found in any textbook.
111      */
112 
113     /** Main lock guarding all access */
114     final ReentrantLock lock;
115 
116     /** Condition for waiting takes */
117     private final Condition notEmpty;
118 
119     /** Condition for waiting puts */
120     private final Condition notFull;
121 
122     /**
123      * Shared state for currently active iterators, or null if there
124      * are known not to be any.  Allows queue operations to update
125      * iterator state.
126      */
127     transient Itrs itrs;
128 
129     // Internal helper methods
130 
131     /**
132      * Circularly decrements array index i.
133      */
dec(int i)134     final int dec(int i) {
135         return ((i == 0) ? items.length : i) - 1;
136     }
137 
138     /**
139      * Returns item at index i.
140      */
141     @SuppressWarnings("unchecked")
itemAt(int i)142     final E itemAt(int i) {
143         return (E) items[i];
144     }
145 
146     /**
147      * Inserts element at current put position, advances, and signals.
148      * Call only when holding lock.
149      */
enqueue(E x)150     private void enqueue(E x) {
151         // assert lock.getHoldCount() == 1;
152         // assert items[putIndex] == null;
153         final Object[] items = this.items;
154         items[putIndex] = x;
155         if (++putIndex == items.length) putIndex = 0;
156         count++;
157         notEmpty.signal();
158     }
159 
160     /**
161      * Extracts element at current take position, advances, and signals.
162      * Call only when holding lock.
163      */
dequeue()164     private E dequeue() {
165         // assert lock.getHoldCount() == 1;
166         // assert items[takeIndex] != null;
167         final Object[] items = this.items;
168         @SuppressWarnings("unchecked")
169         E x = (E) items[takeIndex];
170         items[takeIndex] = null;
171         if (++takeIndex == items.length) takeIndex = 0;
172         count--;
173         if (itrs != null)
174             itrs.elementDequeued();
175         notFull.signal();
176         return x;
177     }
178 
179     /**
180      * Deletes item at array index removeIndex.
181      * Utility for remove(Object) and iterator.remove.
182      * Call only when holding lock.
183      */
removeAt(final int removeIndex)184     void removeAt(final int removeIndex) {
185         // assert lock.getHoldCount() == 1;
186         // assert items[removeIndex] != null;
187         // assert removeIndex >= 0 && removeIndex < items.length;
188         final Object[] items = this.items;
189         if (removeIndex == takeIndex) {
190             // removing front item; just advance
191             items[takeIndex] = null;
192             if (++takeIndex == items.length) takeIndex = 0;
193             count--;
194             if (itrs != null)
195                 itrs.elementDequeued();
196         } else {
197             // an "interior" remove
198 
199             // slide over all others up through putIndex.
200             for (int i = removeIndex, putIndex = this.putIndex;;) {
201                 int pred = i;
202                 if (++i == items.length) i = 0;
203                 if (i == putIndex) {
204                     items[pred] = null;
205                     this.putIndex = pred;
206                     break;
207                 }
208                 items[pred] = items[i];
209             }
210             count--;
211             if (itrs != null)
212                 itrs.removedAt(removeIndex);
213         }
214         notFull.signal();
215     }
216 
    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.  Equivalent to calling the
     * two-argument constructor with {@code fair} set to {@code false}.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity) {
        // Default policy: fair == false.
        this(capacity, false);
    }
227 
228     /**
229      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
230      * capacity and the specified access policy.
231      *
232      * @param capacity the capacity of this queue
233      * @param fair if {@code true} then queue accesses for threads blocked
234      *        on insertion or removal, are processed in FIFO order;
235      *        if {@code false} the access order is unspecified.
236      * @throws IllegalArgumentException if {@code capacity < 1}
237      */
ArrayBlockingQueue(int capacity, boolean fair)238     public ArrayBlockingQueue(int capacity, boolean fair) {
239         if (capacity <= 0)
240             throw new IllegalArgumentException();
241         this.items = new Object[capacity];
242         lock = new ReentrantLock(fair);
243         notEmpty = lock.newCondition();
244         notFull =  lock.newCondition();
245     }
246 
247     /**
248      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
249      * capacity, the specified access policy and initially containing the
250      * elements of the given collection,
251      * added in traversal order of the collection's iterator.
252      *
253      * @param capacity the capacity of this queue
254      * @param fair if {@code true} then queue accesses for threads blocked
255      *        on insertion or removal, are processed in FIFO order;
256      *        if {@code false} the access order is unspecified.
257      * @param c the collection of elements to initially contain
258      * @throws IllegalArgumentException if {@code capacity} is less than
259      *         {@code c.size()}, or less than 1.
260      * @throws NullPointerException if the specified collection or any
261      *         of its elements are null
262      */
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)263     public ArrayBlockingQueue(int capacity, boolean fair,
264                               Collection<? extends E> c) {
265         this(capacity, fair);
266 
267         final ReentrantLock lock = this.lock;
268         lock.lock(); // Lock only for visibility, not mutual exclusion
269         try {
270             int i = 0;
271             try {
272                 for (E e : c)
273                     items[i++] = Objects.requireNonNull(e);
274             } catch (ArrayIndexOutOfBoundsException ex) {
275                 throw new IllegalArgumentException();
276             }
277             count = i;
278             putIndex = (i == capacity) ? 0 : i;
279         } finally {
280             lock.unlock();
281         }
282     }
283 
284     /**
285      * Inserts the specified element at the tail of this queue if it is
286      * possible to do so immediately without exceeding the queue's capacity,
287      * returning {@code true} upon success and throwing an
288      * {@code IllegalStateException} if this queue is full.
289      *
290      * @param e the element to add
291      * @return {@code true} (as specified by {@link Collection#add})
292      * @throws IllegalStateException if this queue is full
293      * @throws NullPointerException if the specified element is null
294      */
add(E e)295     public boolean add(E e) {
296         return super.add(e);
297     }
298 
299     /**
300      * Inserts the specified element at the tail of this queue if it is
301      * possible to do so immediately without exceeding the queue's capacity,
302      * returning {@code true} upon success and {@code false} if this queue
303      * is full.  This method is generally preferable to method {@link #add},
304      * which can fail to insert an element only by throwing an exception.
305      *
306      * @throws NullPointerException if the specified element is null
307      */
offer(E e)308     public boolean offer(E e) {
309         Objects.requireNonNull(e);
310         final ReentrantLock lock = this.lock;
311         lock.lock();
312         try {
313             if (count == items.length)
314                 return false;
315             else {
316                 enqueue(e);
317                 return true;
318             }
319         } finally {
320             lock.unlock();
321         }
322     }
323 
324     /**
325      * Inserts the specified element at the tail of this queue, waiting
326      * for space to become available if the queue is full.
327      *
328      * @throws InterruptedException {@inheritDoc}
329      * @throws NullPointerException {@inheritDoc}
330      */
put(E e)331     public void put(E e) throws InterruptedException {
332         Objects.requireNonNull(e);
333         final ReentrantLock lock = this.lock;
334         lock.lockInterruptibly();
335         try {
336             while (count == items.length)
337                 notFull.await();
338             enqueue(e);
339         } finally {
340             lock.unlock();
341         }
342     }
343 
344     /**
345      * Inserts the specified element at the tail of this queue, waiting
346      * up to the specified wait time for space to become available if
347      * the queue is full.
348      *
349      * @throws InterruptedException {@inheritDoc}
350      * @throws NullPointerException {@inheritDoc}
351      */
offer(E e, long timeout, TimeUnit unit)352     public boolean offer(E e, long timeout, TimeUnit unit)
353         throws InterruptedException {
354 
355         Objects.requireNonNull(e);
356         long nanos = unit.toNanos(timeout);
357         final ReentrantLock lock = this.lock;
358         lock.lockInterruptibly();
359         try {
360             while (count == items.length) {
361                 if (nanos <= 0L)
362                     return false;
363                 nanos = notFull.awaitNanos(nanos);
364             }
365             enqueue(e);
366             return true;
367         } finally {
368             lock.unlock();
369         }
370     }
371 
poll()372     public E poll() {
373         final ReentrantLock lock = this.lock;
374         lock.lock();
375         try {
376             return (count == 0) ? null : dequeue();
377         } finally {
378             lock.unlock();
379         }
380     }
381 
take()382     public E take() throws InterruptedException {
383         final ReentrantLock lock = this.lock;
384         lock.lockInterruptibly();
385         try {
386             while (count == 0)
387                 notEmpty.await();
388             return dequeue();
389         } finally {
390             lock.unlock();
391         }
392     }
393 
poll(long timeout, TimeUnit unit)394     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
395         long nanos = unit.toNanos(timeout);
396         final ReentrantLock lock = this.lock;
397         lock.lockInterruptibly();
398         try {
399             while (count == 0) {
400                 if (nanos <= 0L)
401                     return null;
402                 nanos = notEmpty.awaitNanos(nanos);
403             }
404             return dequeue();
405         } finally {
406             lock.unlock();
407         }
408     }
409 
peek()410     public E peek() {
411         final ReentrantLock lock = this.lock;
412         lock.lock();
413         try {
414             return itemAt(takeIndex); // null when queue is empty
415         } finally {
416             lock.unlock();
417         }
418     }
419 
420     // this doc comment is overridden to remove the reference to collections
421     // greater in size than Integer.MAX_VALUE
422     /**
423      * Returns the number of elements in this queue.
424      *
425      * @return the number of elements in this queue
426      */
size()427     public int size() {
428         final ReentrantLock lock = this.lock;
429         lock.lock();
430         try {
431             return count;
432         } finally {
433             lock.unlock();
434         }
435     }
436 
437     // this doc comment is a modified copy of the inherited doc comment,
438     // without the reference to unlimited queues.
439     /**
440      * Returns the number of additional elements that this queue can ideally
441      * (in the absence of memory or resource constraints) accept without
442      * blocking. This is always equal to the initial capacity of this queue
443      * less the current {@code size} of this queue.
444      *
445      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
446      * an element will succeed by inspecting {@code remainingCapacity}
447      * because it may be the case that another thread is about to
448      * insert or remove an element.
449      */
remainingCapacity()450     public int remainingCapacity() {
451         final ReentrantLock lock = this.lock;
452         lock.lock();
453         try {
454             return items.length - count;
455         } finally {
456             lock.unlock();
457         }
458     }
459 
460     /**
461      * Removes a single instance of the specified element from this queue,
462      * if it is present.  More formally, removes an element {@code e} such
463      * that {@code o.equals(e)}, if this queue contains one or more such
464      * elements.
465      * Returns {@code true} if this queue contained the specified element
466      * (or equivalently, if this queue changed as a result of the call).
467      *
468      * <p>Removal of interior elements in circular array based queues
469      * is an intrinsically slow and disruptive operation, so should
470      * be undertaken only in exceptional circumstances, ideally
471      * only when the queue is known not to be accessible by other
472      * threads.
473      *
474      * @param o element to be removed from this queue, if present
475      * @return {@code true} if this queue changed as a result of the call
476      */
remove(Object o)477     public boolean remove(Object o) {
478         if (o == null) return false;
479         final ReentrantLock lock = this.lock;
480         lock.lock();
481         try {
482             if (count > 0) {
483                 final Object[] items = this.items;
484                 final int putIndex = this.putIndex;
485                 int i = takeIndex;
486                 do {
487                     if (o.equals(items[i])) {
488                         removeAt(i);
489                         return true;
490                     }
491                     if (++i == items.length) i = 0;
492                 } while (i != putIndex);
493             }
494             return false;
495         } finally {
496             lock.unlock();
497         }
498     }
499 
500     /**
501      * Returns {@code true} if this queue contains the specified element.
502      * More formally, returns {@code true} if and only if this queue contains
503      * at least one element {@code e} such that {@code o.equals(e)}.
504      *
505      * @param o object to be checked for containment in this queue
506      * @return {@code true} if this queue contains the specified element
507      */
contains(Object o)508     public boolean contains(Object o) {
509         if (o == null) return false;
510         final ReentrantLock lock = this.lock;
511         lock.lock();
512         try {
513             if (count > 0) {
514                 final Object[] items = this.items;
515                 final int putIndex = this.putIndex;
516                 int i = takeIndex;
517                 do {
518                     if (o.equals(items[i]))
519                         return true;
520                     if (++i == items.length) i = 0;
521                 } while (i != putIndex);
522             }
523             return false;
524         } finally {
525             lock.unlock();
526         }
527     }
528 
529     /**
530      * Returns an array containing all of the elements in this queue, in
531      * proper sequence.
532      *
533      * <p>The returned array will be "safe" in that no references to it are
534      * maintained by this queue.  (In other words, this method must allocate
535      * a new array).  The caller is thus free to modify the returned array.
536      *
537      * <p>This method acts as bridge between array-based and collection-based
538      * APIs.
539      *
540      * @return an array containing all of the elements in this queue
541      */
toArray()542     public Object[] toArray() {
543         final ReentrantLock lock = this.lock;
544         lock.lock();
545         try {
546             final Object[] items = this.items;
547             final int end = takeIndex + count;
548             final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
549             if (end != putIndex)
550                 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
551             return a;
552         } finally {
553             lock.unlock();
554         }
555     }
556 
557     /**
558      * Returns an array containing all of the elements in this queue, in
559      * proper sequence; the runtime type of the returned array is that of
560      * the specified array.  If the queue fits in the specified array, it
561      * is returned therein.  Otherwise, a new array is allocated with the
562      * runtime type of the specified array and the size of this queue.
563      *
564      * <p>If this queue fits in the specified array with room to spare
565      * (i.e., the array has more elements than this queue), the element in
566      * the array immediately following the end of the queue is set to
567      * {@code null}.
568      *
569      * <p>Like the {@link #toArray()} method, this method acts as bridge between
570      * array-based and collection-based APIs.  Further, this method allows
571      * precise control over the runtime type of the output array, and may,
572      * under certain circumstances, be used to save allocation costs.
573      *
574      * <p>Suppose {@code x} is a queue known to contain only strings.
575      * The following code can be used to dump the queue into a newly
576      * allocated array of {@code String}:
577      *
578      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
579      *
580      * Note that {@code toArray(new Object[0])} is identical in function to
581      * {@code toArray()}.
582      *
583      * @param a the array into which the elements of the queue are to
584      *          be stored, if it is big enough; otherwise, a new array of the
585      *          same runtime type is allocated for this purpose
586      * @return an array containing all of the elements in this queue
587      * @throws ArrayStoreException if the runtime type of the specified array
588      *         is not a supertype of the runtime type of every element in
589      *         this queue
590      * @throws NullPointerException if the specified array is null
591      */
592     @SuppressWarnings("unchecked")
toArray(T[] a)593     public <T> T[] toArray(T[] a) {
594         final ReentrantLock lock = this.lock;
595         lock.lock();
596         try {
597             final Object[] items = this.items;
598             final int count = this.count;
599             final int firstLeg = Math.min(items.length - takeIndex, count);
600             if (a.length < count) {
601                 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
602                                              a.getClass());
603             } else {
604                 System.arraycopy(items, takeIndex, a, 0, firstLeg);
605                 if (a.length > count)
606                     a[count] = null;
607             }
608             if (firstLeg < count)
609                 System.arraycopy(items, 0, a, firstLeg, putIndex);
610             return a;
611         } finally {
612             lock.unlock();
613         }
614     }
615 
toString()616     public String toString() {
617         return Helpers.collectionToString(this);
618     }
619 
620     /**
621      * Atomically removes all of the elements from this queue.
622      * The queue will be empty after this call returns.
623      */
clear()624     public void clear() {
625         final ReentrantLock lock = this.lock;
626         lock.lock();
627         try {
628             int k = count;
629             if (k > 0) {
630                 final Object[] items = this.items;
631                 final int putIndex = this.putIndex;
632                 int i = takeIndex;
633                 do {
634                     items[i] = null;
635                     if (++i == items.length) i = 0;
636                 } while (i != putIndex);
637                 takeIndex = putIndex;
638                 count = 0;
639                 if (itrs != null)
640                     itrs.queueIsEmpty();
641                 for (; k > 0 && lock.hasWaiters(notFull); k--)
642                     notFull.signal();
643             }
644         } finally {
645             lock.unlock();
646         }
647     }
648 
649     /**
650      * @throws UnsupportedOperationException {@inheritDoc}
651      * @throws ClassCastException            {@inheritDoc}
652      * @throws NullPointerException          {@inheritDoc}
653      * @throws IllegalArgumentException      {@inheritDoc}
654      */
drainTo(Collection<? super E> c)655     public int drainTo(Collection<? super E> c) {
656         return drainTo(c, Integer.MAX_VALUE);
657     }
658 
659     /**
660      * @throws UnsupportedOperationException {@inheritDoc}
661      * @throws ClassCastException            {@inheritDoc}
662      * @throws NullPointerException          {@inheritDoc}
663      * @throws IllegalArgumentException      {@inheritDoc}
664      */
drainTo(Collection<? super E> c, int maxElements)665     public int drainTo(Collection<? super E> c, int maxElements) {
666         Objects.requireNonNull(c);
667         if (c == this)
668             throw new IllegalArgumentException();
669         if (maxElements <= 0)
670             return 0;
671         final Object[] items = this.items;
672         final ReentrantLock lock = this.lock;
673         lock.lock();
674         try {
675             int n = Math.min(maxElements, count);
676             int take = takeIndex;
677             int i = 0;
678             try {
679                 while (i < n) {
680                     @SuppressWarnings("unchecked")
681                     E x = (E) items[take];
682                     c.add(x);
683                     items[take] = null;
684                     if (++take == items.length) take = 0;
685                     i++;
686                 }
687                 return n;
688             } finally {
689                 // Restore invariants even if c.add() threw
690                 if (i > 0) {
691                     count -= i;
692                     takeIndex = take;
693                     if (itrs != null) {
694                         if (count == 0)
695                             itrs.queueIsEmpty();
696                         else if (i > take)
697                             itrs.takeIndexWrapped();
698                     }
699                     for (; i > 0 && lock.hasWaiters(notFull); i--)
700                         notFull.signal();
701                 }
702             }
703         } finally {
704             lock.unlock();
705         }
706     }
707 
708     /**
709      * Returns an iterator over the elements in this queue in proper sequence.
710      * The elements will be returned in order from first (head) to last (tail).
711      *
712      * <p>The returned iterator is
713      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
714      *
715      * @return an iterator over the elements in this queue in proper sequence
716      */
iterator()717     public Iterator<E> iterator() {
718         return new Itr();
719     }
720 
721     /**
722      * Shared data between iterators and their queue, allowing queue
723      * modifications to update iterators when elements are removed.
724      *
725      * This adds a lot of complexity for the sake of correctly
726      * handling some uncommon operations, but the combination of
727      * circular-arrays and supporting interior removes (i.e., those
728      * not at head) would cause iterators to sometimes lose their
729      * places and/or (re)report elements they shouldn't.  To avoid
730      * this, when a queue has one or more iterators, it keeps iterator
731      * state consistent by:
732      *
733      * (1) keeping track of the number of "cycles", that is, the
734      *     number of times takeIndex has wrapped around to 0.
735      * (2) notifying all iterators via the callback removedAt whenever
736      *     an interior element is removed (and thus other elements may
737      *     be shifted).
738      *
739      * These suffice to eliminate iterator inconsistencies, but
740      * unfortunately add the secondary responsibility of maintaining
741      * the list of iterators.  We track all active iterators in a
742      * simple linked list (accessed only when the queue's lock is
743      * held) of weak references to Itr.  The list is cleaned up using
744      * 3 different mechanisms:
745      *
746      * (1) Whenever a new iterator is created, do some O(1) checking for
747      *     stale list elements.
748      *
749      * (2) Whenever takeIndex wraps around to 0, check for iterators
750      *     that have been unused for more than one wrap-around cycle.
751      *
752      * (3) Whenever the queue becomes empty, all iterators are notified
753      *     and this entire data structure is discarded.
754      *
755      * So in addition to the removedAt callback that is necessary for
756      * correctness, iterators have the shutdown and takeIndexWrapped
757      * callbacks that help remove stale iterators from the list.
758      *
759      * Whenever a list element is examined, it is expunged if either
760      * the GC has determined that the iterator is discarded, or if the
761      * iterator reports that it is "detached" (does not need any
762      * further state updates).  Overhead is maximal when takeIndex
763      * never advances, iterators are discarded before they are
764      * exhausted, and all removals are interior removes, in which case
765      * all stale iterators are discovered by the GC.  But even in this
766      * case we don't increase the amortized complexity.
767      *
768      * Care must be taken to keep list sweeping methods from
769      * reentrantly invoking another such method, causing subtle
770      * corruption bugs.
771      */
772     class Itrs {
773 
774         /**
775          * Node in a linked list of weak iterator references.
776          */
777         private class Node extends WeakReference<Itr> {
778             Node next;
779 
Node(Itr iterator, Node next)780             Node(Itr iterator, Node next) {
781                 super(iterator);
782                 this.next = next;
783             }
784         }
785 
786         /** Incremented whenever takeIndex wraps around to 0 */
787         int cycles;
788 
789         /** Linked list of weak iterator references */
790         private Node head;
791 
792         /** Used to expunge stale iterators */
793         private Node sweeper;
794 
795         private static final int SHORT_SWEEP_PROBES = 4;
796         private static final int LONG_SWEEP_PROBES = 16;
797 
Itrs(Itr initial)798         Itrs(Itr initial) {
799             register(initial);
800         }
801 
802         /**
803          * Sweeps itrs, looking for and expunging stale iterators.
804          * If at least one was found, tries harder to find more.
805          * Called only from iterating thread.
806          *
807          * @param tryHarder whether to start in try-harder mode, because
808          * there is known to be at least one iterator to collect
809          */
doSomeSweeping(boolean tryHarder)810         void doSomeSweeping(boolean tryHarder) {
811             // assert lock.getHoldCount() == 1;
812             // assert head != null;
813             int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
814             Node o, p;
815             final Node sweeper = this.sweeper;
816             boolean passedGo;   // to limit search to one full sweep
817 
818             if (sweeper == null) {
819                 o = null;
820                 p = head;
821                 passedGo = true;
822             } else {
823                 o = sweeper;
824                 p = o.next;
825                 passedGo = false;
826             }
827 
828             for (; probes > 0; probes--) {
829                 if (p == null) {
830                     if (passedGo)
831                         break;
832                     o = null;
833                     p = head;
834                     passedGo = true;
835                 }
836                 final Itr it = p.get();
837                 final Node next = p.next;
838                 if (it == null || it.isDetached()) {
839                     // found a discarded/exhausted iterator
840                     probes = LONG_SWEEP_PROBES; // "try harder"
841                     // unlink p
842                     p.clear();
843                     p.next = null;
844                     if (o == null) {
845                         head = next;
846                         if (next == null) {
847                             // We've run out of iterators to track; retire
848                             itrs = null;
849                             return;
850                         }
851                     }
852                     else
853                         o.next = next;
854                 } else {
855                     o = p;
856                 }
857                 p = next;
858             }
859 
860             this.sweeper = (p == null) ? null : o;
861         }
862 
863         /**
864          * Adds a new iterator to the linked list of tracked iterators.
865          */
register(Itr itr)866         void register(Itr itr) {
867             // assert lock.getHoldCount() == 1;
868             head = new Node(itr, head);
869         }
870 
871         /**
872          * Called whenever takeIndex wraps around to 0.
873          *
874          * Notifies all iterators, and expunges any that are now stale.
875          */
takeIndexWrapped()876         void takeIndexWrapped() {
877             // assert lock.getHoldCount() == 1;
878             cycles++;
879             for (Node o = null, p = head; p != null;) {
880                 final Itr it = p.get();
881                 final Node next = p.next;
882                 if (it == null || it.takeIndexWrapped()) {
883                     // unlink p
884                     // assert it == null || it.isDetached();
885                     p.clear();
886                     p.next = null;
887                     if (o == null)
888                         head = next;
889                     else
890                         o.next = next;
891                 } else {
892                     o = p;
893                 }
894                 p = next;
895             }
896             if (head == null)   // no more iterators to track
897                 itrs = null;
898         }
899 
900         /**
901          * Called whenever an interior remove (not at takeIndex) occurred.
902          *
903          * Notifies all iterators, and expunges any that are now stale.
904          */
removedAt(int removedIndex)905         void removedAt(int removedIndex) {
906             for (Node o = null, p = head; p != null;) {
907                 final Itr it = p.get();
908                 final Node next = p.next;
909                 if (it == null || it.removedAt(removedIndex)) {
910                     // unlink p
911                     // assert it == null || it.isDetached();
912                     p.clear();
913                     p.next = null;
914                     if (o == null)
915                         head = next;
916                     else
917                         o.next = next;
918                 } else {
919                     o = p;
920                 }
921                 p = next;
922             }
923             if (head == null)   // no more iterators to track
924                 itrs = null;
925         }
926 
927         /**
928          * Called whenever the queue becomes empty.
929          *
930          * Notifies all active iterators that the queue is empty,
931          * clears all weak refs, and unlinks the itrs datastructure.
932          */
queueIsEmpty()933         void queueIsEmpty() {
934             // assert lock.getHoldCount() == 1;
935             for (Node p = head; p != null; p = p.next) {
936                 Itr it = p.get();
937                 if (it != null) {
938                     p.clear();
939                     it.shutdown();
940                 }
941             }
942             head = null;
943             itrs = null;
944         }
945 
946         /**
947          * Called whenever an element has been dequeued (at takeIndex).
948          */
elementDequeued()949         void elementDequeued() {
950             // assert lock.getHoldCount() == 1;
951             if (count == 0)
952                 queueIsEmpty();
953             else if (takeIndex == 0)
954                 takeIndexWrapped();
955         }
956     }
957 
958     /**
959      * Iterator for ArrayBlockingQueue.
960      *
961      * To maintain weak consistency with respect to puts and takes, we
962      * read ahead one slot, so as to not report hasNext true but then
963      * not have an element to return.
964      *
965      * We switch into "detached" mode (allowing prompt unlinking from
966      * itrs without help from the GC) when all indices are negative, or
967      * when hasNext returns false for the first time.  This allows the
968      * iterator to track concurrent updates completely accurately,
969      * except for the corner case of the user calling Iterator.remove()
970      * after hasNext() returned false.  Even in this case, we ensure
971      * that we don't remove the wrong element by keeping track of the
972      * expected element to remove, in lastItem.  Yes, we may fail to
973      * remove lastItem from the queue if it moved due to an interleaved
974      * interior remove while in detached mode.
975      */
976     private class Itr implements Iterator<E> {
977         /** Index to look for new nextItem; NONE at end */
978         private int cursor;
979 
980         /** Element to be returned by next call to next(); null if none */
981         private E nextItem;
982 
983         /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
984         private int nextIndex;
985 
986         /** Last element returned; null if none or not detached. */
987         private E lastItem;
988 
989         /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
990         private int lastRet;
991 
992         /** Previous value of takeIndex, or DETACHED when detached */
993         private int prevTakeIndex;
994 
995         /** Previous value of iters.cycles */
996         private int prevCycles;
997 
998         /** Special index value indicating "not available" or "undefined" */
999         private static final int NONE = -1;
1000 
1001         /**
1002          * Special index value indicating "removed elsewhere", that is,
1003          * removed by some operation other than a call to this.remove().
1004          */
1005         private static final int REMOVED = -2;
1006 
1007         /** Special value for prevTakeIndex indicating "detached mode" */
1008         private static final int DETACHED = -3;
1009 
Itr()1010         Itr() {
1011             // assert lock.getHoldCount() == 0;
1012             lastRet = NONE;
1013             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1014             lock.lock();
1015             try {
1016                 if (count == 0) {
1017                     // assert itrs == null;
1018                     cursor = NONE;
1019                     nextIndex = NONE;
1020                     prevTakeIndex = DETACHED;
1021                 } else {
1022                     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1023                     prevTakeIndex = takeIndex;
1024                     nextItem = itemAt(nextIndex = takeIndex);
1025                     cursor = incCursor(takeIndex);
1026                     if (itrs == null) {
1027                         itrs = new Itrs(this);
1028                     } else {
1029                         itrs.register(this); // in this order
1030                         itrs.doSomeSweeping(false);
1031                     }
1032                     prevCycles = itrs.cycles;
1033                     // assert takeIndex >= 0;
1034                     // assert prevTakeIndex == takeIndex;
1035                     // assert nextIndex >= 0;
1036                     // assert nextItem != null;
1037                 }
1038             } finally {
1039                 lock.unlock();
1040             }
1041         }
1042 
isDetached()1043         boolean isDetached() {
1044             // assert lock.getHoldCount() == 1;
1045             return prevTakeIndex < 0;
1046         }
1047 
incCursor(int index)1048         private int incCursor(int index) {
1049             // assert lock.getHoldCount() == 1;
1050             if (++index == items.length) index = 0;
1051             if (index == putIndex) index = NONE;
1052             return index;
1053         }
1054 
1055         /**
1056          * Returns true if index is invalidated by the given number of
1057          * dequeues, starting from prevTakeIndex.
1058          */
invalidated(int index, int prevTakeIndex, long dequeues, int length)1059         private boolean invalidated(int index, int prevTakeIndex,
1060                                     long dequeues, int length) {
1061             if (index < 0)
1062                 return false;
1063             int distance = index - prevTakeIndex;
1064             if (distance < 0)
1065                 distance += length;
1066             return dequeues > distance;
1067         }
1068 
1069         /**
1070          * Adjusts indices to incorporate all dequeues since the last
1071          * operation on this iterator.  Call only from iterating thread.
1072          */
incorporateDequeues()1073         private void incorporateDequeues() {
1074             // assert lock.getHoldCount() == 1;
1075             // assert itrs != null;
1076             // assert !isDetached();
1077             // assert count > 0;
1078 
1079             final int cycles = itrs.cycles;
1080             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1081             final int prevCycles = this.prevCycles;
1082             final int prevTakeIndex = this.prevTakeIndex;
1083 
1084             if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1085                 final int len = items.length;
1086                 // how far takeIndex has advanced since the previous
1087                 // operation of this iterator
1088                 long dequeues = (cycles - prevCycles) * len
1089                     + (takeIndex - prevTakeIndex);
1090 
1091                 // Check indices for invalidation
1092                 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1093                     lastRet = REMOVED;
1094                 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1095                     nextIndex = REMOVED;
1096                 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1097                     cursor = takeIndex;
1098 
1099                 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1100                     detach();
1101                 else {
1102                     this.prevCycles = cycles;
1103                     this.prevTakeIndex = takeIndex;
1104                 }
1105             }
1106         }
1107 
1108         /**
1109          * Called when itrs should stop tracking this iterator, either
1110          * because there are no more indices to update (cursor < 0 &&
1111          * nextIndex < 0 && lastRet < 0) or as a special exception, when
1112          * lastRet >= 0, because hasNext() is about to return false for the
1113          * first time.  Call only from iterating thread.
1114          */
detach()1115         private void detach() {
1116             // Switch to detached mode
1117             // assert lock.getHoldCount() == 1;
1118             // assert cursor == NONE;
1119             // assert nextIndex < 0;
1120             // assert lastRet < 0 || nextItem == null;
1121             // assert lastRet < 0 ^ lastItem != null;
1122             if (prevTakeIndex >= 0) {
1123                 // assert itrs != null;
1124                 prevTakeIndex = DETACHED;
1125                 // try to unlink from itrs (but not too hard)
1126                 itrs.doSomeSweeping(true);
1127             }
1128         }
1129 
1130         /**
1131          * For performance reasons, we would like not to acquire a lock in
1132          * hasNext in the common case.  To allow for this, we only access
1133          * fields (i.e. nextItem) that are not modified by update operations
1134          * triggered by queue modifications.
1135          */
hasNext()1136         public boolean hasNext() {
1137             // assert lock.getHoldCount() == 0;
1138             if (nextItem != null)
1139                 return true;
1140             noNext();
1141             return false;
1142         }
1143 
noNext()1144         private void noNext() {
1145             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1146             lock.lock();
1147             try {
1148                 // assert cursor == NONE;
1149                 // assert nextIndex == NONE;
1150                 if (!isDetached()) {
1151                     // assert lastRet >= 0;
1152                     incorporateDequeues(); // might update lastRet
1153                     if (lastRet >= 0) {
1154                         lastItem = itemAt(lastRet);
1155                         // assert lastItem != null;
1156                         detach();
1157                     }
1158                 }
1159                 // assert isDetached();
1160                 // assert lastRet < 0 ^ lastItem != null;
1161             } finally {
1162                 lock.unlock();
1163             }
1164         }
1165 
next()1166         public E next() {
1167             // assert lock.getHoldCount() == 0;
1168             final E x = nextItem;
1169             if (x == null)
1170                 throw new NoSuchElementException();
1171             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1172             lock.lock();
1173             try {
1174                 if (!isDetached())
1175                     incorporateDequeues();
1176                 // assert nextIndex != NONE;
1177                 // assert lastItem == null;
1178                 lastRet = nextIndex;
1179                 final int cursor = this.cursor;
1180                 if (cursor >= 0) {
1181                     nextItem = itemAt(nextIndex = cursor);
1182                     // assert nextItem != null;
1183                     this.cursor = incCursor(cursor);
1184                 } else {
1185                     nextIndex = NONE;
1186                     nextItem = null;
1187                 }
1188             } finally {
1189                 lock.unlock();
1190             }
1191             return x;
1192         }
1193 
remove()1194         public void remove() {
1195             // assert lock.getHoldCount() == 0;
1196             final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1197             lock.lock();
1198             try {
1199                 if (!isDetached())
1200                     incorporateDequeues(); // might update lastRet or detach
1201                 final int lastRet = this.lastRet;
1202                 this.lastRet = NONE;
1203                 if (lastRet >= 0) {
1204                     if (!isDetached())
1205                         removeAt(lastRet);
1206                     else {
1207                         final E lastItem = this.lastItem;
1208                         // assert lastItem != null;
1209                         this.lastItem = null;
1210                         if (itemAt(lastRet) == lastItem)
1211                             removeAt(lastRet);
1212                     }
1213                 } else if (lastRet == NONE)
1214                     throw new IllegalStateException();
1215                 // else lastRet == REMOVED and the last returned element was
1216                 // previously asynchronously removed via an operation other
1217                 // than this.remove(), so nothing to do.
1218 
1219                 if (cursor < 0 && nextIndex < 0)
1220                     detach();
1221             } finally {
1222                 lock.unlock();
1223                 // assert lastRet == NONE;
1224                 // assert lastItem == null;
1225             }
1226         }
1227 
1228         /**
1229          * Called to notify the iterator that the queue is empty, or that it
1230          * has fallen hopelessly behind, so that it should abandon any
1231          * further iteration, except possibly to return one more element
1232          * from next(), as promised by returning true from hasNext().
1233          */
shutdown()1234         void shutdown() {
1235             // assert lock.getHoldCount() == 1;
1236             cursor = NONE;
1237             if (nextIndex >= 0)
1238                 nextIndex = REMOVED;
1239             if (lastRet >= 0) {
1240                 lastRet = REMOVED;
1241                 lastItem = null;
1242             }
1243             prevTakeIndex = DETACHED;
1244             // Don't set nextItem to null because we must continue to be
1245             // able to return it on next().
1246             //
1247             // Caller will unlink from itrs when convenient.
1248         }
1249 
distance(int index, int prevTakeIndex, int length)1250         private int distance(int index, int prevTakeIndex, int length) {
1251             int distance = index - prevTakeIndex;
1252             if (distance < 0)
1253                 distance += length;
1254             return distance;
1255         }
1256 
1257         /**
1258          * Called whenever an interior remove (not at takeIndex) occurred.
1259          *
1260          * @return true if this iterator should be unlinked from itrs
1261          */
removedAt(int removedIndex)1262         boolean removedAt(int removedIndex) {
1263             // assert lock.getHoldCount() == 1;
1264             if (isDetached())
1265                 return true;
1266 
1267             final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1268             final int prevTakeIndex = this.prevTakeIndex;
1269             final int len = items.length;
1270             // distance from prevTakeIndex to removedIndex
1271             final int removedDistance =
1272                 len * (itrs.cycles - this.prevCycles
1273                        + ((removedIndex < takeIndex) ? 1 : 0))
1274                 + (removedIndex - prevTakeIndex);
1275             // assert itrs.cycles - this.prevCycles >= 0;
1276             // assert itrs.cycles - this.prevCycles <= 1;
1277             // assert removedDistance > 0;
1278             // assert removedIndex != takeIndex;
1279             int cursor = this.cursor;
1280             if (cursor >= 0) {
1281                 int x = distance(cursor, prevTakeIndex, len);
1282                 if (x == removedDistance) {
1283                     if (cursor == putIndex)
1284                         this.cursor = cursor = NONE;
1285                 }
1286                 else if (x > removedDistance) {
1287                     // assert cursor != prevTakeIndex;
1288                     this.cursor = cursor = dec(cursor);
1289                 }
1290             }
1291             int lastRet = this.lastRet;
1292             if (lastRet >= 0) {
1293                 int x = distance(lastRet, prevTakeIndex, len);
1294                 if (x == removedDistance)
1295                     this.lastRet = lastRet = REMOVED;
1296                 else if (x > removedDistance)
1297                     this.lastRet = lastRet = dec(lastRet);
1298             }
1299             int nextIndex = this.nextIndex;
1300             if (nextIndex >= 0) {
1301                 int x = distance(nextIndex, prevTakeIndex, len);
1302                 if (x == removedDistance)
1303                     this.nextIndex = nextIndex = REMOVED;
1304                 else if (x > removedDistance)
1305                     this.nextIndex = nextIndex = dec(nextIndex);
1306             }
1307             if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1308                 this.prevTakeIndex = DETACHED;
1309                 return true;
1310             }
1311             return false;
1312         }
1313 
1314         /**
1315          * Called whenever takeIndex wraps around to zero.
1316          *
1317          * @return true if this iterator should be unlinked from itrs
1318          */
takeIndexWrapped()1319         boolean takeIndexWrapped() {
1320             // assert lock.getHoldCount() == 1;
1321             if (isDetached())
1322                 return true;
1323             if (itrs.cycles - prevCycles > 1) {
1324                 // All the elements that existed at the time of the last
1325                 // operation are gone, so abandon further iteration.
1326                 shutdown();
1327                 return true;
1328             }
1329             return false;
1330         }
1331 
1332 //         /** Uncomment for debugging. */
1333 //         public String toString() {
1334 //             return ("cursor=" + cursor + " " +
1335 //                     "nextIndex=" + nextIndex + " " +
1336 //                     "lastRet=" + lastRet + " " +
1337 //                     "nextItem=" + nextItem + " " +
1338 //                     "lastItem=" + lastItem + " " +
1339 //                     "prevCycles=" + prevCycles + " " +
1340 //                     "prevTakeIndex=" + prevTakeIndex + " " +
1341 //                     "size()=" + size() + " " +
1342 //                     "remainingCapacity()=" + remainingCapacity());
1343 //         }
1344     }
1345 
1346     /**
1347      * Returns a {@link Spliterator} over the elements in this queue.
1348      *
1349      * <p>The returned spliterator is
1350      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1351      *
1352      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1353      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1354      *
1355      * @implNote
1356      * The {@code Spliterator} implements {@code trySplit} to permit limited
1357      * parallelism.
1358      *
1359      * @return a {@code Spliterator} over the elements in this queue
1360      * @since 1.8
1361      */
spliterator()1362     public Spliterator<E> spliterator() {
1363         return Spliterators.spliterator
1364             (this, (Spliterator.ORDERED |
1365                     Spliterator.NONNULL |
1366                     Spliterator.CONCURRENT));
1367     }
1368 
1369 }
1370