1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.ref.WeakReference; 39 import java.util.AbstractQueue; 40 import java.util.Arrays; 41 import java.util.Collection; 42 import java.util.Iterator; 43 import java.util.NoSuchElementException; 44 import java.util.Objects; 45 import java.util.Spliterator; 46 import java.util.Spliterators; 47 import java.util.concurrent.locks.Condition; 48 import java.util.concurrent.locks.ReentrantLock; 49 import java.util.function.Consumer; 50 import java.util.function.Predicate; 51 52 /** 53 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 54 * array. This queue orders elements FIFO (first-in-first-out). The 55 * <em>head</em> of the queue is that element that has been on the 56 * queue the longest time. The <em>tail</em> of the queue is that 57 * element that has been on the queue the shortest time. New elements 58 * are inserted at the tail of the queue, and the queue retrieval 59 * operations obtain elements at the head of the queue. 60 * 61 * <p>This is a classic "bounded buffer", in which a 62 * fixed-sized array holds elements inserted by producers and 63 * extracted by consumers. Once created, the capacity cannot be 64 * changed. Attempts to {@code put} an element into a full queue 65 * will result in the operation blocking; attempts to {@code take} an 66 * element from an empty queue will similarly block. 67 * 68 * <p>This class supports an optional fairness policy for ordering 69 * waiting producer and consumer threads. By default, this ordering 70 * is not guaranteed. However, a queue constructed with fairness set 71 * to {@code true} grants threads access in FIFO order. 
Fairness 72 * generally decreases throughput but reduces variability and avoids 73 * starvation. 74 * 75 * <p>This class and its iterator implement all of the <em>optional</em> 76 * methods of the {@link Collection} and {@link Iterator} interfaces. 77 * 78 * <p>This class is a member of the 79 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework"> 80 * Java Collections Framework</a>. 81 * 82 * @since 1.5 83 * @author Doug Lea 84 * @param <E> the type of elements held in this queue 85 */ 86 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 87 implements BlockingQueue<E>, java.io.Serializable { 88 89 /* 90 * Much of the implementation mechanics, especially the unusual 91 * nested loops, are shared and co-maintained with ArrayDeque. 92 */ 93 94 /** 95 * Serialization ID. This class relies on default serialization 96 * even for the items array, which is default-serialized, even if 97 * it is empty. Otherwise it could not be declared final, which is 98 * necessary here. 99 */ 100 private static final long serialVersionUID = -817911632652898426L; 101 102 /** The queued items */ 103 @SuppressWarnings("serial") // Conditionally serializable 104 final Object[] items; 105 106 /** items index for next take, poll, peek or remove */ 107 int takeIndex; 108 109 /** items index for next put, offer, or add */ 110 int putIndex; 111 112 /** Number of elements in the queue */ 113 int count; 114 115 /* 116 * Concurrency control uses the classic two-condition algorithm 117 * found in any textbook. 118 */ 119 120 /** Main lock guarding all access */ 121 final ReentrantLock lock; 122 123 /** Condition for waiting takes */ 124 @SuppressWarnings("serial") // Classes implementing Condition may be serializable. 125 private final Condition notEmpty; 126 127 /** Condition for waiting puts */ 128 @SuppressWarnings("serial") // Classes implementing Condition may be serializable. 
129 private final Condition notFull; 130 131 /** 132 * Shared state for currently active iterators, or null if there 133 * are known not to be any. Allows queue operations to update 134 * iterator state. 135 */ 136 transient Itrs itrs; 137 138 // Internal helper methods 139 140 /** 141 * Increments i, mod modulus. 142 * Precondition and postcondition: 0 <= i < modulus. 143 */ inc(int i, int modulus)144 static final int inc(int i, int modulus) { 145 if (++i >= modulus) i = 0; 146 return i; 147 } 148 149 /** 150 * Decrements i, mod modulus. 151 * Precondition and postcondition: 0 <= i < modulus. 152 */ dec(int i, int modulus)153 static final int dec(int i, int modulus) { 154 if (--i < 0) i = modulus - 1; 155 return i; 156 } 157 158 /** 159 * Returns item at index i. 160 */ 161 @SuppressWarnings("unchecked") itemAt(int i)162 final E itemAt(int i) { 163 return (E) items[i]; 164 } 165 166 /** 167 * Returns element at array index i. 168 * This is a slight abuse of generics, accepted by javac. 169 */ 170 @SuppressWarnings("unchecked") itemAt(Object[] items, int i)171 static <E> E itemAt(Object[] items, int i) { 172 return (E) items[i]; 173 } 174 175 /** 176 * Inserts element at current put position, advances, and signals. 177 * Call only when holding lock. 178 */ enqueue(E e)179 private void enqueue(E e) { 180 // assert lock.isHeldByCurrentThread(); 181 // assert lock.getHoldCount() == 1; 182 // assert items[putIndex] == null; 183 final Object[] items = this.items; 184 items[putIndex] = e; 185 if (++putIndex == items.length) putIndex = 0; 186 count++; 187 notEmpty.signal(); 188 } 189 190 /** 191 * Extracts element at current take position, advances, and signals. 192 * Call only when holding lock. 
193 */ dequeue()194 private E dequeue() { 195 // assert lock.isHeldByCurrentThread(); 196 // assert lock.getHoldCount() == 1; 197 // assert items[takeIndex] != null; 198 final Object[] items = this.items; 199 @SuppressWarnings("unchecked") 200 E e = (E) items[takeIndex]; 201 items[takeIndex] = null; 202 if (++takeIndex == items.length) takeIndex = 0; 203 count--; 204 if (itrs != null) 205 itrs.elementDequeued(); 206 notFull.signal(); 207 return e; 208 } 209 210 /** 211 * Deletes item at array index removeIndex. 212 * Utility for remove(Object) and iterator.remove. 213 * Call only when holding lock. 214 */ removeAt(final int removeIndex)215 void removeAt(final int removeIndex) { 216 // assert lock.isHeldByCurrentThread(); 217 // assert lock.getHoldCount() == 1; 218 // assert items[removeIndex] != null; 219 // assert removeIndex >= 0 && removeIndex < items.length; 220 final Object[] items = this.items; 221 if (removeIndex == takeIndex) { 222 // removing front item; just advance 223 items[takeIndex] = null; 224 if (++takeIndex == items.length) takeIndex = 0; 225 count--; 226 if (itrs != null) 227 itrs.elementDequeued(); 228 } else { 229 // an "interior" remove 230 231 // slide over all others up through putIndex. 232 for (int i = removeIndex, putIndex = this.putIndex;;) { 233 int pred = i; 234 if (++i == items.length) i = 0; 235 if (i == putIndex) { 236 items[pred] = null; 237 this.putIndex = pred; 238 break; 239 } 240 items[pred] = items[i]; 241 } 242 count--; 243 if (itrs != null) 244 itrs.removedAt(removeIndex); 245 } 246 notFull.signal(); 247 } 248 249 /** 250 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 251 * capacity and default access policy. 
252 * 253 * @param capacity the capacity of this queue 254 * @throws IllegalArgumentException if {@code capacity < 1} 255 */ ArrayBlockingQueue(int capacity)256 public ArrayBlockingQueue(int capacity) { 257 this(capacity, false); 258 } 259 260 /** 261 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 262 * capacity and the specified access policy. 263 * 264 * @param capacity the capacity of this queue 265 * @param fair if {@code true} then queue accesses for threads blocked 266 * on insertion or removal, are processed in FIFO order; 267 * if {@code false} the access order is unspecified. 268 * @throws IllegalArgumentException if {@code capacity < 1} 269 */ ArrayBlockingQueue(int capacity, boolean fair)270 public ArrayBlockingQueue(int capacity, boolean fair) { 271 if (capacity <= 0) 272 throw new IllegalArgumentException(); 273 this.items = new Object[capacity]; 274 lock = new ReentrantLock(fair); 275 notEmpty = lock.newCondition(); 276 notFull = lock.newCondition(); 277 } 278 279 /** 280 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 281 * capacity, the specified access policy and initially containing the 282 * elements of the given collection, 283 * added in traversal order of the collection's iterator. 284 * 285 * @param capacity the capacity of this queue 286 * @param fair if {@code true} then queue accesses for threads blocked 287 * on insertion or removal, are processed in FIFO order; 288 * if {@code false} the access order is unspecified. 289 * @param c the collection of elements to initially contain 290 * @throws IllegalArgumentException if {@code capacity} is less than 291 * {@code c.size()}, or less than 1. 292 * @throws NullPointerException if the specified collection or any 293 * of its elements are null 294 */ ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)295 public ArrayBlockingQueue(int capacity, boolean fair, 296 Collection<? 
extends E> c) { 297 this(capacity, fair); 298 299 final ReentrantLock lock = this.lock; 300 lock.lock(); // Lock only for visibility, not mutual exclusion 301 try { 302 final Object[] items = this.items; 303 int i = 0; 304 try { 305 for (E e : c) 306 items[i++] = Objects.requireNonNull(e); 307 } catch (ArrayIndexOutOfBoundsException ex) { 308 throw new IllegalArgumentException(); 309 } 310 count = i; 311 putIndex = (i == capacity) ? 0 : i; 312 } finally { 313 lock.unlock(); 314 } 315 } 316 317 /** 318 * Inserts the specified element at the tail of this queue if it is 319 * possible to do so immediately without exceeding the queue's capacity, 320 * returning {@code true} upon success and throwing an 321 * {@code IllegalStateException} if this queue is full. 322 * 323 * @param e the element to add 324 * @return {@code true} (as specified by {@link Collection#add}) 325 * @throws IllegalStateException if this queue is full 326 * @throws NullPointerException if the specified element is null 327 */ add(E e)328 public boolean add(E e) { 329 return super.add(e); 330 } 331 332 /** 333 * Inserts the specified element at the tail of this queue if it is 334 * possible to do so immediately without exceeding the queue's capacity, 335 * returning {@code true} upon success and {@code false} if this queue 336 * is full. This method is generally preferable to method {@link #add}, 337 * which can fail to insert an element only by throwing an exception. 338 * 339 * @throws NullPointerException if the specified element is null 340 */ offer(E e)341 public boolean offer(E e) { 342 Objects.requireNonNull(e); 343 final ReentrantLock lock = this.lock; 344 lock.lock(); 345 try { 346 if (count == items.length) 347 return false; 348 else { 349 enqueue(e); 350 return true; 351 } 352 } finally { 353 lock.unlock(); 354 } 355 } 356 357 /** 358 * Inserts the specified element at the tail of this queue, waiting 359 * for space to become available if the queue is full. 
360 * 361 * @throws InterruptedException {@inheritDoc} 362 * @throws NullPointerException {@inheritDoc} 363 */ put(E e)364 public void put(E e) throws InterruptedException { 365 Objects.requireNonNull(e); 366 final ReentrantLock lock = this.lock; 367 lock.lockInterruptibly(); 368 try { 369 while (count == items.length) 370 notFull.await(); 371 enqueue(e); 372 } finally { 373 lock.unlock(); 374 } 375 } 376 377 /** 378 * Inserts the specified element at the tail of this queue, waiting 379 * up to the specified wait time for space to become available if 380 * the queue is full. 381 * 382 * @throws InterruptedException {@inheritDoc} 383 * @throws NullPointerException {@inheritDoc} 384 */ offer(E e, long timeout, TimeUnit unit)385 public boolean offer(E e, long timeout, TimeUnit unit) 386 throws InterruptedException { 387 388 Objects.requireNonNull(e); 389 long nanos = unit.toNanos(timeout); 390 final ReentrantLock lock = this.lock; 391 lock.lockInterruptibly(); 392 try { 393 while (count == items.length) { 394 if (nanos <= 0L) 395 return false; 396 nanos = notFull.awaitNanos(nanos); 397 } 398 enqueue(e); 399 return true; 400 } finally { 401 lock.unlock(); 402 } 403 } 404 poll()405 public E poll() { 406 final ReentrantLock lock = this.lock; 407 lock.lock(); 408 try { 409 return (count == 0) ? 
null : dequeue(); 410 } finally { 411 lock.unlock(); 412 } 413 } 414 take()415 public E take() throws InterruptedException { 416 final ReentrantLock lock = this.lock; 417 lock.lockInterruptibly(); 418 try { 419 while (count == 0) 420 notEmpty.await(); 421 return dequeue(); 422 } finally { 423 lock.unlock(); 424 } 425 } 426 poll(long timeout, TimeUnit unit)427 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 428 long nanos = unit.toNanos(timeout); 429 final ReentrantLock lock = this.lock; 430 lock.lockInterruptibly(); 431 try { 432 while (count == 0) { 433 if (nanos <= 0L) 434 return null; 435 nanos = notEmpty.awaitNanos(nanos); 436 } 437 return dequeue(); 438 } finally { 439 lock.unlock(); 440 } 441 } 442 peek()443 public E peek() { 444 final ReentrantLock lock = this.lock; 445 lock.lock(); 446 try { 447 return itemAt(takeIndex); // null when queue is empty 448 } finally { 449 lock.unlock(); 450 } 451 } 452 453 // this doc comment is overridden to remove the reference to collections 454 // greater in size than Integer.MAX_VALUE 455 /** 456 * Returns the number of elements in this queue. 457 * 458 * @return the number of elements in this queue 459 */ size()460 public int size() { 461 final ReentrantLock lock = this.lock; 462 lock.lock(); 463 try { 464 return count; 465 } finally { 466 lock.unlock(); 467 } 468 } 469 470 // this doc comment is a modified copy of the inherited doc comment, 471 // without the reference to unlimited queues. 472 /** 473 * Returns the number of additional elements that this queue can ideally 474 * (in the absence of memory or resource constraints) accept without 475 * blocking. This is always equal to the initial capacity of this queue 476 * less the current {@code size} of this queue. 
477 * 478 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 479 * an element will succeed by inspecting {@code remainingCapacity} 480 * because it may be the case that another thread is about to 481 * insert or remove an element. 482 */ remainingCapacity()483 public int remainingCapacity() { 484 final ReentrantLock lock = this.lock; 485 lock.lock(); 486 try { 487 return items.length - count; 488 } finally { 489 lock.unlock(); 490 } 491 } 492 493 /** 494 * Removes a single instance of the specified element from this queue, 495 * if it is present. More formally, removes an element {@code e} such 496 * that {@code o.equals(e)}, if this queue contains one or more such 497 * elements. 498 * Returns {@code true} if this queue contained the specified element 499 * (or equivalently, if this queue changed as a result of the call). 500 * 501 * <p>Removal of interior elements in circular array based queues 502 * is an intrinsically slow and disruptive operation, so should 503 * be undertaken only in exceptional circumstances, ideally 504 * only when the queue is known not to be accessible by other 505 * threads. 506 * 507 * @param o element to be removed from this queue, if present 508 * @return {@code true} if this queue changed as a result of the call 509 */ remove(Object o)510 public boolean remove(Object o) { 511 if (o == null) return false; 512 final ReentrantLock lock = this.lock; 513 lock.lock(); 514 try { 515 if (count > 0) { 516 final Object[] items = this.items; 517 for (int i = takeIndex, end = putIndex, 518 to = (i < end) ? end : items.length; 519 ; i = 0, to = end) { 520 for (; i < to; i++) 521 if (o.equals(items[i])) { 522 removeAt(i); 523 return true; 524 } 525 if (to == end) break; 526 } 527 } 528 return false; 529 } finally { 530 lock.unlock(); 531 } 532 } 533 534 /** 535 * Returns {@code true} if this queue contains the specified element. 
536 * More formally, returns {@code true} if and only if this queue contains 537 * at least one element {@code e} such that {@code o.equals(e)}. 538 * 539 * @param o object to be checked for containment in this queue 540 * @return {@code true} if this queue contains the specified element 541 */ contains(Object o)542 public boolean contains(Object o) { 543 if (o == null) return false; 544 final ReentrantLock lock = this.lock; 545 lock.lock(); 546 try { 547 if (count > 0) { 548 final Object[] items = this.items; 549 for (int i = takeIndex, end = putIndex, 550 to = (i < end) ? end : items.length; 551 ; i = 0, to = end) { 552 for (; i < to; i++) 553 if (o.equals(items[i])) 554 return true; 555 if (to == end) break; 556 } 557 } 558 return false; 559 } finally { 560 lock.unlock(); 561 } 562 } 563 564 /** 565 * Returns an array containing all of the elements in this queue, in 566 * proper sequence. 567 * 568 * <p>The returned array will be "safe" in that no references to it are 569 * maintained by this queue. (In other words, this method must allocate 570 * a new array). The caller is thus free to modify the returned array. 571 * 572 * <p>This method acts as bridge between array-based and collection-based 573 * APIs. 574 * 575 * @return an array containing all of the elements in this queue 576 */ toArray()577 public Object[] toArray() { 578 final ReentrantLock lock = this.lock; 579 lock.lock(); 580 try { 581 final Object[] items = this.items; 582 final int end = takeIndex + count; 583 final Object[] a = Arrays.copyOfRange(items, takeIndex, end); 584 if (end != putIndex) 585 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex); 586 return a; 587 } finally { 588 lock.unlock(); 589 } 590 } 591 592 /** 593 * Returns an array containing all of the elements in this queue, in 594 * proper sequence; the runtime type of the returned array is that of 595 * the specified array. If the queue fits in the specified array, it 596 * is returned therein. 
Otherwise, a new array is allocated with the 597 * runtime type of the specified array and the size of this queue. 598 * 599 * <p>If this queue fits in the specified array with room to spare 600 * (i.e., the array has more elements than this queue), the element in 601 * the array immediately following the end of the queue is set to 602 * {@code null}. 603 * 604 * <p>Like the {@link #toArray()} method, this method acts as bridge between 605 * array-based and collection-based APIs. Further, this method allows 606 * precise control over the runtime type of the output array, and may, 607 * under certain circumstances, be used to save allocation costs. 608 * 609 * <p>Suppose {@code x} is a queue known to contain only strings. 610 * The following code can be used to dump the queue into a newly 611 * allocated array of {@code String}: 612 * 613 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 614 * 615 * Note that {@code toArray(new Object[0])} is identical in function to 616 * {@code toArray()}. 
617 * 618 * @param a the array into which the elements of the queue are to 619 * be stored, if it is big enough; otherwise, a new array of the 620 * same runtime type is allocated for this purpose 621 * @return an array containing all of the elements in this queue 622 * @throws ArrayStoreException if the runtime type of the specified array 623 * is not a supertype of the runtime type of every element in 624 * this queue 625 * @throws NullPointerException if the specified array is null 626 */ 627 @SuppressWarnings("unchecked") toArray(T[] a)628 public <T> T[] toArray(T[] a) { 629 final ReentrantLock lock = this.lock; 630 lock.lock(); 631 try { 632 final Object[] items = this.items; 633 final int count = this.count; 634 final int firstLeg = Math.min(items.length - takeIndex, count); 635 if (a.length < count) { 636 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count, 637 a.getClass()); 638 } else { 639 System.arraycopy(items, takeIndex, a, 0, firstLeg); 640 if (a.length > count) 641 a[count] = null; 642 } 643 if (firstLeg < count) 644 System.arraycopy(items, 0, a, firstLeg, putIndex); 645 return a; 646 } finally { 647 lock.unlock(); 648 } 649 } 650 toString()651 public String toString() { 652 return Helpers.collectionToString(this); 653 } 654 655 /** 656 * Atomically removes all of the elements from this queue. 657 * The queue will be empty after this call returns. 658 */ clear()659 public void clear() { 660 final ReentrantLock lock = this.lock; 661 lock.lock(); 662 try { 663 int k; 664 if ((k = count) > 0) { 665 circularClear(items, takeIndex, putIndex); 666 takeIndex = putIndex; 667 count = 0; 668 if (itrs != null) 669 itrs.queueIsEmpty(); 670 for (; k > 0 && lock.hasWaiters(notFull); k--) 671 notFull.signal(); 672 } 673 } finally { 674 lock.unlock(); 675 } 676 } 677 678 /** 679 * Nulls out slots starting at array index i, upto index end. 680 * Condition i == end means "full" - the entire array is cleared. 
681 */ circularClear(Object[] items, int i, int end)682 private static void circularClear(Object[] items, int i, int end) { 683 // assert 0 <= i && i < items.length; 684 // assert 0 <= end && end < items.length; 685 for (int to = (i < end) ? end : items.length; 686 ; i = 0, to = end) { 687 for (; i < to; i++) items[i] = null; 688 if (to == end) break; 689 } 690 } 691 692 /** 693 * @throws UnsupportedOperationException {@inheritDoc} 694 * @throws ClassCastException {@inheritDoc} 695 * @throws NullPointerException {@inheritDoc} 696 * @throws IllegalArgumentException {@inheritDoc} 697 */ drainTo(Collection<? super E> c)698 public int drainTo(Collection<? super E> c) { 699 return drainTo(c, Integer.MAX_VALUE); 700 } 701 702 /** 703 * @throws UnsupportedOperationException {@inheritDoc} 704 * @throws ClassCastException {@inheritDoc} 705 * @throws NullPointerException {@inheritDoc} 706 * @throws IllegalArgumentException {@inheritDoc} 707 */ drainTo(Collection<? super E> c, int maxElements)708 public int drainTo(Collection<? 
super E> c, int maxElements) { 709 Objects.requireNonNull(c); 710 if (c == this) 711 throw new IllegalArgumentException(); 712 if (maxElements <= 0) 713 return 0; 714 final Object[] items = this.items; 715 final ReentrantLock lock = this.lock; 716 lock.lock(); 717 try { 718 int n = Math.min(maxElements, count); 719 int take = takeIndex; 720 int i = 0; 721 try { 722 while (i < n) { 723 @SuppressWarnings("unchecked") 724 E e = (E) items[take]; 725 c.add(e); 726 items[take] = null; 727 if (++take == items.length) take = 0; 728 i++; 729 } 730 return n; 731 } finally { 732 // Restore invariants even if c.add() threw 733 if (i > 0) { 734 count -= i; 735 takeIndex = take; 736 if (itrs != null) { 737 if (count == 0) 738 itrs.queueIsEmpty(); 739 else if (i > take) 740 itrs.takeIndexWrapped(); 741 } 742 for (; i > 0 && lock.hasWaiters(notFull); i--) 743 notFull.signal(); 744 } 745 } 746 } finally { 747 lock.unlock(); 748 } 749 } 750 751 /** 752 * Returns an iterator over the elements in this queue in proper sequence. 753 * The elements will be returned in order from first (head) to last (tail). 754 * 755 * <p>The returned iterator is 756 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 757 * 758 * @return an iterator over the elements in this queue in proper sequence 759 */ iterator()760 public Iterator<E> iterator() { 761 return new Itr(); 762 } 763 764 /** 765 * Shared data between iterators and their queue, allowing queue 766 * modifications to update iterators when elements are removed. 767 * 768 * This adds a lot of complexity for the sake of correctly 769 * handling some uncommon operations, but the combination of 770 * circular-arrays and supporting interior removes (i.e., those 771 * not at head) would cause iterators to sometimes lose their 772 * places and/or (re)report elements they shouldn't. 
To avoid
     * this, when a queue has one or more iterators, it keeps iterator
     * state consistent by:
     *
     * (1) keeping track of the number of "cycles", that is, the
     *     number of times takeIndex has wrapped around to 0.
     * (2) notifying all iterators via the callback removedAt whenever
     *     an interior element is removed (and thus other elements may
     *     be shifted).
     *
     * These suffice to eliminate iterator inconsistencies, but
     * unfortunately add the secondary responsibility of maintaining
     * the list of iterators. We track all active iterators in a
     * simple linked list (accessed only when the queue's lock is
     * held) of weak references to Itr. The list is cleaned up using
     * 3 different mechanisms:
     *
     * (1) Whenever a new iterator is created, do some O(1) checking for
     *     stale list elements.
     *
     * (2) Whenever takeIndex wraps around to 0, check for iterators
     *     that have been unused for more than one wrap-around cycle.
     *
     * (3) Whenever the queue becomes empty, all iterators are notified
     *     and this entire data structure is discarded.
     *
     * So in addition to the removedAt callback that is necessary for
     * correctness, iterators have the shutdown and takeIndexWrapped
     * callbacks that help remove stale iterators from the list.
     *
     * Whenever a list element is examined, it is expunged if either
     * the GC has determined that the iterator is discarded, or if the
     * iterator reports that it is "detached" (does not need any
     * further state updates). Overhead is maximal when takeIndex
     * never advances, iterators are discarded before they are
     * exhausted, and all removals are interior removes, in which case
     * all stale iterators are discovered by the GC. But even in this
     * case we don't increase the amortized complexity.
810 * 811 * Care must be taken to keep list sweeping methods from 812 * reentrantly invoking another such method, causing subtle 813 * corruption bugs. 814 */ 815 class Itrs { 816 817 /** 818 * Node in a linked list of weak iterator references. 819 */ 820 private class Node extends WeakReference<Itr> { 821 Node next; 822 Node(Itr iterator, Node next)823 Node(Itr iterator, Node next) { 824 super(iterator); 825 this.next = next; 826 } 827 } 828 829 /** Incremented whenever takeIndex wraps around to 0 */ 830 int cycles; 831 832 /** Linked list of weak iterator references */ 833 private Node head; 834 835 /** Used to expunge stale iterators */ 836 private Node sweeper; 837 838 private static final int SHORT_SWEEP_PROBES = 4; 839 private static final int LONG_SWEEP_PROBES = 16; 840 Itrs(Itr initial)841 Itrs(Itr initial) { 842 register(initial); 843 } 844 845 /** 846 * Sweeps itrs, looking for and expunging stale iterators. 847 * If at least one was found, tries harder to find more. 848 * Called only from iterating thread. 849 * 850 * @param tryHarder whether to start in try-harder mode, because 851 * there is known to be at least one iterator to collect 852 */ doSomeSweeping(boolean tryHarder)853 void doSomeSweeping(boolean tryHarder) { 854 // assert lock.isHeldByCurrentThread(); 855 // assert head != null; 856 int probes = tryHarder ? 
LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; 857 Node o, p; 858 final Node sweeper = this.sweeper; 859 boolean passedGo; // to limit search to one full sweep 860 861 if (sweeper == null) { 862 o = null; 863 p = head; 864 passedGo = true; 865 } else { 866 o = sweeper; 867 p = o.next; 868 passedGo = false; 869 } 870 871 for (; probes > 0; probes--) { 872 if (p == null) { 873 if (passedGo) 874 break; 875 o = null; 876 p = head; 877 passedGo = true; 878 } 879 final Itr it = p.get(); 880 final Node next = p.next; 881 if (it == null || it.isDetached()) { 882 // found a discarded/exhausted iterator 883 probes = LONG_SWEEP_PROBES; // "try harder" 884 // unlink p 885 p.clear(); 886 p.next = null; 887 if (o == null) { 888 head = next; 889 if (next == null) { 890 // We've run out of iterators to track; retire 891 itrs = null; 892 return; 893 } 894 } 895 else 896 o.next = next; 897 } else { 898 o = p; 899 } 900 p = next; 901 } 902 903 this.sweeper = (p == null) ? null : o; 904 } 905 906 /** 907 * Adds a new iterator to the linked list of tracked iterators. 908 */ register(Itr itr)909 void register(Itr itr) { 910 // assert lock.isHeldByCurrentThread(); 911 head = new Node(itr, head); 912 } 913 914 /** 915 * Called whenever takeIndex wraps around to 0. 916 * 917 * Notifies all iterators, and expunges any that are now stale. 918 */ takeIndexWrapped()919 void takeIndexWrapped() { 920 // assert lock.isHeldByCurrentThread(); 921 cycles++; 922 for (Node o = null, p = head; p != null;) { 923 final Itr it = p.get(); 924 final Node next = p.next; 925 if (it == null || it.takeIndexWrapped()) { 926 // unlink p 927 // assert it == null || it.isDetached(); 928 p.clear(); 929 p.next = null; 930 if (o == null) 931 head = next; 932 else 933 o.next = next; 934 } else { 935 o = p; 936 } 937 p = next; 938 } 939 if (head == null) // no more iterators to track 940 itrs = null; 941 } 942 943 /** 944 * Called whenever an interior remove (not at takeIndex) occurred. 
945 * 946 * Notifies all iterators, and expunges any that are now stale. 947 */ removedAt(int removedIndex)948 void removedAt(int removedIndex) { 949 for (Node o = null, p = head; p != null;) { 950 final Itr it = p.get(); 951 final Node next = p.next; 952 if (it == null || it.removedAt(removedIndex)) { 953 // unlink p 954 // assert it == null || it.isDetached(); 955 p.clear(); 956 p.next = null; 957 if (o == null) 958 head = next; 959 else 960 o.next = next; 961 } else { 962 o = p; 963 } 964 p = next; 965 } 966 if (head == null) // no more iterators to track 967 itrs = null; 968 } 969 970 /** 971 * Called whenever the queue becomes empty. 972 * 973 * Notifies all active iterators that the queue is empty, 974 * clears all weak refs, and unlinks the itrs datastructure. 975 */ queueIsEmpty()976 void queueIsEmpty() { 977 // assert lock.isHeldByCurrentThread(); 978 for (Node p = head; p != null; p = p.next) { 979 Itr it = p.get(); 980 if (it != null) { 981 p.clear(); 982 it.shutdown(); 983 } 984 } 985 head = null; 986 itrs = null; 987 } 988 989 /** 990 * Called whenever an element has been dequeued (at takeIndex). 991 */ elementDequeued()992 void elementDequeued() { 993 // assert lock.isHeldByCurrentThread(); 994 if (count == 0) 995 queueIsEmpty(); 996 else if (takeIndex == 0) 997 takeIndexWrapped(); 998 } 999 } 1000 1001 /** 1002 * Iterator for ArrayBlockingQueue. 1003 * 1004 * To maintain weak consistency with respect to puts and takes, we 1005 * read ahead one slot, so as to not report hasNext true but then 1006 * not have an element to return. 1007 * 1008 * We switch into "detached" mode (allowing prompt unlinking from 1009 * itrs without help from the GC) when all indices are negative, or 1010 * when hasNext returns false for the first time. This allows the 1011 * iterator to track concurrent updates completely accurately, 1012 * except for the corner case of the user calling Iterator.remove() 1013 * after hasNext() returned false. 
* Even in this case, we ensure
     * that we don't remove the wrong element by keeping track of the
     * expected element to remove, in lastItem.  Yes, we may fail to
     * remove lastItem from the queue if it moved due to an interleaved
     * interior remove while in detached mode.
     *
     * Method forEachRemaining, added in Java 8, is treated similarly
     * to hasNext returning false, in that we switch to detached mode,
     * but we regard it as an even stronger request to "close" this
     * iteration, and don't bother supporting subsequent remove().
     */
    private class Itr implements Iterator<E> {
        /** Index to look for new nextItem; NONE at end */
        private int cursor;

        /**
         * Element to be returned by next call to next(); null if none.
         * This is the one-slot read-ahead; hasNext() tests it without
         * taking the lock.
         */
        private E nextItem;

        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
        private int nextIndex;

        /**
         * Last element returned; null if none or not detached.
         * Used by remove() in detached mode to check that the slot at
         * lastRet still holds the element that was returned.
         */
        private E lastItem;

        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
        private int lastRet;

        /** Previous value of takeIndex, or DETACHED when detached */
        private int prevTakeIndex;

        /** Previous value of itrs.cycles */
        private int prevCycles;

        /** Special index value indicating "not available" or "undefined" */
        private static final int NONE = -1;

        /**
         * Special index value indicating "removed elsewhere", that is,
         * removed by some operation other than a call to this.remove().
1052 */ 1053 private static final int REMOVED = -2; 1054 1055 /** Special value for prevTakeIndex indicating "detached mode" */ 1056 private static final int DETACHED = -3; 1057 Itr()1058 Itr() { 1059 lastRet = NONE; 1060 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1061 lock.lock(); 1062 try { 1063 if (count == 0) { 1064 // assert itrs == null; 1065 cursor = NONE; 1066 nextIndex = NONE; 1067 prevTakeIndex = DETACHED; 1068 } else { 1069 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1070 prevTakeIndex = takeIndex; 1071 nextItem = itemAt(nextIndex = takeIndex); 1072 cursor = incCursor(takeIndex); 1073 if (itrs == null) { 1074 itrs = new Itrs(this); 1075 } else { 1076 itrs.register(this); // in this order 1077 itrs.doSomeSweeping(false); 1078 } 1079 prevCycles = itrs.cycles; 1080 // assert takeIndex >= 0; 1081 // assert prevTakeIndex == takeIndex; 1082 // assert nextIndex >= 0; 1083 // assert nextItem != null; 1084 } 1085 } finally { 1086 lock.unlock(); 1087 } 1088 } 1089 isDetached()1090 boolean isDetached() { 1091 // assert lock.isHeldByCurrentThread(); 1092 return prevTakeIndex < 0; 1093 } 1094 incCursor(int index)1095 private int incCursor(int index) { 1096 // assert lock.isHeldByCurrentThread(); 1097 if (++index == items.length) index = 0; 1098 if (index == putIndex) index = NONE; 1099 return index; 1100 } 1101 1102 /** 1103 * Returns true if index is invalidated by the given number of 1104 * dequeues, starting from prevTakeIndex. 1105 */ invalidated(int index, int prevTakeIndex, long dequeues, int length)1106 private boolean invalidated(int index, int prevTakeIndex, 1107 long dequeues, int length) { 1108 if (index < 0) 1109 return false; 1110 int distance = index - prevTakeIndex; 1111 if (distance < 0) 1112 distance += length; 1113 return dequeues > distance; 1114 } 1115 1116 /** 1117 * Adjusts indices to incorporate all dequeues since the last 1118 * operation on this iterator. Call only from iterating thread. 
1119 */ incorporateDequeues()1120 private void incorporateDequeues() { 1121 // assert lock.isHeldByCurrentThread(); 1122 // assert itrs != null; 1123 // assert !isDetached(); 1124 // assert count > 0; 1125 1126 final int cycles = itrs.cycles; 1127 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1128 final int prevCycles = this.prevCycles; 1129 final int prevTakeIndex = this.prevTakeIndex; 1130 1131 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1132 final int len = items.length; 1133 // how far takeIndex has advanced since the previous 1134 // operation of this iterator 1135 long dequeues = (long) (cycles - prevCycles) * len 1136 + (takeIndex - prevTakeIndex); 1137 1138 // Check indices for invalidation 1139 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1140 lastRet = REMOVED; 1141 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1142 nextIndex = REMOVED; 1143 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1144 cursor = takeIndex; 1145 1146 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1147 detach(); 1148 else { 1149 this.prevCycles = cycles; 1150 this.prevTakeIndex = takeIndex; 1151 } 1152 } 1153 } 1154 1155 /** 1156 * Called when itrs should stop tracking this iterator, either 1157 * because there are no more indices to update (cursor < 0 && 1158 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1159 * lastRet >= 0, because hasNext() is about to return false for the 1160 * first time. Call only from iterating thread. 
*/
        private void detach() {
            // Switch to detached mode
            // assert lock.isHeldByCurrentThread();
            // assert cursor == NONE;
            // assert nextIndex < 0;
            // assert lastRet < 0 || nextItem == null;
            // assert lastRet < 0 ^ lastItem != null;
            if (prevTakeIndex >= 0) {
                // assert itrs != null;
                prevTakeIndex = DETACHED;
                // try to unlink from itrs (but not too hard)
                itrs.doSomeSweeping(true);
            }
        }

        /**
         * For performance reasons, we would like not to acquire a lock in
         * hasNext in the common case.  To allow for this, we only access
         * fields (i.e. nextItem) that are not modified by update operations
         * triggered by queue modifications.
         *
         * @return true if a read-ahead element is available
         */
        public boolean hasNext() {
            if (nextItem != null)
                return true;
            noNext();
            return false;
        }

        /**
         * Slow path of hasNext(), taken when there is no read-ahead
         * element.  Under the lock, brings the indices up to date and,
         * if lastRet is still valid, records the element at lastRet in
         * lastItem before detaching, so that remove() can later check it.
         */
        private void noNext() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                // assert cursor == NONE;
                // assert nextIndex == NONE;
                if (!isDetached()) {
                    // assert lastRet >= 0;
                    incorporateDequeues(); // might update lastRet
                    if (lastRet >= 0) {
                        lastItem = itemAt(lastRet);
                        // assert lastItem != null;
                        detach();
                    }
                }
                // assert isDetached();
                // assert lastRet < 0 ^ lastItem != null;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns the read-ahead element and, under the lock, reads
         * ahead the following one (if cursor is still valid).
         *
         * @return the element that was held in nextItem
         * @throws NoSuchElementException if there is no read-ahead element
         */
        public E next() {
            // read before locking; this is the value promised by hasNext()
            final E e = nextItem;
            if (e == null)
                throw new NoSuchElementException();
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (!isDetached())
                    incorporateDequeues();
                // assert nextIndex != NONE;
                // assert lastItem == null;
                lastRet = nextIndex;
                final int cursor = this.cursor;
                if (cursor >= 0) {
                    nextItem = itemAt(nextIndex = cursor);
                    // assert nextItem != null;
                    this.cursor = incCursor(cursor);
                } else {
                    // nothing more to read ahead
                    nextIndex = NONE;
                    nextItem = null;
                    // all three indices are now negative
                    if (lastRet == REMOVED) detach();
                }
            } finally {
                lock.unlock();
            }
            return e;
        }

        /**
         * Passes the read-ahead element and then every element from
         * cursor up to putIndex to the action, all while holding the
         * queue lock.  Afterwards (even if the action throws) the
         * iterator is reset and detached.
         *
         * @param action the action to apply to each remaining element
         * @throws NullPointerException if action is null
         */
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                final E e = nextItem;
                if (e == null) return;
                if (!isDetached())
                    incorporateDequeues();
                action.accept(e);
                if (isDetached() || cursor < 0) return;
                final Object[] items = ArrayBlockingQueue.this.items;
                // Circular traversal from cursor to putIndex: one pass if
                // cursor < putIndex, otherwise to the end of the array and
                // then from 0 to putIndex.
                for (int i = cursor, end = putIndex,
                         to = (i < end) ? end : items.length;
                     ; i = 0, to = end) {
                    for (; i < to; i++)
                        action.accept(itemAt(items, i));
                    if (to == end) break;
                }
            } finally {
                // Calling forEachRemaining is a strong hint that this
                // iteration is surely over; supporting remove() after
                // forEachRemaining() is more trouble than it's worth
                cursor = nextIndex = lastRet = NONE;
                nextItem = lastItem = null;
                detach();
                lock.unlock();
            }
        }

        /**
         * Removes the element last returned by next(), if it is still
         * in the queue.
         *
         * @throws IllegalStateException if lastRet is NONE, i.e. there
         *         is no last returned element to remove
         */
        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            // assert lock.getHoldCount() == 1;
            try {
                if (!isDetached())
                    incorporateDequeues(); // might update lastRet or detach
                final int lastRet = this.lastRet;
                this.lastRet = NONE;
                if (lastRet >= 0) {
                    if (!isDetached())
                        removeAt(lastRet);
                    else {
                        // Detached: indices were not kept up to date, so
                        // only remove if the slot still holds lastItem.
                        final E lastItem = this.lastItem;
                        // assert lastItem != null;
                        this.lastItem = null;
                        if (itemAt(lastRet) == lastItem)
                            removeAt(lastRet);
                    }
                } else if (lastRet == NONE)
                    throw new IllegalStateException();
                // else lastRet == REMOVED and the last returned element was
                // previously asynchronously removed via an operation other
                // than this.remove(), so nothing to do.

                if (cursor < 0 && nextIndex < 0)
                    detach();
            } finally {
                lock.unlock();
                // assert lastRet == NONE;
                // assert lastItem == null;
            }
        }

        /**
         * Called to notify the iterator that the queue is empty, or that it
         * has fallen hopelessly behind, so that it should abandon any
         * further iteration, except possibly to return one more element
         * from next(), as promised by returning true from hasNext().
         */
        void shutdown() {
            // assert lock.isHeldByCurrentThread();
            cursor = NONE;
            if (nextIndex >= 0)
                nextIndex = REMOVED;
            if (lastRet >= 0) {
                lastRet = REMOVED;
                lastItem = null;
            }
            prevTakeIndex = DETACHED;
            // Don't set nextItem to null because we must continue to be
            // able to return it on next().
            //
            // Caller will unlink from itrs when convenient.
        }

        /**
         * Returns the circular distance from prevTakeIndex forward to
         * index in an array of the given length; the result is in
         * [0, length) for in-range arguments.
         */
        private int distance(int index, int prevTakeIndex, int length) {
            int distance = index - prevTakeIndex;
            if (distance < 0)
                distance += length;
            return distance;
        }

        /**
         * Called whenever an interior remove (not at takeIndex) occurred.
         *
         * Indices positioned after the removed slot are moved back by
         * one; lastRet/nextIndex positioned exactly on it become REMOVED.
         *
         * @param removedIndex array index of the element that was removed
         * @return true if this iterator should be unlinked from itrs
         */
        boolean removedAt(int removedIndex) {
            // assert lock.isHeldByCurrentThread();
            if (isDetached())
                return true;

            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            final int prevTakeIndex = this.prevTakeIndex;
            final int len = items.length;
            // distance from prevTakeIndex to removedIndex
            // (one full lap of len per elapsed cycle, plus one more when
            // removedIndex is numerically below the current takeIndex)
            final int removedDistance =
                len * (itrs.cycles - this.prevCycles
                       + ((removedIndex < takeIndex) ? 1 : 0))
                + (removedIndex - prevTakeIndex);
            // assert itrs.cycles - this.prevCycles >= 0;
            // assert itrs.cycles - this.prevCycles <= 1;
            // assert removedDistance > 0;
            // assert removedIndex != takeIndex;
            int cursor = this.cursor;
            if (cursor >= 0) {
                int x = distance(cursor, prevTakeIndex, len);
                if (x == removedDistance) {
                    // cursor sits on the removed slot; it stays put unless
                    // that slot is now putIndex (nothing left to read)
                    if (cursor == putIndex)
                        this.cursor = cursor = NONE;
                }
                else if (x > removedDistance) {
                    // assert cursor != prevTakeIndex;
                    this.cursor = cursor = dec(cursor, len);
                }
            }
            int lastRet = this.lastRet;
            if (lastRet >= 0) {
                int x = distance(lastRet, prevTakeIndex, len);
                if (x == removedDistance)
                    this.lastRet = lastRet = REMOVED;
                else if (x > removedDistance)
                    this.lastRet = lastRet = dec(lastRet, len);
            }
            int nextIndex = this.nextIndex;
            if (nextIndex >= 0) {
                int x = distance(nextIndex, prevTakeIndex, len);
                if (x == removedDistance)
                    this.nextIndex = nextIndex = REMOVED;
                else if (x > removedDistance)
                    this.nextIndex = nextIndex = dec(nextIndex, len);
            }
            // no index left to maintain: switch to detached mode
            if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
                this.prevTakeIndex = DETACHED;
                return true;
            }
            return false;
        }

        /**
         * Called whenever takeIndex wraps around to zero.
         *
         * @return true if this iterator should be unlinked from itrs
         */
        boolean takeIndexWrapped() {
            // assert lock.isHeldByCurrentThread();
            if (isDetached())
                return true;
            if (itrs.cycles - prevCycles > 1) {
                // All the elements that existed at the time of the last
                // operation are gone, so abandon further iteration.
                shutdown();
                return true;
            }
            return false;
        }

//         /** Uncomment for debugging. */
*/ 1409 // public String toString() { 1410 // return ("cursor=" + cursor + " " + 1411 // "nextIndex=" + nextIndex + " " + 1412 // "lastRet=" + lastRet + " " + 1413 // "nextItem=" + nextItem + " " + 1414 // "lastItem=" + lastItem + " " + 1415 // "prevCycles=" + prevCycles + " " + 1416 // "prevTakeIndex=" + prevTakeIndex + " " + 1417 // "size()=" + size() + " " + 1418 // "remainingCapacity()=" + remainingCapacity()); 1419 // } 1420 } 1421 1422 /** 1423 * Returns a {@link Spliterator} over the elements in this queue. 1424 * 1425 * <p>The returned spliterator is 1426 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1427 * 1428 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 1429 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 1430 * 1431 * @implNote 1432 * The {@code Spliterator} implements {@code trySplit} to permit limited 1433 * parallelism. 1434 * 1435 * @return a {@code Spliterator} over the elements in this queue 1436 * @since 1.8 1437 */ spliterator()1438 public Spliterator<E> spliterator() { 1439 return Spliterators.spliterator 1440 (this, (Spliterator.ORDERED | 1441 Spliterator.NONNULL | 1442 Spliterator.CONCURRENT)); 1443 } 1444 1445 /** 1446 * @throws NullPointerException {@inheritDoc} 1447 */ forEach(Consumer<? super E> action)1448 public void forEach(Consumer<? super E> action) { 1449 Objects.requireNonNull(action); 1450 final ReentrantLock lock = this.lock; 1451 lock.lock(); 1452 try { 1453 if (count > 0) { 1454 final Object[] items = this.items; 1455 for (int i = takeIndex, end = putIndex, 1456 to = (i < end) ? end : items.length; 1457 ; i = 0, to = end) { 1458 for (; i < to; i++) 1459 action.accept(itemAt(items, i)); 1460 if (to == end) break; 1461 } 1462 } 1463 } finally { 1464 lock.unlock(); 1465 } 1466 } 1467 1468 /** 1469 * @throws NullPointerException {@inheritDoc} 1470 */ removeIf(Predicate<? super E> filter)1471 public boolean removeIf(Predicate<? 
super E> filter) { 1472 Objects.requireNonNull(filter); 1473 return bulkRemove(filter); 1474 } 1475 1476 /** 1477 * @throws NullPointerException {@inheritDoc} 1478 */ removeAll(Collection<?> c)1479 public boolean removeAll(Collection<?> c) { 1480 Objects.requireNonNull(c); 1481 return bulkRemove(e -> c.contains(e)); 1482 } 1483 1484 /** 1485 * @throws NullPointerException {@inheritDoc} 1486 */ retainAll(Collection<?> c)1487 public boolean retainAll(Collection<?> c) { 1488 Objects.requireNonNull(c); 1489 return bulkRemove(e -> !c.contains(e)); 1490 } 1491 1492 /** Implementation of bulk remove methods. */ bulkRemove(Predicate<? super E> filter)1493 private boolean bulkRemove(Predicate<? super E> filter) { 1494 final ReentrantLock lock = this.lock; 1495 lock.lock(); 1496 try { 1497 if (itrs == null) { // check for active iterators 1498 if (count > 0) { 1499 final Object[] items = this.items; 1500 // Optimize for initial run of survivors 1501 for (int i = takeIndex, end = putIndex, 1502 to = (i < end) ? end : items.length; 1503 ; i = 0, to = end) { 1504 for (; i < to; i++) 1505 if (filter.test(itemAt(items, i))) 1506 return bulkRemoveModified(filter, i); 1507 if (to == end) break; 1508 } 1509 } 1510 return false; 1511 } 1512 } finally { 1513 lock.unlock(); 1514 } 1515 // Active iterators are too hairy! 1516 // Punting (for now) to the slow n^2 algorithm ... 1517 return super.removeIf(filter); 1518 } 1519 1520 // A tiny bit set implementation 1521 nBits(int n)1522 private static long[] nBits(int n) { 1523 return new long[((n - 1) >> 6) + 1]; 1524 } setBit(long[] bits, int i)1525 private static void setBit(long[] bits, int i) { 1526 bits[i >> 6] |= 1L << i; 1527 } isClear(long[] bits, int i)1528 private static boolean isClear(long[] bits, int i) { 1529 return (bits[i >> 6] & (1L << i)) == 0; 1530 } 1531 1532 /** 1533 * Returns circular distance from i to j, disambiguating i == j to 1534 * items.length; never returns 0. 
*/
    private int distanceNonEmpty(int i, int j) {
        // j - i, wrapped into the range [1, items.length]
        if ((j -= i) <= 0) j += items.length;
        return j;
    }

    /**
     * Helper for bulkRemove, in case of at least one deletion.
     * Tolerate predicates that reentrantly access the collection for
     * read (but not write), so traverse once to find elements to
     * delete, a second pass to physically expunge.
     *
     * @param filter predicate selecting the elements to delete
     * @param beg valid index of first element to be deleted
     * @return always true
     */
    private boolean bulkRemoveModified(
        Predicate<? super E> filter, final int beg) {
        final Object[] es = items;
        final int capacity = items.length;
        final int end = putIndex;
        // One bit per slot from beg (inclusive) to putIndex, indexed by
        // circular offset from beg; a set bit means "delete".
        final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
        deathRow[0] = 1L;   // set bit 0
        // Pass 1: mark.  k is beg on the first leg and beg - capacity
        // after wrapping, so i - k is always the offset from beg.
        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
             ; i = 0, to = end, k -= capacity) {
            for (; i < to; i++)
                if (filter.test(itemAt(es, i)))
                    setBit(deathRow, i - k);
            if (to == end) break;
        }
        // a two-finger traversal, with hare i reading, tortoise w writing
        int w = beg;
        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
             ; w = 0) { // w rejoins i on second leg
            // In this loop, i and w are on the same leg, with i > w
            for (; i < to; i++)
                if (isClear(deathRow, i - k))
                    es[w++] = es[i];
            if (to == end) break;
            // In this loop, w is on the first leg, i on the second
            for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
                if (isClear(deathRow, i - k))
                    es[w++] = es[i];
            if (i >= to) {
                if (w == capacity) w = 0; // "corner" case
                break;
            }
        }
        // Slots from w up to the old putIndex are now surplus: subtract
        // them from count, make w the new putIndex, and clear them.
        count -= distanceNonEmpty(w, end);
        circularClear(es, putIndex = w, end);
        return true;
    }

    /**
     * debugging
     *
     * Prints the queue state to System.err and throws if
     * invariantsSatisfied() reports a violation.
     *
     * @throws AssertionError if the invariants do not hold
     */
    void checkInvariants() {
        // meta-assertions
        // assert lock.isHeldByCurrentThread();
        if (!invariantsSatisfied()) {
            String detail = String.format(
                "takeIndex=%d putIndex=%d count=%d capacity=%d items=%s",
                takeIndex, putIndex, count, items.length,
                Arrays.toString(items));
            System.err.println(detail);
            throw new AssertionError(detail);
        }
    }

    /**
     * Checks the structural consistency of items, takeIndex, putIndex
     * and count.
     *
     * @return true if all checks pass
     */
    private boolean invariantsSatisfied() {
        // Unlike ArrayDeque, we have a count field but no spare slot.
        // We prefer ArrayDeque's strategy (and the names of its fields!),
        // but our field layout is baked into the serial form, and so is
        // too annoying to change.
        //
        // putIndex == takeIndex must be disambiguated by checking count.
        int capacity = items.length;
        return capacity > 0
            && items.getClass() == Object[].class
            // OR-ing the three values is negative iff any one is negative
            && (takeIndex | putIndex | count) >= 0
            && takeIndex <  capacity
            && putIndex  <  capacity
            && count     <= capacity
            // putIndex is count slots past takeIndex, modulo capacity
            && (putIndex - takeIndex - count) % capacity == 0
            // head slot occupied, slot at putIndex free, last slot occupied
            && (count == 0 || items[takeIndex] != null)
            && (count == capacity || items[putIndex] == null)
            && (count == 0 || items[dec(putIndex, capacity)] != null);
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     *
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.InvalidObjectException if invariants are violated
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {

        // Read in items array and various fields
        s.defaultReadObject();

        // Reject streams whose fields do not describe a consistent queue
        if (!invariantsSatisfied())
            throw new java.io.InvalidObjectException("invariants violated");
    }
}