1 /* 2 * Copyright (C) 2010 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.collect.ObjectArrays; 20 21 import java.util.AbstractQueue; 22 import java.util.Collection; 23 import java.util.ConcurrentModificationException; 24 import java.util.Iterator; 25 import java.util.NoSuchElementException; 26 import java.util.concurrent.BlockingQueue; 27 import java.util.concurrent.TimeUnit; 28 29 import javax.annotation.Nullable; 30 31 /** 32 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 33 * array. This queue orders elements FIFO (first-in-first-out). The 34 * <em>head</em> of the queue is that element that has been on the 35 * queue the longest time. The <em>tail</em> of the queue is that 36 * element that has been on the queue the shortest time. New elements 37 * are inserted at the tail of the queue, and the queue retrieval 38 * operations obtain elements at the head of the queue. 39 * 40 * <p>This is a classic "bounded buffer", in which a 41 * fixed-sized array holds elements inserted by producers and 42 * extracted by consumers. Once created, the capacity cannot be 43 * increased. Attempts to <tt>put</tt> an element into a full queue 44 * will result in the operation blocking; attempts to <tt>take</tt> an 45 * element from an empty queue will similarly block. 46 * 47 * <p> This class supports an optional fairness policy for ordering 48 * waiting producer and consumer threads. By default, this ordering 49 * is not guaranteed. However, a queue constructed with fairness set 50 * to <tt>true</tt> grants threads access in FIFO order. Fairness 51 * generally decreases throughput but reduces variability and avoids 52 * starvation. 53 * 54 * <p>This class and its iterator implement all of the 55 * <em>optional</em> methods of the {@link Collection} and {@link 56 * Iterator} interfaces. 57 * 58 * @author Doug Lea 59 * @author Justin T. Sampson 60 * @param <E> the type of elements held in this collection 61 */ 62 public class MonitorBasedArrayBlockingQueue<E> extends AbstractQueue<E> 63 implements BlockingQueue<E> { 64 65 // Based on revision 1.58 of ArrayBlockingQueue by Doug Lea, from 66 // http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/ 67 68 /** The queued items */ 69 final E[] items; 70 /** items index for next take, poll or remove */ 71 int takeIndex; 72 /** items index for next put, offer, or add. */ 73 int putIndex; 74 /** Number of items in the queue */ 75 private int count; 76 77 /* 78 * Concurrency control uses the classic two-condition algorithm 79 * found in any textbook. 80 */ 81 82 /** Monitor guarding all access */ 83 final Monitor monitor; 84 85 /** Guard for waiting takes */ 86 private final Monitor.Guard notEmpty; 87 88 /** Guard for waiting puts */ 89 private final Monitor.Guard notFull; 90 91 // Internal helper methods 92 93 /** 94 * Circularly increment i. 95 */ inc(int i)96 final int inc(int i) { 97 return (++i == items.length) ? 0 : i; 98 } 99 100 /** 101 * Inserts element at current put position, advances, and signals. 102 * Call only when occupying monitor. 103 */ insert(E x)104 private void insert(E x) { 105 items[putIndex] = x; 106 putIndex = inc(putIndex); 107 ++count; 108 } 109 110 /** 111 * Extracts element at current take position, advances, and signals. 112 * Call only when occupying monitor. 113 */ extract()114 private E extract() { 115 final E[] items = this.items; 116 E x = items[takeIndex]; 117 items[takeIndex] = null; 118 takeIndex = inc(takeIndex); 119 --count; 120 return x; 121 } 122 123 /** 124 * Utility for remove and iterator.remove: Delete item at position i. 125 * Call only when occupying monitor. 126 */ removeAt(int i)127 void removeAt(int i) { 128 final E[] items = this.items; 129 // if removing front item, just advance 130 if (i == takeIndex) { 131 items[takeIndex] = null; 132 takeIndex = inc(takeIndex); 133 } else { 134 // slide over all others up through putIndex. 135 for (;;) { 136 int nexti = inc(i); 137 if (nexti != putIndex) { 138 items[i] = items[nexti]; 139 i = nexti; 140 } else { 141 items[i] = null; 142 putIndex = i; 143 break; 144 } 145 } 146 } 147 --count; 148 } 149 150 /** 151 * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed) 152 * capacity and default access policy. 153 * 154 * @param capacity the capacity of this queue 155 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 156 */ MonitorBasedArrayBlockingQueue(int capacity)157 public MonitorBasedArrayBlockingQueue(int capacity) { 158 this(capacity, false); 159 } 160 161 /** 162 * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed) 163 * capacity and the specified access policy. 164 * 165 * @param capacity the capacity of this queue 166 * @param fair if <tt>true</tt> then queue accesses for threads blocked 167 * on insertion or removal, are processed in FIFO order; 168 * if <tt>false</tt> the access order is unspecified. 169 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 170 */ MonitorBasedArrayBlockingQueue(int capacity, boolean fair)171 public MonitorBasedArrayBlockingQueue(int capacity, boolean fair) { 172 if (capacity <= 0) 173 throw new IllegalArgumentException(); 174 this.items = newEArray(capacity); 175 monitor = new Monitor(fair); 176 notEmpty = new Monitor.Guard(monitor) { 177 @Override public boolean isSatisfied() { 178 return count > 0; 179 } 180 }; 181 notFull = new Monitor.Guard(monitor) { 182 @Override public boolean isSatisfied() { 183 return count < items.length; 184 } 185 }; 186 } 187 188 @SuppressWarnings("unchecked") // please don't try this home, kids newEArray(int capacity)189 private static <E> E[] newEArray(int capacity) { 190 return (E[]) new Object[capacity]; 191 } 192 193 /** 194 * Creates an <tt>MonitorBasedArrayBlockingQueue</tt> with the given (fixed) 195 * capacity, the specified access policy and initially containing the 196 * elements of the given collection, 197 * added in traversal order of the collection's iterator. 198 * 199 * @param capacity the capacity of this queue 200 * @param fair if <tt>true</tt> then queue accesses for threads blocked 201 * on insertion or removal, are processed in FIFO order; 202 * if <tt>false</tt> the access order is unspecified. 203 * @param c the collection of elements to initially contain 204 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 205 * <tt>c.size()</tt>, or less than 1. 206 * @throws NullPointerException if the specified collection or any 207 * of its elements are null 208 */ MonitorBasedArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)209 public MonitorBasedArrayBlockingQueue(int capacity, boolean fair, 210 Collection<? extends E> c) { 211 this(capacity, fair); 212 if (capacity < c.size()) 213 throw new IllegalArgumentException(); 214 215 for (E e : c) 216 add(e); 217 } 218 219 /** 220 * Inserts the specified element at the tail of this queue if it is 221 * possible to do so immediately without exceeding the queue's capacity, 222 * returning <tt>true</tt> upon success and throwing an 223 * <tt>IllegalStateException</tt> if this queue is full. 224 * 225 * @param e the element to add 226 * @return <tt>true</tt> (as specified by {@link Collection#add}) 227 * @throws IllegalStateException if this queue is full 228 * @throws NullPointerException if the specified element is null 229 */ add(E e)230 @Override public boolean add(E e) { 231 return super.add(e); 232 } 233 234 /** 235 * Inserts the specified element at the tail of this queue if it is 236 * possible to do so immediately without exceeding the queue's capacity, 237 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue 238 * is full. This method is generally preferable to method {@link #add}, 239 * which can fail to insert an element only by throwing an exception. 240 * 241 * @throws NullPointerException if the specified element is null 242 */ 243 @Override offer(E e)244 public boolean offer(E e) { 245 if (e == null) throw new NullPointerException(); 246 final Monitor monitor = this.monitor; 247 if (monitor.enterIf(notFull)) { 248 try { 249 insert(e); 250 return true; 251 } finally { 252 monitor.leave(); 253 } 254 } else { 255 return false; 256 } 257 } 258 259 /** 260 * Inserts the specified element at the tail of this queue, waiting 261 * for space to become available if the queue is full. 262 * 263 * @throws InterruptedException {@inheritDoc} 264 * @throws NullPointerException {@inheritDoc} 265 */ 266 @Override put(E e)267 public void put(E e) throws InterruptedException { 268 if (e == null) throw new NullPointerException(); 269 final Monitor monitor = this.monitor; 270 monitor.enterWhen(notFull); 271 try { 272 insert(e); 273 } finally { 274 monitor.leave(); 275 } 276 } 277 278 /** 279 * Inserts the specified element at the tail of this queue, waiting 280 * up to the specified wait time for space to become available if 281 * the queue is full. 282 * 283 * @throws InterruptedException {@inheritDoc} 284 * @throws NullPointerException {@inheritDoc} 285 */ 286 @Override offer(E e, long timeout, TimeUnit unit)287 public boolean offer(E e, long timeout, TimeUnit unit) 288 throws InterruptedException { 289 290 if (e == null) throw new NullPointerException(); 291 final Monitor monitor = this.monitor; 292 if (monitor.enterWhen(notFull, timeout, unit)) { 293 try { 294 insert(e); 295 return true; 296 } finally { 297 monitor.leave(); 298 } 299 } else { 300 return false; 301 } 302 } 303 304 @Override poll()305 public E poll() { 306 final Monitor monitor = this.monitor; 307 if (monitor.enterIf(notEmpty)) { 308 try { 309 return extract(); 310 } finally { 311 monitor.leave(); 312 } 313 } else { 314 return null; 315 } 316 } 317 318 @Override take()319 public E take() throws InterruptedException { 320 final Monitor monitor = this.monitor; 321 monitor.enterWhen(notEmpty); 322 try { 323 return extract(); 324 } finally { 325 monitor.leave(); 326 } 327 } 328 329 @Override poll(long timeout, TimeUnit unit)330 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 331 final Monitor monitor = this.monitor; 332 if (monitor.enterWhen(notEmpty, timeout, unit)) { 333 try { 334 return extract(); 335 } finally { 336 monitor.leave(); 337 } 338 } else { 339 return null; 340 } 341 } 342 343 @Override peek()344 public E peek() { 345 final Monitor monitor = this.monitor; 346 if (monitor.enterIf(notEmpty)) { 347 try { 348 return items[takeIndex]; 349 } finally { 350 monitor.leave(); 351 } 352 } else { 353 return null; 354 } 355 } 356 357 // this doc comment is overridden to remove the reference to collections 358 // greater in size than Integer.MAX_VALUE 359 /** 360 * Returns the number of elements in this queue. 361 * 362 * @return the number of elements in this queue 363 */ size()364 @Override public int size() { 365 final Monitor monitor = this.monitor; 366 monitor.enter(); 367 try { 368 return count; 369 } finally { 370 monitor.leave(); 371 } 372 } 373 374 // this doc comment is a modified copy of the inherited doc comment, 375 // without the reference to unlimited queues. 376 /** 377 * Returns the number of additional elements that this queue can ideally 378 * (in the absence of memory or resource constraints) accept without 379 * blocking. This is always equal to the initial capacity of this queue 380 * less the current <tt>size</tt> of this queue. 381 * 382 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 383 * an element will succeed by inspecting <tt>remainingCapacity</tt> 384 * because it may be the case that another thread is about to 385 * insert or remove an element. 386 */ 387 @Override remainingCapacity()388 public int remainingCapacity() { 389 final Monitor monitor = this.monitor; 390 monitor.enter(); 391 try { 392 return items.length - count; 393 } finally { 394 monitor.leave(); 395 } 396 } 397 398 /** 399 * Removes a single instance of the specified element from this queue, 400 * if it is present. More formally, removes an element <tt>e</tt> such 401 * that <tt>o.equals(e)</tt>, if this queue contains one or more such 402 * elements. 403 * Returns <tt>true</tt> if this queue contained the specified element 404 * (or equivalently, if this queue changed as a result of the call). 405 * 406 * @param o element to be removed from this queue, if present 407 * @return <tt>true</tt> if this queue changed as a result of the call 408 */ remove(@ullable Object o)409 @Override public boolean remove(@Nullable Object o) { 410 if (o == null) return false; 411 final E[] items = this.items; 412 final Monitor monitor = this.monitor; 413 monitor.enter(); 414 try { 415 int i = takeIndex; 416 int k = 0; 417 for (;;) { 418 if (k++ >= count) 419 return false; 420 if (o.equals(items[i])) { 421 removeAt(i); 422 return true; 423 } 424 i = inc(i); 425 } 426 } finally { 427 monitor.leave(); 428 } 429 } 430 431 /** 432 * Returns <tt>true</tt> if this queue contains the specified element. 433 * More formally, returns <tt>true</tt> if and only if this queue contains 434 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. 435 * 436 * @param o object to be checked for containment in this queue 437 * @return <tt>true</tt> if this queue contains the specified element 438 */ contains(@ullable Object o)439 @Override public boolean contains(@Nullable Object o) { 440 if (o == null) return false; 441 final E[] items = this.items; 442 final Monitor monitor = this.monitor; 443 monitor.enter(); 444 try { 445 int i = takeIndex; 446 int k = 0; 447 while (k++ < count) { 448 if (o.equals(items[i])) 449 return true; 450 i = inc(i); 451 } 452 return false; 453 } finally { 454 monitor.leave(); 455 } 456 } 457 458 /** 459 * Returns an array containing all of the elements in this queue, in 460 * proper sequence. 461 * 462 * <p>The returned array will be "safe" in that no references to it are 463 * maintained by this queue. (In other words, this method must allocate 464 * a new array). The caller is thus free to modify the returned array. 465 * 466 * <p>This method acts as bridge between array-based and collection-based 467 * APIs. 468 * 469 * @return an array containing all of the elements in this queue 470 */ toArray()471 @Override public Object[] toArray() { 472 final E[] items = this.items; 473 final Monitor monitor = this.monitor; 474 monitor.enter(); 475 try { 476 Object[] a = new Object[count]; 477 int k = 0; 478 int i = takeIndex; 479 while (k < count) { 480 a[k++] = items[i]; 481 i = inc(i); 482 } 483 return a; 484 } finally { 485 monitor.leave(); 486 } 487 } 488 489 /** 490 * Returns an array containing all of the elements in this queue, in 491 * proper sequence; the runtime type of the returned array is that of 492 * the specified array. If the queue fits in the specified array, it 493 * is returned therein. Otherwise, a new array is allocated with the 494 * runtime type of the specified array and the size of this queue. 495 * 496 * <p>If this queue fits in the specified array with room to spare 497 * (i.e., the array has more elements than this queue), the element in 498 * the array immediately following the end of the queue is set to 499 * <tt>null</tt>. 500 * 501 * <p>Like the {@link #toArray()} method, this method acts as bridge between 502 * array-based and collection-based APIs. Further, this method allows 503 * precise control over the runtime type of the output array, and may, 504 * under certain circumstances, be used to save allocation costs. 505 * 506 * <p>Suppose <tt>x</tt> is a queue known to contain only strings. 507 * The following code can be used to dump the queue into a newly 508 * allocated array of <tt>String</tt>: 509 * 510 * <pre> 511 * String[] y = x.toArray(new String[0]);</pre> 512 * 513 * <p>Note that <tt>toArray(new Object[0])</tt> is identical in function to 514 * <tt>toArray()</tt>. 515 * 516 * @param a the array into which the elements of the queue are to 517 * be stored, if it is big enough; otherwise, a new array of the 518 * same runtime type is allocated for this purpose 519 * @return an array containing all of the elements in this queue 520 * @throws ArrayStoreException if the runtime type of the specified array 521 * is not a supertype of the runtime type of every element in 522 * this queue 523 * @throws NullPointerException if the specified array is null 524 */ toArray(T[] a)525 @Override public <T> T[] toArray(T[] a) { 526 final E[] items = this.items; 527 final Monitor monitor = this.monitor; 528 monitor.enter(); 529 try { 530 if (a.length < count) 531 a = ObjectArrays.newArray(a, count); 532 533 int k = 0; 534 int i = takeIndex; 535 while (k < count) { 536 // This cast is not itself safe, but the following statement 537 // will fail if the runtime type of items[i] is not assignable 538 // to the runtime type of a[k++], which is all that the method 539 // contract requires (see @throws ArrayStoreException above). 540 @SuppressWarnings("unchecked") 541 T t = (T) items[i]; 542 a[k++] = t; 543 i = inc(i); 544 } 545 if (a.length > count) 546 a[count] = null; 547 return a; 548 } finally { 549 monitor.leave(); 550 } 551 } 552 toString()553 @Override public String toString() { 554 final Monitor monitor = this.monitor; 555 monitor.enter(); 556 try { 557 return super.toString(); 558 } finally { 559 monitor.leave(); 560 } 561 } 562 563 /** 564 * Atomically removes all of the elements from this queue. 565 * The queue will be empty after this call returns. 566 */ clear()567 @Override public void clear() { 568 final E[] items = this.items; 569 final Monitor monitor = this.monitor; 570 monitor.enter(); 571 try { 572 int i = takeIndex; 573 int k = count; 574 while (k-- > 0) { 575 items[i] = null; 576 i = inc(i); 577 } 578 count = 0; 579 putIndex = 0; 580 takeIndex = 0; 581 } finally { 582 monitor.leave(); 583 } 584 } 585 586 /** 587 * @throws UnsupportedOperationException {@inheritDoc} 588 * @throws ClassCastException {@inheritDoc} 589 * @throws NullPointerException {@inheritDoc} 590 * @throws IllegalArgumentException {@inheritDoc} 591 */ 592 @Override drainTo(Collection<? super E> c)593 public int drainTo(Collection<? super E> c) { 594 if (c == null) 595 throw new NullPointerException(); 596 if (c == this) 597 throw new IllegalArgumentException(); 598 final E[] items = this.items; 599 final Monitor monitor = this.monitor; 600 monitor.enter(); 601 try { 602 int i = takeIndex; 603 int n = 0; 604 int max = count; 605 while (n < max) { 606 c.add(items[i]); 607 items[i] = null; 608 i = inc(i); 609 ++n; 610 } 611 if (n > 0) { 612 count = 0; 613 putIndex = 0; 614 takeIndex = 0; 615 } 616 return n; 617 } finally { 618 monitor.leave(); 619 } 620 } 621 622 /** 623 * @throws UnsupportedOperationException {@inheritDoc} 624 * @throws ClassCastException {@inheritDoc} 625 * @throws NullPointerException {@inheritDoc} 626 * @throws IllegalArgumentException {@inheritDoc} 627 */ 628 @Override drainTo(Collection<? super E> c, int maxElements)629 public int drainTo(Collection<? super E> c, int maxElements) { 630 if (c == null) 631 throw new NullPointerException(); 632 if (c == this) 633 throw new IllegalArgumentException(); 634 if (maxElements <= 0) 635 return 0; 636 final E[] items = this.items; 637 final Monitor monitor = this.monitor; 638 monitor.enter(); 639 try { 640 int i = takeIndex; 641 int n = 0; 642 int max = (maxElements < count) ? maxElements : count; 643 while (n < max) { 644 c.add(items[i]); 645 items[i] = null; 646 i = inc(i); 647 ++n; 648 } 649 if (n > 0) { 650 count -= n; 651 takeIndex = i; 652 } 653 return n; 654 } finally { 655 monitor.leave(); 656 } 657 } 658 659 /** 660 * Returns an iterator over the elements in this queue in proper sequence. 661 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that 662 * will never throw {@link ConcurrentModificationException}, 663 * and guarantees to traverse elements as they existed upon 664 * construction of the iterator, and may (but is not guaranteed to) 665 * reflect any modifications subsequent to construction. 666 * 667 * @return an iterator over the elements in this queue in proper sequence 668 */ iterator()669 @Override public Iterator<E> iterator() { 670 final Monitor monitor = this.monitor; 671 monitor.enter(); 672 try { 673 return new Itr(); 674 } finally { 675 monitor.leave(); 676 } 677 } 678 679 /** 680 * Iterator for MonitorBasedArrayBlockingQueue 681 */ 682 private class Itr implements Iterator<E> { 683 /** 684 * Index of element to be returned by next, 685 * or a negative number if no such. 686 */ 687 private int nextIndex; 688 689 /** 690 * nextItem holds on to item fields because once we claim 691 * that an element exists in hasNext(), we must return it in 692 * the following next() call even if it was in the process of 693 * being removed when hasNext() was called. 694 */ 695 private E nextItem; 696 697 /** 698 * Index of element returned by most recent call to next. 699 * Reset to -1 if this element is deleted by a call to remove. 700 */ 701 private int lastRet; 702 Itr()703 Itr() { 704 lastRet = -1; 705 if (count == 0) 706 nextIndex = -1; 707 else { 708 nextIndex = takeIndex; 709 nextItem = items[takeIndex]; 710 } 711 } 712 713 @Override hasNext()714 public boolean hasNext() { 715 /* 716 * No sync. We can return true by mistake here 717 * only if this iterator passed across threads, 718 * which we don't support anyway. 719 */ 720 return nextIndex >= 0; 721 } 722 723 /** 724 * Checks whether nextIndex is valid; if so setting nextItem. 725 * Stops iterator when either hits putIndex or sees null item. 726 */ checkNext()727 private void checkNext() { 728 if (nextIndex == putIndex) { 729 nextIndex = -1; 730 nextItem = null; 731 } else { 732 nextItem = items[nextIndex]; 733 if (nextItem == null) 734 nextIndex = -1; 735 } 736 } 737 738 @Override next()739 public E next() { 740 final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor; 741 monitor.enter(); 742 try { 743 if (nextIndex < 0) 744 throw new NoSuchElementException(); 745 lastRet = nextIndex; 746 E x = nextItem; 747 nextIndex = inc(nextIndex); 748 checkNext(); 749 return x; 750 } finally { 751 monitor.leave(); 752 } 753 } 754 755 @Override remove()756 public void remove() { 757 final Monitor monitor = MonitorBasedArrayBlockingQueue.this.monitor; 758 monitor.enter(); 759 try { 760 int i = lastRet; 761 if (i == -1) 762 throw new IllegalStateException(); 763 lastRet = -1; 764 765 int ti = takeIndex; 766 removeAt(i); 767 // back up cursor (reset to front if was first element) 768 nextIndex = (i == ti) ? takeIndex : i; 769 checkNext(); 770 } finally { 771 monitor.leave(); 772 } 773 } 774 } 775 } 776