/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 * Other contributors include John Vint
 */

package jsr166;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;

import junit.framework.Test;

/**
 * Tests of {@link LinkedTransferQueue}: construction, the Collection and
 * BlockingQueue operations, iteration, serialization, draining, and the
 * transfer / tryTransfer hand-off methods.
 *
 * <p>Waiting on helper threads and latches goes through the timed helpers
 * inherited from the test base class ({@code await}, {@code awaitTermination})
 * so that a broken queue fails a test instead of hanging the whole run.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public class LinkedTransferQueueTest extends JSR166TestCase {

    /**
     * Describes LinkedTransferQueue to the generic collection tests:
     * a concurrent collection of Integer elements that rejects nulls.
     */
    static class Implementation implements CollectionImplementation {
        public Class<?> klazz() { return LinkedTransferQueue.class; }
        public Collection emptyCollection() { return new LinkedTransferQueue(); }
        public Object makeElement(int i) { return i; }
        public boolean isConcurrent() { return true; }
        public boolean permitsNulls() { return false; }
    }

    // android-note: These tests have been moved into their own separate
    // classes to work around CTS issues:
    // LinkedTransferQueueBlockingQueueTest.java
    // LinkedTransferQueueCollectionTest.java
    //
    // public static class Generic extends BlockingQueueTest {
    //     protected BlockingQueue emptyCollection() {
    //         return new LinkedTransferQueue();
    //     }
    // }

    // android-note: Removed because the CTS runner does a bad job of
    // retrying tests that have suite() declarations.
    //
    // public static void main(String[] args) {
    //     main(suite(), args);
    // }
    // public static Test suite() {
    //     return newTestSuite(LinkedTransferQueueTest.class,
    //                         new Generic().testSuite(),
    //                         CollectionTest.testSuite(new Implementation()));
    // }

    /**
     * Constructor builds new queue with size being zero and empty
     * being true
     */
    public void testConstructor1() {
        assertEquals(0, new LinkedTransferQueue().size());
        assertTrue(new LinkedTransferQueue().isEmpty());
    }

    /**
     * Initializing constructor with null collection throws
     * NullPointerException
     */
    public void testConstructor2() {
        try {
            new LinkedTransferQueue(null);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Initializing from Collection of null elements throws
     * NullPointerException
     */
    public void testConstructor3() {
        Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
        try {
            new LinkedTransferQueue(elements);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Initializing constructor with a collection containing some null elements
     * throws NullPointerException
     */
    public void testConstructor4() {
        Integer[] ints = new Integer[SIZE];
        // The last slot is deliberately left null.
        for (int i = 0; i < SIZE - 1; ++i)
            ints[i] = i;
        Collection<Integer> elements = Arrays.asList(ints);
        try {
            new LinkedTransferQueue(elements);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Queue contains all elements of the collection it is initialized by
     */
    public void testConstructor5() {
        Integer[] ints = new Integer[SIZE];
        for (int i = 0; i < SIZE; ++i) {
            ints[i] = i;
        }
        List intList = Arrays.asList(ints);
        LinkedTransferQueue q
            = new LinkedTransferQueue(intList);
        assertEquals(intList.size(), q.size());
        assertEquals(intList.toString(), q.toString());
        assertTrue(Arrays.equals(q.toArray(),
                                 intList.toArray()));
        assertTrue(Arrays.equals(q.toArray(new Object[0]),
                                 intList.toArray(new Object[0])));
        assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
                                 intList.toArray(new Object[SIZE])));
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(ints[i], q.poll());
        }
    }

    /**
     * remainingCapacity() always returns Integer.MAX_VALUE
     */
    public void testRemainingCapacity() {
        BlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
            assertEquals(SIZE - i, q.size());
            assertEquals(i, q.remove());
        }
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
            assertEquals(i, q.size());
            assertTrue(q.add(i));
        }
    }

    /**
     * addAll(this) throws IllegalArgumentException
     */
    public void testAddAllSelf() {
        LinkedTransferQueue q = populatedQueue(SIZE);
        try {
            q.addAll(q);
            shouldThrow();
        } catch (IllegalArgumentException success) {}
    }

    /**
     * addAll of a collection with any null elements throws
     * NullPointerException after possibly adding some elements
     */
    public void testAddAll3() {
        LinkedTransferQueue q = new LinkedTransferQueue();
        Integer[] ints = new Integer[SIZE];
        // The last slot is deliberately left null.
        for (int i = 0; i < SIZE - 1; ++i)
            ints[i] = i;
        try {
            q.addAll(Arrays.asList(ints));
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Queue contains all elements, in traversal order, of successful addAll
     */
    public void testAddAll5() {
        Integer[] empty = new Integer[0];
        Integer[] ints = new Integer[SIZE];
        for (int i = 0; i < SIZE; ++i) {
            ints[i] = i;
        }
        LinkedTransferQueue q = new LinkedTransferQueue();
        assertFalse(q.addAll(Arrays.asList(empty)));
        assertTrue(q.addAll(Arrays.asList(ints)));
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(ints[i], q.poll());
        }
    }

    /**
     * all elements successfully put are contained
     */
    public void testPut() {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.size());
            q.put(i);
            assertTrue(q.contains(i));
        }
    }

    /**
     * take retrieves elements in FIFO order
     */
    public void testTake() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, (int) q.take());
        }
    }

    /**
     * take removes existing elements until empty, then blocks interruptibly
     */
    public void testBlockingTake() throws InterruptedException {
        final BlockingQueue q = populatedQueue(SIZE);
        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                for (int i = 0; i < SIZE; ++i) {
                    assertEquals(i, q.take());
                }

                // Case 1: interrupt status already set before take().
                Thread.currentThread().interrupt();
                try {
                    q.take();
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());

                // Case 2: interrupted by the main thread while blocked.
                pleaseInterrupt.countDown();
                try {
                    q.take();
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());
            }});

        await(pleaseInterrupt);
        assertThreadStaysAlive(t);
        t.interrupt();
        awaitTermination(t);
    }

    /**
     * poll succeeds unless empty
     */
    public void testPoll() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, (int) q.poll());
        }
        assertNull(q.poll());
        checkEmpty(q);
    }

    /**
     * timed poll with zero timeout succeeds when non-empty, else times out
     */
    public void testTimedPoll0() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, (int) q.poll(0, MILLISECONDS));
        }
        assertNull(q.poll(0, MILLISECONDS));
        checkEmpty(q);
    }

    /**
     * timed poll with nonzero timeout succeeds when non-empty, else times out
     */
    public void testTimedPoll() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        long startTime = System.nanoTime();
        for (int i = 0; i < SIZE; ++i)
            assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);

        startTime = System.nanoTime();
        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
        checkEmpty(q);
    }

    /**
     * Interrupted timed poll throws InterruptedException instead of
     * returning timeout status
     */
    public void testInterruptedTimedPoll() throws InterruptedException {
        final BlockingQueue<Integer> q = populatedQueue(SIZE);
        final CountDownLatch aboutToWait = new CountDownLatch(1);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                long startTime = System.nanoTime();
                for (int i = 0; i < SIZE; ++i)
                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
                aboutToWait.countDown();
                try {
                    q.poll(LONG_DELAY_MS, MILLISECONDS);
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
            }});

        // Timed helper: fail rather than hang if the worker never gets here.
        await(aboutToWait);
        waitForThreadToEnterWaitState(t);
        t.interrupt();
        awaitTermination(t);
        checkEmpty(q);
    }

    /**
     * timed poll after thread interrupted throws InterruptedException
     * instead of returning timeout status
     */
    public void testTimedPollAfterInterrupt() throws InterruptedException {
        final BlockingQueue<Integer> q = populatedQueue(SIZE);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                long startTime = System.nanoTime();
                Thread.currentThread().interrupt();
                // Polls that can be satisfied immediately are expected to
                // succeed even though the interrupt status is set.
                for (int i = 0; i < SIZE; ++i)
                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
                try {
                    q.poll(LONG_DELAY_MS, MILLISECONDS);
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
            }});

        awaitTermination(t);
        checkEmpty(q);
    }

    /**
     * peek returns next element, or null if empty
     */
    public void testPeek() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, (int) q.peek());
            assertEquals(i, (int) q.poll());
            assertTrue(q.peek() == null ||
                       i != (int) q.peek());
        }
        assertNull(q.peek());
        checkEmpty(q);
    }

    /**
     * element returns next element, or throws NoSuchElementException if empty
     */
    public void testElement() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, (int) q.element());
            assertEquals(i, (int) q.poll());
        }
        try {
            q.element();
            shouldThrow();
        } catch (NoSuchElementException success) {}
        checkEmpty(q);
    }

    /**
     * remove removes next element, or throws NoSuchElementException if empty
     */
    public void testRemove() throws InterruptedException {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, (int) q.remove());
        }
        try {
            q.remove();
            shouldThrow();
        } catch (NoSuchElementException success) {}
        checkEmpty(q);
    }

    /**
     * An add following remove(x) succeeds
     */
    public void testRemoveElementAndAdd() throws InterruptedException {
        LinkedTransferQueue q = new LinkedTransferQueue();
        assertTrue(q.add(one));
        assertTrue(q.add(two));
        assertTrue(q.remove(one));
        assertTrue(q.remove(two));
        assertTrue(q.add(three));
        assertSame(three, q.take());
    }

    /**
     * contains(x) reports true when elements added but not yet removed
     */
    public void testContains() {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertTrue(q.contains(i));
            assertEquals(i, (int) q.poll());
            assertFalse(q.contains(i));
        }
    }

    /**
     * clear removes all elements
     */
    public void testClear() throws InterruptedException {
        LinkedTransferQueue q = populatedQueue(SIZE);
        q.clear();
        checkEmpty(q);
        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
        q.add(one);
        assertFalse(q.isEmpty());
        assertEquals(1, q.size());
        assertTrue(q.contains(one));
        q.clear();
        checkEmpty(q);
    }

    /**
     * containsAll(c) is true when c contains a subset of elements
     */
    public void testContainsAll() {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
        for (int i = 0; i < SIZE; ++i) {
            assertTrue(q.containsAll(p));
            assertFalse(p.containsAll(q));
            p.add(i);
        }
        assertTrue(p.containsAll(q));
    }

    /**
     * retainAll(c) retains only those elements of c and reports true
     * if changed
     */
    public void testRetainAll() {
        LinkedTransferQueue q = populatedQueue(SIZE);
        LinkedTransferQueue p = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            boolean changed = q.retainAll(p);
            // On the first pass p equals q, so nothing is removed.
            if (i == 0) {
                assertFalse(changed);
            } else {
                assertTrue(changed);
            }
            assertTrue(q.containsAll(p));
            assertEquals(SIZE - i, q.size());
            p.remove();
        }
    }

    /**
     * removeAll(c) removes only those elements of c and reports true
     * if changed
     */
    public void testRemoveAll() {
        for (int i = 1; i < SIZE; ++i) {
            LinkedTransferQueue q = populatedQueue(SIZE);
            LinkedTransferQueue p = populatedQueue(i);
            assertTrue(q.removeAll(p));
            assertEquals(SIZE - i, q.size());
            for (int j = 0; j < i; ++j) {
                assertFalse(q.contains(p.remove()));
            }
        }
    }

    /**
     * toArray() contains all elements in FIFO order
     */
    public void testToArray() {
        LinkedTransferQueue q = populatedQueue(SIZE);
        Object[] o = q.toArray();
        for (int i = 0; i < o.length; i++) {
            assertSame(o[i], q.poll());
        }
    }

    /**
     * toArray(a) contains all elements in FIFO order
     */
    public void testToArray2() {
        LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
        Integer[] ints = new Integer[SIZE];
        Integer[] array = q.toArray(ints);
        assertSame(ints, array);
        for (int i = 0; i < ints.length; i++) {
            assertSame(ints[i], q.poll());
        }
    }

    /**
     * toArray(incompatible array type) throws ArrayStoreException
     */
    public void testToArray1_BadArg() {
        LinkedTransferQueue q = populatedQueue(SIZE);
        try {
            q.toArray(new String[10]);
            shouldThrow();
        } catch (ArrayStoreException success) {}
    }

    /**
     * iterator iterates through all elements
     */
    public void testIterator() throws InterruptedException {
        LinkedTransferQueue q = populatedQueue(SIZE);
        Iterator it = q.iterator();
        int i;
        for (i = 0; it.hasNext(); i++)
            assertTrue(q.contains(it.next()));
        assertEquals(SIZE, i);
        assertIteratorExhausted(it);

        it = q.iterator();
        // Argument order is kept as-is: it.next() must be evaluated
        // before q.take() removes the element.
        for (i = 0; it.hasNext(); i++)
            assertEquals(it.next(), q.take());
        assertEquals(SIZE, i);
        assertIteratorExhausted(it);
    }

    /**
     * iterator of empty collection has no elements
     */
    public void testEmptyIterator() {
        assertIteratorExhausted(new LinkedTransferQueue().iterator());
    }

    /**
     * iterator.remove() removes current element
     */
    public void testIteratorRemove() {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        q.add(two);
        q.add(one);
        q.add(three);

        Iterator it = q.iterator();
        it.next();
        it.remove();

        it = q.iterator();
        assertSame(one, it.next());
        assertSame(three, it.next());
        assertFalse(it.hasNext());
    }

    /**
     * iterator ordering is FIFO
     */
    public void testIteratorOrdering() {
        final LinkedTransferQueue<Integer> q
            = new LinkedTransferQueue<Integer>();
        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
        q.add(one);
        q.add(two);
        q.add(three);
        assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
        int k = 0;
        for (Integer n : q) {
            assertEquals(++k, (int) n);
        }
        assertEquals(3, k);
    }

    /**
     * Modifications do not cause iterators to fail
     */
    public void testWeaklyConsistentIteration() {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        q.add(one);
        q.add(two);
        q.add(three);
        for (Iterator it = q.iterator(); it.hasNext();) {
            q.remove();
            it.next();
        }
        assertEquals(0, q.size());
    }

    /**
     * toString contains toStrings of elements
     */
    public void testToString() {
        LinkedTransferQueue q = populatedQueue(SIZE);
        String s = q.toString();
        for (int i = 0; i < SIZE; ++i) {
            assertTrue(s.contains(String.valueOf(i)));
        }
    }

    /**
     * offer transfers elements across Executor tasks
     */
    public void testOfferInExecutor() {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try (PoolCleaner cleaner = cleaner(executor)) {

            executor.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadsStarted.await();
                    long startTime = System.nanoTime();
                    assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
                    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
                }});

            executor.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadsStarted.await();
                    assertSame(one, q.take());
                    checkEmpty(q);
                }});
        }
    }

    /**
     * timed poll retrieves elements across Executor threads
     */
    public void testPollInExecutor() {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try (PoolCleaner cleaner = cleaner(executor)) {

            executor.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    assertNull(q.poll());
                    threadsStarted.await();
                    long startTime = System.nanoTime();
                    assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
                    assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
                    checkEmpty(q);
                }});

            executor.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadsStarted.await();
                    q.put(one);
                }});
        }
    }

    /**
     * A deserialized serialized queue has same elements in same order
     */
    public void testSerialization() throws Exception {
        Queue x = populatedQueue(SIZE);
        Queue y = serialClone(x);

        assertNotSame(y, x);
        assertEquals(x.size(), y.size());
        assertEquals(x.toString(), y.toString());
        assertTrue(Arrays.equals(x.toArray(), y.toArray()));
        while (!x.isEmpty()) {
            assertFalse(y.isEmpty());
            assertEquals(x.remove(), y.remove());
        }
        assertTrue(y.isEmpty());
    }

    /**
     * drainTo(c) empties queue into another collection c
     */
    public void testDrainTo() {
        LinkedTransferQueue q = populatedQueue(SIZE);
        ArrayList l = new ArrayList();
        q.drainTo(l);
        assertEquals(0, q.size());
        assertEquals(SIZE, l.size());
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, l.get(i));
        }
        q.add(zero);
        q.add(one);
        assertFalse(q.isEmpty());
        assertTrue(q.contains(zero));
        assertTrue(q.contains(one));
        l.clear();
        q.drainTo(l);
        assertEquals(0, q.size());
        assertEquals(2, l.size());
        for (int i = 0; i < 2; ++i) {
            assertEquals(i, l.get(i));
        }
    }

    /**
     * drainTo(c) racing with a concurrent put drains at least the
     * elements that were present beforehand, in order, and no element
     * is lost: each one ends up either in c or still in the queue
     */
    public void testDrainToWithActivePut() throws InterruptedException {
        final LinkedTransferQueue q = populatedQueue(SIZE);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() {
                q.put(SIZE + 1);
            }});
        ArrayList l = new ArrayList();
        q.drainTo(l);
        assertTrue(l.size() >= SIZE);
        for (int i = 0; i < SIZE; ++i)
            assertEquals(i, l.get(i));
        awaitTermination(t);
        // The producer has finished and nothing else touches q, so the
        // SIZE initial elements plus the one put are all accounted for.
        assertEquals(SIZE + 1, q.size() + l.size());
    }

    /**
     * drainTo(c, n) empties first min(n, size) elements of queue into c
     */
    public void testDrainToN() {
        LinkedTransferQueue q = new LinkedTransferQueue();
        for (int i = 0; i < SIZE + 2; ++i) {
            for (int j = 0; j < SIZE; j++) {
                assertTrue(q.offer(j));
            }
            ArrayList l = new ArrayList();
            q.drainTo(l, i);
            int k = Math.min(i, SIZE);
            assertEquals(k, l.size());
            assertEquals(SIZE - k, q.size());
            for (int j = 0; j < k; ++j)
                assertEquals(j, l.get(j));
            // Discard whatever is left before the next round.
            do {} while (q.poll() != null);
        }
    }

    /**
     * timed poll() or take() increments the waiting consumer count;
     * offer(e) decrements the waiting consumer count
     */
    public void testWaitingConsumer() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        assertEquals(0, q.getWaitingConsumerCount());
        assertFalse(q.hasWaitingConsumer());
        final CountDownLatch threadStarted = new CountDownLatch(1);

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                long startTime = System.nanoTime();
                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
                assertEquals(0, q.getWaitingConsumerCount());
                assertFalse(q.hasWaitingConsumer());
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
            }});

        // Timed helper: fail rather than hang if the worker never starts.
        await(threadStarted);
        Callable<Boolean> oneConsumer
            = new Callable<Boolean>() { public Boolean call() {
                return q.hasWaitingConsumer()
                && q.getWaitingConsumerCount() == 1; }};
        waitForThreadToEnterWaitState(t, oneConsumer);

        assertTrue(q.offer(one));
        assertEquals(0, q.getWaitingConsumerCount());
        assertFalse(q.hasWaitingConsumer());

        awaitTermination(t);
    }

    /**
     * transfer(null) throws NullPointerException
     */
    public void testTransfer1() throws InterruptedException {
        try {
            LinkedTransferQueue q = new LinkedTransferQueue();
            q.transfer(null);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * transfer waits until a poll occurs. The transferred element
     * is returned by this associated poll.
     */
    public void testTransfer2() throws InterruptedException {
        final LinkedTransferQueue<Integer> q
            = new LinkedTransferQueue<Integer>();
        final CountDownLatch threadStarted = new CountDownLatch(1);

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadStarted.countDown();
                q.transfer(five);
                checkEmpty(q);
            }});

        // Timed helper: fail rather than hang if the worker never starts.
        await(threadStarted);
        Callable<Boolean> oneElement
            = new Callable<Boolean>() { public Boolean call() {
                return !q.isEmpty() && q.size() == 1; }};
        waitForThreadToEnterWaitState(t, oneElement);

        assertSame(five, q.poll());
        checkEmpty(q);
        awaitTermination(t);
    }

    /**
     * transfer waits until a poll occurs, and then transfers in FIFO order
     */
    public void testTransfer3() throws InterruptedException {
        final LinkedTransferQueue<Integer> q
            = new LinkedTransferQueue<Integer>();

        Thread first = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                q.transfer(four);
                assertFalse(q.contains(four));
                assertEquals(1, q.size());
            }});

        Thread interruptedThread = newStartedThread(
            new CheckedInterruptedRunnable() {
                public void realRun() throws InterruptedException {
                    // Enqueue behind the first transfer.
                    while (q.isEmpty())
                        Thread.yield();
                    q.transfer(five);
                }});

        while (q.size() < 2)
            Thread.yield();
        assertEquals(2, q.size());
        assertSame(four, q.poll());
        // Timed joins: a transfer that never returns fails the test
        // instead of blocking the run.
        awaitTermination(first);
        assertEquals(1, q.size());
        interruptedThread.interrupt();
        awaitTermination(interruptedThread);
        checkEmpty(q);
    }

    /**
     * transfer waits until a poll occurs, at which point the polling
     * thread returns the element
     */
    public void testTransfer4() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                q.transfer(four);
                assertFalse(q.contains(four));
                assertSame(three, q.poll());
            }});

        while (q.isEmpty())
            Thread.yield();
        assertFalse(q.isEmpty());
        assertEquals(1, q.size());
        assertTrue(q.offer(three));
        assertSame(four, q.poll());
        awaitTermination(t);
    }

    /**
     * transfer waits until a take occurs. The transferred element
     * is returned by this associated take.
     */
    public void testTransfer5() throws InterruptedException {
        final LinkedTransferQueue<Integer> q
            = new LinkedTransferQueue<Integer>();

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                q.transfer(four);
                checkEmpty(q);
            }});

        while (q.isEmpty())
            Thread.yield();
        assertFalse(q.isEmpty());
        assertEquals(1, q.size());
        assertSame(four, q.take());
        checkEmpty(q);
        awaitTermination(t);
    }

    /**
     * tryTransfer(null) throws NullPointerException
     */
    public void testTryTransfer1() {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        try {
            q.tryTransfer(null);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * tryTransfer returns false and does not enqueue if there are no
     * consumers waiting to poll or take.
     */
    public void testTryTransfer2() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        assertFalse(q.tryTransfer(new Object()));
        assertFalse(q.hasWaitingConsumer());
        checkEmpty(q);
    }

    /**
     * If there is a consumer waiting in timed poll, tryTransfer
     * returns true while successfully transferring object.
     */
    public void testTryTransfer3() throws InterruptedException {
        final Object hotPotato = new Object();
        final LinkedTransferQueue q = new LinkedTransferQueue();

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() {
                while (!q.hasWaitingConsumer())
                    Thread.yield();
                assertTrue(q.hasWaitingConsumer());
                checkEmpty(q);
                assertTrue(q.tryTransfer(hotPotato));
            }});

        long startTime = System.nanoTime();
        assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
        checkEmpty(q);
        awaitTermination(t);
    }

    /**
     * If there is a consumer waiting in take, tryTransfer returns
     * true while successfully transferring object.
     */
    public void testTryTransfer4() throws InterruptedException {
        final Object hotPotato = new Object();
        final LinkedTransferQueue q = new LinkedTransferQueue();

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() {
                while (!q.hasWaitingConsumer())
                    Thread.yield();
                assertTrue(q.hasWaitingConsumer());
                checkEmpty(q);
                assertTrue(q.tryTransfer(hotPotato));
            }});

        assertSame(hotPotato, q.take());
        checkEmpty(q);
        awaitTermination(t);
    }

    /**
     * timed tryTransfer blocks interruptibly if no takers
     */
    public void testTryTransfer5() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
        assertTrue(q.isEmpty());

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                long startTime = System.nanoTime();
                // Case 1: interrupt status already set on entry.
                Thread.currentThread().interrupt();
                try {
                    q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());

                // Case 2: interrupted by the main thread while blocked.
                pleaseInterrupt.countDown();
                try {
                    q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
            }});

        await(pleaseInterrupt);
        assertThreadStaysAlive(t);
        t.interrupt();
        awaitTermination(t);
        checkEmpty(q);
    }

    /**
     * tryTransfer gives up after the timeout and returns false
     */
    public void testTryTransfer6() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                long startTime = System.nanoTime();
                assertFalse(q.tryTransfer(new Object(),
                                          timeoutMillis(), MILLISECONDS));
                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
                checkEmpty(q);
            }});

        awaitTermination(t);
        checkEmpty(q);
    }

    /**
     * tryTransfer waits for any elements already in the queue to be
     * removed before transferring to a poll or take
     */
    public void testTryTransfer7() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        assertTrue(q.offer(four));

        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                long startTime = System.nanoTime();
                assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
                checkEmpty(q);
            }});

        while (q.size() != 2)
            Thread.yield();
        assertEquals(2, q.size());
        assertSame(four, q.poll());
        assertSame(five, q.poll());
        checkEmpty(q);
        awaitTermination(t);
    }

    /**
     * timed tryTransfer with no consumer times out, returns false and
     * leaves nothing enqueued: only the earlier element can be polled,
     * and the poll after that returns null
     */
    public void testTryTransfer8() throws InterruptedException {
        final LinkedTransferQueue q = new LinkedTransferQueue();
        assertTrue(q.offer(four));
        assertEquals(1, q.size());
        long startTime = System.nanoTime();
        assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
        assertEquals(1, q.size());
        assertSame(four, q.poll());
        assertNull(q.poll());
        checkEmpty(q);
    }

    /**
     * Returns a new queue holding the Integers 0 .. n-1 in FIFO order,
     * checking size and remaining capacity after every insertion.
     *
     * @param n number of elements to insert; zero yields an empty queue
     * @return the populated queue
     */
    private LinkedTransferQueue<Integer> populatedQueue(int n) {
        LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
        checkEmpty(q);
        for (int i = 0; i < n; i++) {
            assertEquals(i, q.size());
            assertTrue(q.offer(i));
            assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
        }
        // Emptiness must track n so that populatedQueue(0) is usable.
        assertEquals(n == 0, q.isEmpty());
        assertEquals(n, q.size());
        return q;
    }

    /**
     * remove(null), contains(null) always return false
     */
    public void testNeverContainsNull() {
        Collection<?>[] qs = {
            new LinkedTransferQueue<Object>(),
            populatedQueue(2),
        };

        for (Collection<?> q : qs) {
            assertFalse(q.contains(null));
            assertFalse(q.remove(null));
        }
    }
}