1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.lang.ref.WeakReference; 10 import java.util.AbstractQueue; 11 import java.util.Arrays; 12 import java.util.Collection; 13 import java.util.Iterator; 14 import java.util.NoSuchElementException; 15 import java.util.Objects; 16 import java.util.Spliterator; 17 import java.util.Spliterators; 18 import java.util.concurrent.locks.Condition; 19 import java.util.concurrent.locks.ReentrantLock; 20 21 // BEGIN android-note 22 // removed link to collections framework docs 23 // END android-note 24 25 /** 26 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 27 * array. This queue orders elements FIFO (first-in-first-out). The 28 * <em>head</em> of the queue is that element that has been on the 29 * queue the longest time. The <em>tail</em> of the queue is that 30 * element that has been on the queue the shortest time. New elements 31 * are inserted at the tail of the queue, and the queue retrieval 32 * operations obtain elements at the head of the queue. 33 * 34 * <p>This is a classic "bounded buffer", in which a 35 * fixed-sized array holds elements inserted by producers and 36 * extracted by consumers. Once created, the capacity cannot be 37 * changed. Attempts to {@code put} an element into a full queue 38 * will result in the operation blocking; attempts to {@code take} an 39 * element from an empty queue will similarly block. 40 * 41 * <p>This class supports an optional fairness policy for ordering 42 * waiting producer and consumer threads. By default, this ordering 43 * is not guaranteed. However, a queue constructed with fairness set 44 * to {@code true} grants threads access in FIFO order. Fairness 45 * generally decreases throughput but reduces variability and avoids 46 * starvation. 
47 * 48 * <p>This class and its iterator implement all of the 49 * <em>optional</em> methods of the {@link Collection} and {@link 50 * Iterator} interfaces. 51 * 52 * @since 1.5 53 * @author Doug Lea 54 * @param <E> the type of elements held in this queue 55 */ 56 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 57 implements BlockingQueue<E>, java.io.Serializable { 58 59 /** 60 * Serialization ID. This class relies on default serialization 61 * even for the items array, which is default-serialized, even if 62 * it is empty. Otherwise it could not be declared final, which is 63 * necessary here. 64 */ 65 private static final long serialVersionUID = -817911632652898426L; 66 67 /** The queued items */ 68 final Object[] items; 69 70 /** items index for next take, poll, peek or remove */ 71 int takeIndex; 72 73 /** items index for next put, offer, or add */ 74 int putIndex; 75 76 /** Number of elements in the queue */ 77 int count; 78 79 /* 80 * Concurrency control uses the classic two-condition algorithm 81 * found in any textbook. 82 */ 83 84 /** Main lock guarding all access */ 85 final ReentrantLock lock; 86 87 /** Condition for waiting takes */ 88 private final Condition notEmpty; 89 90 /** Condition for waiting puts */ 91 private final Condition notFull; 92 93 /** 94 * Shared state for currently active iterators, or null if there 95 * are known not to be any. Allows queue operations to update 96 * iterator state. 97 */ 98 transient Itrs itrs; 99 100 // Internal helper methods 101 102 /** 103 * Circularly decrements array index i. 104 */ dec(int i)105 final int dec(int i) { 106 return ((i == 0) ? items.length : i) - 1; 107 } 108 109 /** 110 * Returns item at index i. 111 */ 112 @SuppressWarnings("unchecked") itemAt(int i)113 final E itemAt(int i) { 114 return (E) items[i]; 115 } 116 117 /** 118 * Inserts element at current put position, advances, and signals. 119 * Call only when holding lock. 
120 */ enqueue(E x)121 private void enqueue(E x) { 122 // assert lock.getHoldCount() == 1; 123 // assert items[putIndex] == null; 124 final Object[] items = this.items; 125 items[putIndex] = x; 126 if (++putIndex == items.length) putIndex = 0; 127 count++; 128 notEmpty.signal(); 129 } 130 131 /** 132 * Extracts element at current take position, advances, and signals. 133 * Call only when holding lock. 134 */ dequeue()135 private E dequeue() { 136 // assert lock.getHoldCount() == 1; 137 // assert items[takeIndex] != null; 138 final Object[] items = this.items; 139 @SuppressWarnings("unchecked") 140 E x = (E) items[takeIndex]; 141 items[takeIndex] = null; 142 if (++takeIndex == items.length) takeIndex = 0; 143 count--; 144 if (itrs != null) 145 itrs.elementDequeued(); 146 notFull.signal(); 147 return x; 148 } 149 150 /** 151 * Deletes item at array index removeIndex. 152 * Utility for remove(Object) and iterator.remove. 153 * Call only when holding lock. 154 */ removeAt(final int removeIndex)155 void removeAt(final int removeIndex) { 156 // assert lock.getHoldCount() == 1; 157 // assert items[removeIndex] != null; 158 // assert removeIndex >= 0 && removeIndex < items.length; 159 final Object[] items = this.items; 160 if (removeIndex == takeIndex) { 161 // removing front item; just advance 162 items[takeIndex] = null; 163 if (++takeIndex == items.length) takeIndex = 0; 164 count--; 165 if (itrs != null) 166 itrs.elementDequeued(); 167 } else { 168 // an "interior" remove 169 170 // slide over all others up through putIndex. 
171 for (int i = removeIndex, putIndex = this.putIndex;;) { 172 int pred = i; 173 if (++i == items.length) i = 0; 174 if (i == putIndex) { 175 items[pred] = null; 176 this.putIndex = pred; 177 break; 178 } 179 items[pred] = items[i]; 180 } 181 count--; 182 if (itrs != null) 183 itrs.removedAt(removeIndex); 184 } 185 notFull.signal(); 186 } 187 188 /** 189 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 190 * capacity and default access policy. 191 * 192 * @param capacity the capacity of this queue 193 * @throws IllegalArgumentException if {@code capacity < 1} 194 */ ArrayBlockingQueue(int capacity)195 public ArrayBlockingQueue(int capacity) { 196 this(capacity, false); 197 } 198 199 /** 200 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 201 * capacity and the specified access policy. 202 * 203 * @param capacity the capacity of this queue 204 * @param fair if {@code true} then queue accesses for threads blocked 205 * on insertion or removal, are processed in FIFO order; 206 * if {@code false} the access order is unspecified. 207 * @throws IllegalArgumentException if {@code capacity < 1} 208 */ ArrayBlockingQueue(int capacity, boolean fair)209 public ArrayBlockingQueue(int capacity, boolean fair) { 210 if (capacity <= 0) 211 throw new IllegalArgumentException(); 212 this.items = new Object[capacity]; 213 lock = new ReentrantLock(fair); 214 notEmpty = lock.newCondition(); 215 notFull = lock.newCondition(); 216 } 217 218 /** 219 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 220 * capacity, the specified access policy and initially containing the 221 * elements of the given collection, 222 * added in traversal order of the collection's iterator. 223 * 224 * @param capacity the capacity of this queue 225 * @param fair if {@code true} then queue accesses for threads blocked 226 * on insertion or removal, are processed in FIFO order; 227 * if {@code false} the access order is unspecified. 
228 * @param c the collection of elements to initially contain 229 * @throws IllegalArgumentException if {@code capacity} is less than 230 * {@code c.size()}, or less than 1. 231 * @throws NullPointerException if the specified collection or any 232 * of its elements are null 233 */ ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)234 public ArrayBlockingQueue(int capacity, boolean fair, 235 Collection<? extends E> c) { 236 this(capacity, fair); 237 238 final ReentrantLock lock = this.lock; 239 lock.lock(); // Lock only for visibility, not mutual exclusion 240 try { 241 int i = 0; 242 try { 243 for (E e : c) 244 items[i++] = Objects.requireNonNull(e); 245 } catch (ArrayIndexOutOfBoundsException ex) { 246 throw new IllegalArgumentException(); 247 } 248 count = i; 249 putIndex = (i == capacity) ? 0 : i; 250 } finally { 251 lock.unlock(); 252 } 253 } 254 255 /** 256 * Inserts the specified element at the tail of this queue if it is 257 * possible to do so immediately without exceeding the queue's capacity, 258 * returning {@code true} upon success and throwing an 259 * {@code IllegalStateException} if this queue is full. 260 * 261 * @param e the element to add 262 * @return {@code true} (as specified by {@link Collection#add}) 263 * @throws IllegalStateException if this queue is full 264 * @throws NullPointerException if the specified element is null 265 */ add(E e)266 public boolean add(E e) { 267 return super.add(e); 268 } 269 270 /** 271 * Inserts the specified element at the tail of this queue if it is 272 * possible to do so immediately without exceeding the queue's capacity, 273 * returning {@code true} upon success and {@code false} if this queue 274 * is full. This method is generally preferable to method {@link #add}, 275 * which can fail to insert an element only by throwing an exception. 
276 * 277 * @throws NullPointerException if the specified element is null 278 */ offer(E e)279 public boolean offer(E e) { 280 Objects.requireNonNull(e); 281 final ReentrantLock lock = this.lock; 282 lock.lock(); 283 try { 284 if (count == items.length) 285 return false; 286 else { 287 enqueue(e); 288 return true; 289 } 290 } finally { 291 lock.unlock(); 292 } 293 } 294 295 /** 296 * Inserts the specified element at the tail of this queue, waiting 297 * for space to become available if the queue is full. 298 * 299 * @throws InterruptedException {@inheritDoc} 300 * @throws NullPointerException {@inheritDoc} 301 */ put(E e)302 public void put(E e) throws InterruptedException { 303 Objects.requireNonNull(e); 304 final ReentrantLock lock = this.lock; 305 lock.lockInterruptibly(); 306 try { 307 while (count == items.length) 308 notFull.await(); 309 enqueue(e); 310 } finally { 311 lock.unlock(); 312 } 313 } 314 315 /** 316 * Inserts the specified element at the tail of this queue, waiting 317 * up to the specified wait time for space to become available if 318 * the queue is full. 319 * 320 * @throws InterruptedException {@inheritDoc} 321 * @throws NullPointerException {@inheritDoc} 322 */ offer(E e, long timeout, TimeUnit unit)323 public boolean offer(E e, long timeout, TimeUnit unit) 324 throws InterruptedException { 325 326 Objects.requireNonNull(e); 327 long nanos = unit.toNanos(timeout); 328 final ReentrantLock lock = this.lock; 329 lock.lockInterruptibly(); 330 try { 331 while (count == items.length) { 332 if (nanos <= 0L) 333 return false; 334 nanos = notFull.awaitNanos(nanos); 335 } 336 enqueue(e); 337 return true; 338 } finally { 339 lock.unlock(); 340 } 341 } 342 poll()343 public E poll() { 344 final ReentrantLock lock = this.lock; 345 lock.lock(); 346 try { 347 return (count == 0) ? 
null : dequeue(); 348 } finally { 349 lock.unlock(); 350 } 351 } 352 take()353 public E take() throws InterruptedException { 354 final ReentrantLock lock = this.lock; 355 lock.lockInterruptibly(); 356 try { 357 while (count == 0) 358 notEmpty.await(); 359 return dequeue(); 360 } finally { 361 lock.unlock(); 362 } 363 } 364 poll(long timeout, TimeUnit unit)365 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 366 long nanos = unit.toNanos(timeout); 367 final ReentrantLock lock = this.lock; 368 lock.lockInterruptibly(); 369 try { 370 while (count == 0) { 371 if (nanos <= 0L) 372 return null; 373 nanos = notEmpty.awaitNanos(nanos); 374 } 375 return dequeue(); 376 } finally { 377 lock.unlock(); 378 } 379 } 380 peek()381 public E peek() { 382 final ReentrantLock lock = this.lock; 383 lock.lock(); 384 try { 385 return itemAt(takeIndex); // null when queue is empty 386 } finally { 387 lock.unlock(); 388 } 389 } 390 391 // this doc comment is overridden to remove the reference to collections 392 // greater in size than Integer.MAX_VALUE 393 /** 394 * Returns the number of elements in this queue. 395 * 396 * @return the number of elements in this queue 397 */ size()398 public int size() { 399 final ReentrantLock lock = this.lock; 400 lock.lock(); 401 try { 402 return count; 403 } finally { 404 lock.unlock(); 405 } 406 } 407 408 // this doc comment is a modified copy of the inherited doc comment, 409 // without the reference to unlimited queues. 410 /** 411 * Returns the number of additional elements that this queue can ideally 412 * (in the absence of memory or resource constraints) accept without 413 * blocking. This is always equal to the initial capacity of this queue 414 * less the current {@code size} of this queue. 
415 * 416 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 417 * an element will succeed by inspecting {@code remainingCapacity} 418 * because it may be the case that another thread is about to 419 * insert or remove an element. 420 */ remainingCapacity()421 public int remainingCapacity() { 422 final ReentrantLock lock = this.lock; 423 lock.lock(); 424 try { 425 return items.length - count; 426 } finally { 427 lock.unlock(); 428 } 429 } 430 431 /** 432 * Removes a single instance of the specified element from this queue, 433 * if it is present. More formally, removes an element {@code e} such 434 * that {@code o.equals(e)}, if this queue contains one or more such 435 * elements. 436 * Returns {@code true} if this queue contained the specified element 437 * (or equivalently, if this queue changed as a result of the call). 438 * 439 * <p>Removal of interior elements in circular array based queues 440 * is an intrinsically slow and disruptive operation, so should 441 * be undertaken only in exceptional circumstances, ideally 442 * only when the queue is known not to be accessible by other 443 * threads. 444 * 445 * @param o element to be removed from this queue, if present 446 * @return {@code true} if this queue changed as a result of the call 447 */ remove(Object o)448 public boolean remove(Object o) { 449 if (o == null) return false; 450 final ReentrantLock lock = this.lock; 451 lock.lock(); 452 try { 453 if (count > 0) { 454 final Object[] items = this.items; 455 final int putIndex = this.putIndex; 456 int i = takeIndex; 457 do { 458 if (o.equals(items[i])) { 459 removeAt(i); 460 return true; 461 } 462 if (++i == items.length) i = 0; 463 } while (i != putIndex); 464 } 465 return false; 466 } finally { 467 lock.unlock(); 468 } 469 } 470 471 /** 472 * Returns {@code true} if this queue contains the specified element. 
473 * More formally, returns {@code true} if and only if this queue contains 474 * at least one element {@code e} such that {@code o.equals(e)}. 475 * 476 * @param o object to be checked for containment in this queue 477 * @return {@code true} if this queue contains the specified element 478 */ contains(Object o)479 public boolean contains(Object o) { 480 if (o == null) return false; 481 final ReentrantLock lock = this.lock; 482 lock.lock(); 483 try { 484 if (count > 0) { 485 final Object[] items = this.items; 486 final int putIndex = this.putIndex; 487 int i = takeIndex; 488 do { 489 if (o.equals(items[i])) 490 return true; 491 if (++i == items.length) i = 0; 492 } while (i != putIndex); 493 } 494 return false; 495 } finally { 496 lock.unlock(); 497 } 498 } 499 500 /** 501 * Returns an array containing all of the elements in this queue, in 502 * proper sequence. 503 * 504 * <p>The returned array will be "safe" in that no references to it are 505 * maintained by this queue. (In other words, this method must allocate 506 * a new array). The caller is thus free to modify the returned array. 507 * 508 * <p>This method acts as bridge between array-based and collection-based 509 * APIs. 510 * 511 * @return an array containing all of the elements in this queue 512 */ toArray()513 public Object[] toArray() { 514 final ReentrantLock lock = this.lock; 515 lock.lock(); 516 try { 517 final Object[] items = this.items; 518 final int end = takeIndex + count; 519 final Object[] a = Arrays.copyOfRange(items, takeIndex, end); 520 if (end != putIndex) 521 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex); 522 return a; 523 } finally { 524 lock.unlock(); 525 } 526 } 527 528 /** 529 * Returns an array containing all of the elements in this queue, in 530 * proper sequence; the runtime type of the returned array is that of 531 * the specified array. If the queue fits in the specified array, it 532 * is returned therein. 
* Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
553 * 554 * @param a the array into which the elements of the queue are to 555 * be stored, if it is big enough; otherwise, a new array of the 556 * same runtime type is allocated for this purpose 557 * @return an array containing all of the elements in this queue 558 * @throws ArrayStoreException if the runtime type of the specified array 559 * is not a supertype of the runtime type of every element in 560 * this queue 561 * @throws NullPointerException if the specified array is null 562 */ 563 @SuppressWarnings("unchecked") toArray(T[] a)564 public <T> T[] toArray(T[] a) { 565 final ReentrantLock lock = this.lock; 566 lock.lock(); 567 try { 568 final Object[] items = this.items; 569 final int count = this.count; 570 final int firstLeg = Math.min(items.length - takeIndex, count); 571 if (a.length < count) { 572 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count, 573 a.getClass()); 574 } else { 575 System.arraycopy(items, takeIndex, a, 0, firstLeg); 576 if (a.length > count) 577 a[count] = null; 578 } 579 if (firstLeg < count) 580 System.arraycopy(items, 0, a, firstLeg, putIndex); 581 return a; 582 } finally { 583 lock.unlock(); 584 } 585 } 586 toString()587 public String toString() { 588 return Helpers.collectionToString(this); 589 } 590 591 /** 592 * Atomically removes all of the elements from this queue. 593 * The queue will be empty after this call returns. 
594 */ clear()595 public void clear() { 596 final ReentrantLock lock = this.lock; 597 lock.lock(); 598 try { 599 int k = count; 600 if (k > 0) { 601 final Object[] items = this.items; 602 final int putIndex = this.putIndex; 603 int i = takeIndex; 604 do { 605 items[i] = null; 606 if (++i == items.length) i = 0; 607 } while (i != putIndex); 608 takeIndex = putIndex; 609 count = 0; 610 if (itrs != null) 611 itrs.queueIsEmpty(); 612 for (; k > 0 && lock.hasWaiters(notFull); k--) 613 notFull.signal(); 614 } 615 } finally { 616 lock.unlock(); 617 } 618 } 619 620 /** 621 * @throws UnsupportedOperationException {@inheritDoc} 622 * @throws ClassCastException {@inheritDoc} 623 * @throws NullPointerException {@inheritDoc} 624 * @throws IllegalArgumentException {@inheritDoc} 625 */ drainTo(Collection<? super E> c)626 public int drainTo(Collection<? super E> c) { 627 return drainTo(c, Integer.MAX_VALUE); 628 } 629 630 /** 631 * @throws UnsupportedOperationException {@inheritDoc} 632 * @throws ClassCastException {@inheritDoc} 633 * @throws NullPointerException {@inheritDoc} 634 * @throws IllegalArgumentException {@inheritDoc} 635 */ drainTo(Collection<? super E> c, int maxElements)636 public int drainTo(Collection<? 
super E> c, int maxElements) { 637 Objects.requireNonNull(c); 638 if (c == this) 639 throw new IllegalArgumentException(); 640 if (maxElements <= 0) 641 return 0; 642 final Object[] items = this.items; 643 final ReentrantLock lock = this.lock; 644 lock.lock(); 645 try { 646 int n = Math.min(maxElements, count); 647 int take = takeIndex; 648 int i = 0; 649 try { 650 while (i < n) { 651 @SuppressWarnings("unchecked") 652 E x = (E) items[take]; 653 c.add(x); 654 items[take] = null; 655 if (++take == items.length) take = 0; 656 i++; 657 } 658 return n; 659 } finally { 660 // Restore invariants even if c.add() threw 661 if (i > 0) { 662 count -= i; 663 takeIndex = take; 664 if (itrs != null) { 665 if (count == 0) 666 itrs.queueIsEmpty(); 667 else if (i > take) 668 itrs.takeIndexWrapped(); 669 } 670 for (; i > 0 && lock.hasWaiters(notFull); i--) 671 notFull.signal(); 672 } 673 } 674 } finally { 675 lock.unlock(); 676 } 677 } 678 679 /** 680 * Returns an iterator over the elements in this queue in proper sequence. 681 * The elements will be returned in order from first (head) to last (tail). 682 * 683 * <p>The returned iterator is 684 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 685 * 686 * @return an iterator over the elements in this queue in proper sequence 687 */ iterator()688 public Iterator<E> iterator() { 689 return new Itr(); 690 } 691 692 /** 693 * Shared data between iterators and their queue, allowing queue 694 * modifications to update iterators when elements are removed. 695 * 696 * This adds a lot of complexity for the sake of correctly 697 * handling some uncommon operations, but the combination of 698 * circular-arrays and supporting interior removes (i.e., those 699 * not at head) would cause iterators to sometimes lose their 700 * places and/or (re)report elements they shouldn't. 
* To avoid
     * this, when a queue has one or more iterators, it keeps iterator
     * state consistent by:
     *
     * (1) keeping track of the number of "cycles", that is, the
     *     number of times takeIndex has wrapped around to 0.
     * (2) notifying all iterators via the callback removedAt whenever
     *     an interior element is removed (and thus other elements may
     *     be shifted).
     *
     * These suffice to eliminate iterator inconsistencies, but
     * unfortunately add the secondary responsibility of maintaining
     * the list of iterators. We track all active iterators in a
     * simple linked list (accessed only when the queue's lock is
     * held) of weak references to Itr. The list is cleaned up using
     * 3 different mechanisms:
     *
     * (1) Whenever a new iterator is created, do some O(1) checking for
     *     stale list elements.
     *
     * (2) Whenever takeIndex wraps around to 0, check for iterators
     *     that have been unused for more than one wrap-around cycle.
     *
     * (3) Whenever the queue becomes empty, all iterators are notified
     *     and this entire data structure is discarded.
     *
     * So in addition to the removedAt callback that is necessary for
     * correctness, iterators have the shutdown and takeIndexWrapped
     * callbacks that help remove stale iterators from the list.
     *
     * Whenever a list element is examined, it is expunged if either
     * the GC has determined that the iterator is discarded, or if the
     * iterator reports that it is "detached" (does not need any
     * further state updates). Overhead is maximal when takeIndex
     * never advances, iterators are discarded before they are
     * exhausted, and all removals are interior removes, in which case
     * all stale iterators are discovered by the GC. But even in this
     * case we don't increase the amortized complexity.
738 * 739 * Care must be taken to keep list sweeping methods from 740 * reentrantly invoking another such method, causing subtle 741 * corruption bugs. 742 */ 743 class Itrs { 744 745 /** 746 * Node in a linked list of weak iterator references. 747 */ 748 private class Node extends WeakReference<Itr> { 749 Node next; 750 Node(Itr iterator, Node next)751 Node(Itr iterator, Node next) { 752 super(iterator); 753 this.next = next; 754 } 755 } 756 757 /** Incremented whenever takeIndex wraps around to 0 */ 758 int cycles; 759 760 /** Linked list of weak iterator references */ 761 private Node head; 762 763 /** Used to expunge stale iterators */ 764 private Node sweeper; 765 766 private static final int SHORT_SWEEP_PROBES = 4; 767 private static final int LONG_SWEEP_PROBES = 16; 768 Itrs(Itr initial)769 Itrs(Itr initial) { 770 register(initial); 771 } 772 773 /** 774 * Sweeps itrs, looking for and expunging stale iterators. 775 * If at least one was found, tries harder to find more. 776 * Called only from iterating thread. 777 * 778 * @param tryHarder whether to start in try-harder mode, because 779 * there is known to be at least one iterator to collect 780 */ doSomeSweeping(boolean tryHarder)781 void doSomeSweeping(boolean tryHarder) { 782 // assert lock.getHoldCount() == 1; 783 // assert head != null; 784 int probes = tryHarder ? 
LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; 785 Node o, p; 786 final Node sweeper = this.sweeper; 787 boolean passedGo; // to limit search to one full sweep 788 789 if (sweeper == null) { 790 o = null; 791 p = head; 792 passedGo = true; 793 } else { 794 o = sweeper; 795 p = o.next; 796 passedGo = false; 797 } 798 799 for (; probes > 0; probes--) { 800 if (p == null) { 801 if (passedGo) 802 break; 803 o = null; 804 p = head; 805 passedGo = true; 806 } 807 final Itr it = p.get(); 808 final Node next = p.next; 809 if (it == null || it.isDetached()) { 810 // found a discarded/exhausted iterator 811 probes = LONG_SWEEP_PROBES; // "try harder" 812 // unlink p 813 p.clear(); 814 p.next = null; 815 if (o == null) { 816 head = next; 817 if (next == null) { 818 // We've run out of iterators to track; retire 819 itrs = null; 820 return; 821 } 822 } 823 else 824 o.next = next; 825 } else { 826 o = p; 827 } 828 p = next; 829 } 830 831 this.sweeper = (p == null) ? null : o; 832 } 833 834 /** 835 * Adds a new iterator to the linked list of tracked iterators. 836 */ register(Itr itr)837 void register(Itr itr) { 838 // assert lock.getHoldCount() == 1; 839 head = new Node(itr, head); 840 } 841 842 /** 843 * Called whenever takeIndex wraps around to 0. 844 * 845 * Notifies all iterators, and expunges any that are now stale. 846 */ takeIndexWrapped()847 void takeIndexWrapped() { 848 // assert lock.getHoldCount() == 1; 849 cycles++; 850 for (Node o = null, p = head; p != null;) { 851 final Itr it = p.get(); 852 final Node next = p.next; 853 if (it == null || it.takeIndexWrapped()) { 854 // unlink p 855 // assert it == null || it.isDetached(); 856 p.clear(); 857 p.next = null; 858 if (o == null) 859 head = next; 860 else 861 o.next = next; 862 } else { 863 o = p; 864 } 865 p = next; 866 } 867 if (head == null) // no more iterators to track 868 itrs = null; 869 } 870 871 /** 872 * Called whenever an interior remove (not at takeIndex) occurred. 
873 * 874 * Notifies all iterators, and expunges any that are now stale. 875 */ removedAt(int removedIndex)876 void removedAt(int removedIndex) { 877 for (Node o = null, p = head; p != null;) { 878 final Itr it = p.get(); 879 final Node next = p.next; 880 if (it == null || it.removedAt(removedIndex)) { 881 // unlink p 882 // assert it == null || it.isDetached(); 883 p.clear(); 884 p.next = null; 885 if (o == null) 886 head = next; 887 else 888 o.next = next; 889 } else { 890 o = p; 891 } 892 p = next; 893 } 894 if (head == null) // no more iterators to track 895 itrs = null; 896 } 897 898 /** 899 * Called whenever the queue becomes empty. 900 * 901 * Notifies all active iterators that the queue is empty, 902 * clears all weak refs, and unlinks the itrs datastructure. 903 */ queueIsEmpty()904 void queueIsEmpty() { 905 // assert lock.getHoldCount() == 1; 906 for (Node p = head; p != null; p = p.next) { 907 Itr it = p.get(); 908 if (it != null) { 909 p.clear(); 910 it.shutdown(); 911 } 912 } 913 head = null; 914 itrs = null; 915 } 916 917 /** 918 * Called whenever an element has been dequeued (at takeIndex). 919 */ elementDequeued()920 void elementDequeued() { 921 // assert lock.getHoldCount() == 1; 922 if (count == 0) 923 queueIsEmpty(); 924 else if (takeIndex == 0) 925 takeIndexWrapped(); 926 } 927 } 928 929 /** 930 * Iterator for ArrayBlockingQueue. 931 * 932 * To maintain weak consistency with respect to puts and takes, we 933 * read ahead one slot, so as to not report hasNext true but then 934 * not have an element to return. 935 * 936 * We switch into "detached" mode (allowing prompt unlinking from 937 * itrs without help from the GC) when all indices are negative, or 938 * when hasNext returns false for the first time. This allows the 939 * iterator to track concurrent updates completely accurately, 940 * except for the corner case of the user calling Iterator.remove() 941 * after hasNext() returned false. 
Even in this case, we ensure 942 * that we don't remove the wrong element by keeping track of the 943 * expected element to remove, in lastItem. Yes, we may fail to 944 * remove lastItem from the queue if it moved due to an interleaved 945 * interior remove while in detached mode. 946 */ 947 private class Itr implements Iterator<E> { 948 /** Index to look for new nextItem; NONE at end */ 949 private int cursor; 950 951 /** Element to be returned by next call to next(); null if none */ 952 private E nextItem; 953 954 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ 955 private int nextIndex; 956 957 /** Last element returned; null if none or not detached. */ 958 private E lastItem; 959 960 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ 961 private int lastRet; 962 963 /** Previous value of takeIndex, or DETACHED when detached */ 964 private int prevTakeIndex; 965 966 /** Previous value of iters.cycles */ 967 private int prevCycles; 968 969 /** Special index value indicating "not available" or "undefined" */ 970 private static final int NONE = -1; 971 972 /** 973 * Special index value indicating "removed elsewhere", that is, 974 * removed by some operation other than a call to this.remove(). 
975 */ 976 private static final int REMOVED = -2; 977 978 /** Special value for prevTakeIndex indicating "detached mode" */ 979 private static final int DETACHED = -3; 980 Itr()981 Itr() { 982 // assert lock.getHoldCount() == 0; 983 lastRet = NONE; 984 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 985 lock.lock(); 986 try { 987 if (count == 0) { 988 // assert itrs == null; 989 cursor = NONE; 990 nextIndex = NONE; 991 prevTakeIndex = DETACHED; 992 } else { 993 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 994 prevTakeIndex = takeIndex; 995 nextItem = itemAt(nextIndex = takeIndex); 996 cursor = incCursor(takeIndex); 997 if (itrs == null) { 998 itrs = new Itrs(this); 999 } else { 1000 itrs.register(this); // in this order 1001 itrs.doSomeSweeping(false); 1002 } 1003 prevCycles = itrs.cycles; 1004 // assert takeIndex >= 0; 1005 // assert prevTakeIndex == takeIndex; 1006 // assert nextIndex >= 0; 1007 // assert nextItem != null; 1008 } 1009 } finally { 1010 lock.unlock(); 1011 } 1012 } 1013 isDetached()1014 boolean isDetached() { 1015 // assert lock.getHoldCount() == 1; 1016 return prevTakeIndex < 0; 1017 } 1018 incCursor(int index)1019 private int incCursor(int index) { 1020 // assert lock.getHoldCount() == 1; 1021 if (++index == items.length) index = 0; 1022 if (index == putIndex) index = NONE; 1023 return index; 1024 } 1025 1026 /** 1027 * Returns true if index is invalidated by the given number of 1028 * dequeues, starting from prevTakeIndex. 1029 */ invalidated(int index, int prevTakeIndex, long dequeues, int length)1030 private boolean invalidated(int index, int prevTakeIndex, 1031 long dequeues, int length) { 1032 if (index < 0) 1033 return false; 1034 int distance = index - prevTakeIndex; 1035 if (distance < 0) 1036 distance += length; 1037 return dequeues > distance; 1038 } 1039 1040 /** 1041 * Adjusts indices to incorporate all dequeues since the last 1042 * operation on this iterator. Call only from iterating thread. 
1043 */ incorporateDequeues()1044 private void incorporateDequeues() { 1045 // assert lock.getHoldCount() == 1; 1046 // assert itrs != null; 1047 // assert !isDetached(); 1048 // assert count > 0; 1049 1050 final int cycles = itrs.cycles; 1051 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1052 final int prevCycles = this.prevCycles; 1053 final int prevTakeIndex = this.prevTakeIndex; 1054 1055 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1056 final int len = items.length; 1057 // how far takeIndex has advanced since the previous 1058 // operation of this iterator 1059 long dequeues = (cycles - prevCycles) * len 1060 + (takeIndex - prevTakeIndex); 1061 1062 // Check indices for invalidation 1063 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1064 lastRet = REMOVED; 1065 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1066 nextIndex = REMOVED; 1067 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1068 cursor = takeIndex; 1069 1070 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1071 detach(); 1072 else { 1073 this.prevCycles = cycles; 1074 this.prevTakeIndex = takeIndex; 1075 } 1076 } 1077 } 1078 1079 /** 1080 * Called when itrs should stop tracking this iterator, either 1081 * because there are no more indices to update (cursor < 0 && 1082 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1083 * lastRet >= 0, because hasNext() is about to return false for the 1084 * first time. Call only from iterating thread. 
1085 */ detach()1086 private void detach() { 1087 // Switch to detached mode 1088 // assert lock.getHoldCount() == 1; 1089 // assert cursor == NONE; 1090 // assert nextIndex < 0; 1091 // assert lastRet < 0 || nextItem == null; 1092 // assert lastRet < 0 ^ lastItem != null; 1093 if (prevTakeIndex >= 0) { 1094 // assert itrs != null; 1095 prevTakeIndex = DETACHED; 1096 // try to unlink from itrs (but not too hard) 1097 itrs.doSomeSweeping(true); 1098 } 1099 } 1100 1101 /** 1102 * For performance reasons, we would like not to acquire a lock in 1103 * hasNext in the common case. To allow for this, we only access 1104 * fields (i.e. nextItem) that are not modified by update operations 1105 * triggered by queue modifications. 1106 */ hasNext()1107 public boolean hasNext() { 1108 // assert lock.getHoldCount() == 0; 1109 if (nextItem != null) 1110 return true; 1111 noNext(); 1112 return false; 1113 } 1114 noNext()1115 private void noNext() { 1116 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1117 lock.lock(); 1118 try { 1119 // assert cursor == NONE; 1120 // assert nextIndex == NONE; 1121 if (!isDetached()) { 1122 // assert lastRet >= 0; 1123 incorporateDequeues(); // might update lastRet 1124 if (lastRet >= 0) { 1125 lastItem = itemAt(lastRet); 1126 // assert lastItem != null; 1127 detach(); 1128 } 1129 } 1130 // assert isDetached(); 1131 // assert lastRet < 0 ^ lastItem != null; 1132 } finally { 1133 lock.unlock(); 1134 } 1135 } 1136 next()1137 public E next() { 1138 // assert lock.getHoldCount() == 0; 1139 final E x = nextItem; 1140 if (x == null) 1141 throw new NoSuchElementException(); 1142 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1143 lock.lock(); 1144 try { 1145 if (!isDetached()) 1146 incorporateDequeues(); 1147 // assert nextIndex != NONE; 1148 // assert lastItem == null; 1149 lastRet = nextIndex; 1150 final int cursor = this.cursor; 1151 if (cursor >= 0) { 1152 nextItem = itemAt(nextIndex = cursor); 1153 // assert nextItem != null; 1154 
this.cursor = incCursor(cursor); 1155 } else { 1156 nextIndex = NONE; 1157 nextItem = null; 1158 } 1159 } finally { 1160 lock.unlock(); 1161 } 1162 return x; 1163 } 1164 remove()1165 public void remove() { 1166 // assert lock.getHoldCount() == 0; 1167 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1168 lock.lock(); 1169 try { 1170 if (!isDetached()) 1171 incorporateDequeues(); // might update lastRet or detach 1172 final int lastRet = this.lastRet; 1173 this.lastRet = NONE; 1174 if (lastRet >= 0) { 1175 if (!isDetached()) 1176 removeAt(lastRet); 1177 else { 1178 final E lastItem = this.lastItem; 1179 // assert lastItem != null; 1180 this.lastItem = null; 1181 if (itemAt(lastRet) == lastItem) 1182 removeAt(lastRet); 1183 } 1184 } else if (lastRet == NONE) 1185 throw new IllegalStateException(); 1186 // else lastRet == REMOVED and the last returned element was 1187 // previously asynchronously removed via an operation other 1188 // than this.remove(), so nothing to do. 1189 1190 if (cursor < 0 && nextIndex < 0) 1191 detach(); 1192 } finally { 1193 lock.unlock(); 1194 // assert lastRet == NONE; 1195 // assert lastItem == null; 1196 } 1197 } 1198 1199 /** 1200 * Called to notify the iterator that the queue is empty, or that it 1201 * has fallen hopelessly behind, so that it should abandon any 1202 * further iteration, except possibly to return one more element 1203 * from next(), as promised by returning true from hasNext(). 1204 */ shutdown()1205 void shutdown() { 1206 // assert lock.getHoldCount() == 1; 1207 cursor = NONE; 1208 if (nextIndex >= 0) 1209 nextIndex = REMOVED; 1210 if (lastRet >= 0) { 1211 lastRet = REMOVED; 1212 lastItem = null; 1213 } 1214 prevTakeIndex = DETACHED; 1215 // Don't set nextItem to null because we must continue to be 1216 // able to return it on next(). 1217 // 1218 // Caller will unlink from itrs when convenient. 
1219 } 1220 distance(int index, int prevTakeIndex, int length)1221 private int distance(int index, int prevTakeIndex, int length) { 1222 int distance = index - prevTakeIndex; 1223 if (distance < 0) 1224 distance += length; 1225 return distance; 1226 } 1227 1228 /** 1229 * Called whenever an interior remove (not at takeIndex) occurred. 1230 * 1231 * @return true if this iterator should be unlinked from itrs 1232 */ removedAt(int removedIndex)1233 boolean removedAt(int removedIndex) { 1234 // assert lock.getHoldCount() == 1; 1235 if (isDetached()) 1236 return true; 1237 1238 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1239 final int prevTakeIndex = this.prevTakeIndex; 1240 final int len = items.length; 1241 // distance from prevTakeIndex to removedIndex 1242 final int removedDistance = 1243 len * (itrs.cycles - this.prevCycles 1244 + ((removedIndex < takeIndex) ? 1 : 0)) 1245 + (removedIndex - prevTakeIndex); 1246 // assert itrs.cycles - this.prevCycles >= 0; 1247 // assert itrs.cycles - this.prevCycles <= 1; 1248 // assert removedDistance > 0; 1249 // assert removedIndex != takeIndex; 1250 int cursor = this.cursor; 1251 if (cursor >= 0) { 1252 int x = distance(cursor, prevTakeIndex, len); 1253 if (x == removedDistance) { 1254 if (cursor == putIndex) 1255 this.cursor = cursor = NONE; 1256 } 1257 else if (x > removedDistance) { 1258 // assert cursor != prevTakeIndex; 1259 this.cursor = cursor = dec(cursor); 1260 } 1261 } 1262 int lastRet = this.lastRet; 1263 if (lastRet >= 0) { 1264 int x = distance(lastRet, prevTakeIndex, len); 1265 if (x == removedDistance) 1266 this.lastRet = lastRet = REMOVED; 1267 else if (x > removedDistance) 1268 this.lastRet = lastRet = dec(lastRet); 1269 } 1270 int nextIndex = this.nextIndex; 1271 if (nextIndex >= 0) { 1272 int x = distance(nextIndex, prevTakeIndex, len); 1273 if (x == removedDistance) 1274 this.nextIndex = nextIndex = REMOVED; 1275 else if (x > removedDistance) 1276 this.nextIndex = nextIndex = dec(nextIndex); 
1277 } 1278 if (cursor < 0 && nextIndex < 0 && lastRet < 0) { 1279 this.prevTakeIndex = DETACHED; 1280 return true; 1281 } 1282 return false; 1283 } 1284 1285 /** 1286 * Called whenever takeIndex wraps around to zero. 1287 * 1288 * @return true if this iterator should be unlinked from itrs 1289 */ takeIndexWrapped()1290 boolean takeIndexWrapped() { 1291 // assert lock.getHoldCount() == 1; 1292 if (isDetached()) 1293 return true; 1294 if (itrs.cycles - prevCycles > 1) { 1295 // All the elements that existed at the time of the last 1296 // operation are gone, so abandon further iteration. 1297 shutdown(); 1298 return true; 1299 } 1300 return false; 1301 } 1302 1303 // /** Uncomment for debugging. */ 1304 // public String toString() { 1305 // return ("cursor=" + cursor + " " + 1306 // "nextIndex=" + nextIndex + " " + 1307 // "lastRet=" + lastRet + " " + 1308 // "nextItem=" + nextItem + " " + 1309 // "lastItem=" + lastItem + " " + 1310 // "prevCycles=" + prevCycles + " " + 1311 // "prevTakeIndex=" + prevTakeIndex + " " + 1312 // "size()=" + size() + " " + 1313 // "remainingCapacity()=" + remainingCapacity()); 1314 // } 1315 } 1316 1317 /** 1318 * Returns a {@link Spliterator} over the elements in this queue. 1319 * 1320 * <p>The returned spliterator is 1321 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1322 * 1323 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 1324 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 1325 * 1326 * @implNote 1327 * The {@code Spliterator} implements {@code trySplit} to permit limited 1328 * parallelism. 1329 * 1330 * @return a {@code Spliterator} over the elements in this queue 1331 * @since 1.8 1332 */ spliterator()1333 public Spliterator<E> spliterator() { 1334 return Spliterators.spliterator 1335 (this, (Spliterator.ORDERED | 1336 Spliterator.NONNULL | 1337 Spliterator.CONCURRENT)); 1338 } 1339 1340 } 1341