1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 * Other contributors include John Vint 6 */ 7 8 package jsr166; 9 10 import junit.framework.*; 11 import java.util.Arrays; 12 import java.util.ArrayList; 13 import java.util.Collection; 14 import java.util.Iterator; 15 import java.util.List; 16 import java.util.NoSuchElementException; 17 import java.util.Queue; 18 import java.util.concurrent.BlockingQueue; 19 import java.util.concurrent.CountDownLatch; 20 import java.util.concurrent.Executors; 21 import java.util.concurrent.ExecutorService; 22 import java.util.concurrent.LinkedTransferQueue; 23 import static java.util.concurrent.TimeUnit.MILLISECONDS; 24 import static java.util.concurrent.TimeUnit.NANOSECONDS; 25 26 @SuppressWarnings({"unchecked", "rawtypes"}) 27 public class LinkedTransferQueueTest extends BlockingQueueTest { 28 emptyCollection()29 protected BlockingQueue emptyCollection() { 30 return new LinkedTransferQueue(); 31 } 32 33 /** 34 * Constructor builds new queue with size being zero and empty 35 * being true 36 */ testConstructor1()37 public void testConstructor1() { 38 assertEquals(0, new LinkedTransferQueue().size()); 39 assertTrue(new LinkedTransferQueue().isEmpty()); 40 } 41 42 /** 43 * Initializing constructor with null collection throws 44 * NullPointerException 45 */ testConstructor2()46 public void testConstructor2() { 47 try { 48 new LinkedTransferQueue(null); 49 shouldThrow(); 50 } catch (NullPointerException success) {} 51 } 52 53 /** 54 * Initializing from Collection of null elements throws 55 * NullPointerException 56 */ testConstructor3()57 public void testConstructor3() { 58 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]); 59 try { 60 new LinkedTransferQueue(elements); 61 shouldThrow(); 62 } catch (NullPointerException success) {} 63 } 64 65 /** 66 * Initializing constructor with a collection containing some null elements 67 * throws NullPointerException 68 */ testConstructor4()69 public void testConstructor4() { 70 Integer[] ints = new Integer[SIZE]; 71 for (int i = 0; i < SIZE-1; ++i) 72 ints[i] = i; 73 Collection<Integer> elements = Arrays.asList(ints); 74 try { 75 new LinkedTransferQueue(elements); 76 shouldThrow(); 77 } catch (NullPointerException success) {} 78 } 79 80 /** 81 * Queue contains all elements of the collection it is initialized by 82 */ testConstructor5()83 public void testConstructor5() { 84 Integer[] ints = new Integer[SIZE]; 85 for (int i = 0; i < SIZE; ++i) { 86 ints[i] = i; 87 } 88 List intList = Arrays.asList(ints); 89 LinkedTransferQueue q 90 = new LinkedTransferQueue(intList); 91 assertEquals(q.size(), intList.size()); 92 assertEquals(q.toString(), intList.toString()); 93 assertTrue(Arrays.equals(q.toArray(), 94 intList.toArray())); 95 assertTrue(Arrays.equals(q.toArray(new Object[0]), 96 intList.toArray(new Object[0]))); 97 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]), 98 intList.toArray(new Object[SIZE]))); 99 for (int i = 0; i < SIZE; ++i) { 100 assertEquals(ints[i], q.poll()); 101 } 102 } 103 104 /** 105 * remainingCapacity() always returns Integer.MAX_VALUE 106 */ testRemainingCapacity()107 public void testRemainingCapacity() { 108 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 109 for (int i = 0; i < SIZE; ++i) { 110 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 111 assertEquals(SIZE - i, q.size()); 112 q.remove(); 113 } 114 for (int i = 0; i < SIZE; ++i) { 115 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 116 assertEquals(i, q.size()); 117 q.add(i); 118 } 119 } 120 121 /** 122 * addAll(this) throws IllegalArgumentException 123 */ testAddAllSelf()124 public void testAddAllSelf() { 125 try { 126 LinkedTransferQueue q = populatedQueue(SIZE); 127 q.addAll(q); 128 shouldThrow(); 129 } catch (IllegalArgumentException success) {} 130 } 131 132 /** 133 * addAll of a collection with any null elements throws 134 * NullPointerException after possibly adding some elements 135 */ testAddAll3()136 public void testAddAll3() { 137 try { 138 LinkedTransferQueue q = new LinkedTransferQueue(); 139 Integer[] ints = new Integer[SIZE]; 140 for (int i = 0; i < SIZE - 1; ++i) { 141 ints[i] = i; 142 } 143 q.addAll(Arrays.asList(ints)); 144 shouldThrow(); 145 } catch (NullPointerException success) {} 146 } 147 148 /** 149 * Queue contains all elements, in traversal order, of successful addAll 150 */ testAddAll5()151 public void testAddAll5() { 152 Integer[] empty = new Integer[0]; 153 Integer[] ints = new Integer[SIZE]; 154 for (int i = 0; i < SIZE; ++i) { 155 ints[i] = i; 156 } 157 LinkedTransferQueue q = new LinkedTransferQueue(); 158 assertFalse(q.addAll(Arrays.asList(empty))); 159 assertTrue(q.addAll(Arrays.asList(ints))); 160 for (int i = 0; i < SIZE; ++i) { 161 assertEquals(ints[i], q.poll()); 162 } 163 } 164 165 /** 166 * all elements successfully put are contained 167 */ testPut()168 public void testPut() { 169 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>(); 170 for (int i = 0; i < SIZE; ++i) { 171 assertEquals(i, q.size()); 172 q.put(i); 173 assertTrue(q.contains(i)); 174 } 175 } 176 177 /** 178 * take retrieves elements in FIFO order 179 */ testTake()180 public void testTake() throws InterruptedException { 181 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 182 for (int i = 0; i < SIZE; ++i) { 183 assertEquals(i, (int) q.take()); 184 } 185 } 186 187 /** 188 * take removes existing elements until empty, then blocks interruptibly 189 */ testBlockingTake()190 public void testBlockingTake() throws InterruptedException { 191 final BlockingQueue q = populatedQueue(SIZE); 192 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 193 Thread t = newStartedThread(new CheckedRunnable() { 194 public void realRun() throws InterruptedException { 195 for (int i = 0; i < SIZE; ++i) { 196 assertEquals(i, q.take()); 197 } 198 199 Thread.currentThread().interrupt(); 200 try { 201 q.take(); 202 shouldThrow(); 203 } catch (InterruptedException success) {} 204 assertFalse(Thread.interrupted()); 205 206 pleaseInterrupt.countDown(); 207 try { 208 q.take(); 209 shouldThrow(); 210 } catch (InterruptedException success) {} 211 assertFalse(Thread.interrupted()); 212 }}); 213 214 await(pleaseInterrupt); 215 assertThreadStaysAlive(t); 216 t.interrupt(); 217 awaitTermination(t); 218 } 219 220 /** 221 * poll succeeds unless empty 222 */ testPoll()223 public void testPoll() throws InterruptedException { 224 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 225 for (int i = 0; i < SIZE; ++i) { 226 assertEquals(i, (int) q.poll()); 227 } 228 assertNull(q.poll()); 229 checkEmpty(q); 230 } 231 232 /** 233 * timed poll with zero timeout succeeds when non-empty, else times out 234 */ testTimedPoll0()235 public void testTimedPoll0() throws InterruptedException { 236 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 237 for (int i = 0; i < SIZE; ++i) { 238 assertEquals(i, (int) q.poll(0, MILLISECONDS)); 239 } 240 assertNull(q.poll(0, MILLISECONDS)); 241 checkEmpty(q); 242 } 243 244 /** 245 * timed poll with nonzero timeout succeeds when non-empty, else times out 246 */ testTimedPoll()247 public void testTimedPoll() throws InterruptedException { 248 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 249 for (int i = 0; i < SIZE; ++i) { 250 long startTime = System.nanoTime(); 251 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); 252 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 253 } 254 long startTime = System.nanoTime(); 255 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 256 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 257 checkEmpty(q); 258 } 259 260 /** 261 * Interrupted timed poll throws InterruptedException instead of 262 * returning timeout status 263 */ testInterruptedTimedPoll()264 public void testInterruptedTimedPoll() throws InterruptedException { 265 final BlockingQueue<Integer> q = populatedQueue(SIZE); 266 final CountDownLatch aboutToWait = new CountDownLatch(1); 267 Thread t = newStartedThread(new CheckedRunnable() { 268 public void realRun() throws InterruptedException { 269 for (int i = 0; i < SIZE; ++i) { 270 long t0 = System.nanoTime(); 271 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); 272 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); 273 } 274 long t0 = System.nanoTime(); 275 aboutToWait.countDown(); 276 try { 277 q.poll(MEDIUM_DELAY_MS, MILLISECONDS); 278 shouldThrow(); 279 } catch (InterruptedException success) { 280 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS); 281 } 282 }}); 283 284 aboutToWait.await(); 285 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS); 286 t.interrupt(); 287 awaitTermination(t, MEDIUM_DELAY_MS); 288 checkEmpty(q); 289 } 290 291 /** 292 * timed poll after thread interrupted throws InterruptedException 293 * instead of returning timeout status 294 */ testTimedPollAfterInterrupt()295 public void testTimedPollAfterInterrupt() throws InterruptedException { 296 final BlockingQueue<Integer> q = populatedQueue(SIZE); 297 Thread t = newStartedThread(new CheckedRunnable() { 298 public void realRun() throws InterruptedException { 299 Thread.currentThread().interrupt(); 300 for (int i = 0; i < SIZE; ++i) { 301 long t0 = System.nanoTime(); 302 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS)); 303 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS); 304 } 305 try { 306 q.poll(MEDIUM_DELAY_MS, MILLISECONDS); 307 shouldThrow(); 308 } catch (InterruptedException success) {} 309 }}); 310 311 awaitTermination(t, MEDIUM_DELAY_MS); 312 checkEmpty(q); 313 } 314 315 /** 316 * peek returns next element, or null if empty 317 */ testPeek()318 public void testPeek() throws InterruptedException { 319 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 320 for (int i = 0; i < SIZE; ++i) { 321 assertEquals(i, (int) q.peek()); 322 assertEquals(i, (int) q.poll()); 323 assertTrue(q.peek() == null || 324 i != (int) q.peek()); 325 } 326 assertNull(q.peek()); 327 checkEmpty(q); 328 } 329 330 /** 331 * element returns next element, or throws NoSuchElementException if empty 332 */ testElement()333 public void testElement() throws InterruptedException { 334 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 335 for (int i = 0; i < SIZE; ++i) { 336 assertEquals(i, (int) q.element()); 337 assertEquals(i, (int) q.poll()); 338 } 339 try { 340 q.element(); 341 shouldThrow(); 342 } catch (NoSuchElementException success) {} 343 checkEmpty(q); 344 } 345 346 /** 347 * remove removes next element, or throws NoSuchElementException if empty 348 */ testRemove()349 public void testRemove() throws InterruptedException { 350 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 351 for (int i = 0; i < SIZE; ++i) { 352 assertEquals(i, (int) q.remove()); 353 } 354 try { 355 q.remove(); 356 shouldThrow(); 357 } catch (NoSuchElementException success) {} 358 checkEmpty(q); 359 } 360 361 /** 362 * An add following remove(x) succeeds 363 */ testRemoveElementAndAdd()364 public void testRemoveElementAndAdd() throws InterruptedException { 365 LinkedTransferQueue q = new LinkedTransferQueue(); 366 assertTrue(q.add(one)); 367 assertTrue(q.add(two)); 368 assertTrue(q.remove(one)); 369 assertTrue(q.remove(two)); 370 assertTrue(q.add(three)); 371 assertSame(q.take(), three); 372 } 373 374 /** 375 * contains(x) reports true when elements added but not yet removed 376 */ testContains()377 public void testContains() { 378 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 379 for (int i = 0; i < SIZE; ++i) { 380 assertTrue(q.contains(i)); 381 assertEquals(i, (int) q.poll()); 382 assertFalse(q.contains(i)); 383 } 384 } 385 386 /** 387 * clear removes all elements 388 */ testClear()389 public void testClear() throws InterruptedException { 390 LinkedTransferQueue q = populatedQueue(SIZE); 391 q.clear(); 392 checkEmpty(q); 393 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 394 q.add(one); 395 assertFalse(q.isEmpty()); 396 assertEquals(1, q.size()); 397 assertTrue(q.contains(one)); 398 q.clear(); 399 checkEmpty(q); 400 } 401 402 /** 403 * containsAll(c) is true when c contains a subset of elements 404 */ testContainsAll()405 public void testContainsAll() { 406 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 407 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>(); 408 for (int i = 0; i < SIZE; ++i) { 409 assertTrue(q.containsAll(p)); 410 assertFalse(p.containsAll(q)); 411 p.add(i); 412 } 413 assertTrue(p.containsAll(q)); 414 } 415 416 /** 417 * retainAll(c) retains only those elements of c and reports true 418 * if changed 419 */ testRetainAll()420 public void testRetainAll() { 421 LinkedTransferQueue q = populatedQueue(SIZE); 422 LinkedTransferQueue p = populatedQueue(SIZE); 423 for (int i = 0; i < SIZE; ++i) { 424 boolean changed = q.retainAll(p); 425 if (i == 0) { 426 assertFalse(changed); 427 } else { 428 assertTrue(changed); 429 } 430 assertTrue(q.containsAll(p)); 431 assertEquals(SIZE - i, q.size()); 432 p.remove(); 433 } 434 } 435 436 /** 437 * removeAll(c) removes only those elements of c and reports true 438 * if changed 439 */ testRemoveAll()440 public void testRemoveAll() { 441 for (int i = 1; i < SIZE; ++i) { 442 LinkedTransferQueue q = populatedQueue(SIZE); 443 LinkedTransferQueue p = populatedQueue(i); 444 assertTrue(q.removeAll(p)); 445 assertEquals(SIZE - i, q.size()); 446 for (int j = 0; j < i; ++j) { 447 assertFalse(q.contains(p.remove())); 448 } 449 } 450 } 451 452 /** 453 * toArray() contains all elements in FIFO order 454 */ testToArray()455 public void testToArray() { 456 LinkedTransferQueue q = populatedQueue(SIZE); 457 Object[] o = q.toArray(); 458 for (int i = 0; i < o.length; i++) { 459 assertSame(o[i], q.poll()); 460 } 461 } 462 463 /** 464 * toArray(a) contains all elements in FIFO order 465 */ testToArray2()466 public void testToArray2() { 467 LinkedTransferQueue<Integer> q = populatedQueue(SIZE); 468 Integer[] ints = new Integer[SIZE]; 469 Integer[] array = q.toArray(ints); 470 assertSame(ints, array); 471 for (int i = 0; i < ints.length; i++) { 472 assertSame(ints[i], q.poll()); 473 } 474 } 475 476 /** 477 * toArray(incompatible array type) throws ArrayStoreException 478 */ testToArray1_BadArg()479 public void testToArray1_BadArg() { 480 LinkedTransferQueue q = populatedQueue(SIZE); 481 try { 482 q.toArray(new String[10]); 483 shouldThrow(); 484 } catch (ArrayStoreException success) {} 485 } 486 487 /** 488 * iterator iterates through all elements 489 */ testIterator()490 public void testIterator() throws InterruptedException { 491 LinkedTransferQueue q = populatedQueue(SIZE); 492 Iterator it = q.iterator(); 493 int i = 0; 494 while (it.hasNext()) { 495 assertEquals(it.next(), i++); 496 } 497 assertEquals(i, SIZE); 498 } 499 500 /** 501 * iterator.remove() removes current element 502 */ testIteratorRemove()503 public void testIteratorRemove() { 504 final LinkedTransferQueue q = new LinkedTransferQueue(); 505 q.add(two); 506 q.add(one); 507 q.add(three); 508 509 Iterator it = q.iterator(); 510 it.next(); 511 it.remove(); 512 513 it = q.iterator(); 514 assertSame(it.next(), one); 515 assertSame(it.next(), three); 516 assertFalse(it.hasNext()); 517 } 518 519 /** 520 * iterator ordering is FIFO 521 */ testIteratorOrdering()522 public void testIteratorOrdering() { 523 final LinkedTransferQueue<Integer> q 524 = new LinkedTransferQueue<Integer>(); 525 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 526 q.add(one); 527 q.add(two); 528 q.add(three); 529 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 530 int k = 0; 531 for (Integer n : q) { 532 assertEquals(++k, (int) n); 533 } 534 assertEquals(3, k); 535 } 536 537 /** 538 * Modifications do not cause iterators to fail 539 */ testWeaklyConsistentIteration()540 public void testWeaklyConsistentIteration() { 541 final LinkedTransferQueue q = new LinkedTransferQueue(); 542 q.add(one); 543 q.add(two); 544 q.add(three); 545 for (Iterator it = q.iterator(); it.hasNext();) { 546 q.remove(); 547 it.next(); 548 } 549 assertEquals(0, q.size()); 550 } 551 552 /** 553 * toString contains toStrings of elements 554 */ testToString()555 public void testToString() { 556 LinkedTransferQueue q = populatedQueue(SIZE); 557 String s = q.toString(); 558 for (int i = 0; i < SIZE; ++i) { 559 assertTrue(s.contains(String.valueOf(i))); 560 } 561 } 562 563 /** 564 * offer transfers elements across Executor tasks 565 */ testOfferInExecutor()566 public void testOfferInExecutor() { 567 final LinkedTransferQueue q = new LinkedTransferQueue(); 568 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 569 ExecutorService executor = Executors.newFixedThreadPool(2); 570 571 executor.execute(new CheckedRunnable() { 572 public void realRun() throws InterruptedException { 573 threadsStarted.await(); 574 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 575 }}); 576 577 executor.execute(new CheckedRunnable() { 578 public void realRun() throws InterruptedException { 579 threadsStarted.await(); 580 assertSame(one, q.take()); 581 checkEmpty(q); 582 }}); 583 584 joinPool(executor); 585 } 586 587 /** 588 * timed poll retrieves elements across Executor threads 589 */ testPollInExecutor()590 public void testPollInExecutor() { 591 final LinkedTransferQueue q = new LinkedTransferQueue(); 592 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 593 ExecutorService executor = Executors.newFixedThreadPool(2); 594 595 executor.execute(new CheckedRunnable() { 596 public void realRun() throws InterruptedException { 597 assertNull(q.poll()); 598 threadsStarted.await(); 599 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 600 checkEmpty(q); 601 }}); 602 603 executor.execute(new CheckedRunnable() { 604 public void realRun() throws InterruptedException { 605 threadsStarted.await(); 606 q.put(one); 607 }}); 608 609 joinPool(executor); 610 } 611 612 /** 613 * A deserialized serialized queue has same elements in same order 614 */ testSerialization()615 public void testSerialization() throws Exception { 616 Queue x = populatedQueue(SIZE); 617 Queue y = serialClone(x); 618 619 assertNotSame(y, x); 620 assertEquals(x.size(), y.size()); 621 assertEquals(x.toString(), y.toString()); 622 assertTrue(Arrays.equals(x.toArray(), y.toArray())); 623 while (!x.isEmpty()) { 624 assertFalse(y.isEmpty()); 625 assertEquals(x.remove(), y.remove()); 626 } 627 assertTrue(y.isEmpty()); 628 } 629 630 /** 631 * drainTo(c) empties queue into another collection c 632 */ testDrainTo()633 public void testDrainTo() { 634 LinkedTransferQueue q = populatedQueue(SIZE); 635 ArrayList l = new ArrayList(); 636 q.drainTo(l); 637 assertEquals(0, q.size()); 638 assertEquals(SIZE, l.size()); 639 for (int i = 0; i < SIZE; ++i) { 640 assertEquals(i, l.get(i)); 641 } 642 q.add(zero); 643 q.add(one); 644 assertFalse(q.isEmpty()); 645 assertTrue(q.contains(zero)); 646 assertTrue(q.contains(one)); 647 l.clear(); 648 q.drainTo(l); 649 assertEquals(0, q.size()); 650 assertEquals(2, l.size()); 651 for (int i = 0; i < 2; ++i) { 652 assertEquals(i, l.get(i)); 653 } 654 } 655 656 /** 657 * drainTo(c) empties full queue, unblocking a waiting put. 658 */ testDrainToWithActivePut()659 public void testDrainToWithActivePut() throws InterruptedException { 660 final LinkedTransferQueue q = populatedQueue(SIZE); 661 Thread t = newStartedThread(new CheckedRunnable() { 662 public void realRun() { 663 q.put(SIZE + 1); 664 }}); 665 ArrayList l = new ArrayList(); 666 q.drainTo(l); 667 assertTrue(l.size() >= SIZE); 668 for (int i = 0; i < SIZE; ++i) 669 assertEquals(i, l.get(i)); 670 awaitTermination(t, MEDIUM_DELAY_MS); 671 assertTrue(q.size() + l.size() >= SIZE); 672 } 673 674 /** 675 * drainTo(c, n) empties first min(n, size) elements of queue into c 676 */ testDrainToN()677 public void testDrainToN() { 678 LinkedTransferQueue q = new LinkedTransferQueue(); 679 for (int i = 0; i < SIZE + 2; ++i) { 680 for (int j = 0; j < SIZE; j++) { 681 assertTrue(q.offer(j)); 682 } 683 ArrayList l = new ArrayList(); 684 q.drainTo(l, i); 685 int k = (i < SIZE) ? i : SIZE; 686 assertEquals(k, l.size()); 687 assertEquals(SIZE - k, q.size()); 688 for (int j = 0; j < k; ++j) 689 assertEquals(j, l.get(j)); 690 while (q.poll() != null) 691 ; 692 } 693 } 694 695 /** 696 * timed poll() or take() increments the waiting consumer count; 697 * offer(e) decrements the waiting consumer count 698 */ testWaitingConsumer()699 public void testWaitingConsumer() throws InterruptedException { 700 final LinkedTransferQueue q = new LinkedTransferQueue(); 701 assertEquals(0, q.getWaitingConsumerCount()); 702 assertFalse(q.hasWaitingConsumer()); 703 final CountDownLatch threadStarted = new CountDownLatch(1); 704 705 Thread t = newStartedThread(new CheckedRunnable() { 706 public void realRun() throws InterruptedException { 707 threadStarted.countDown(); 708 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 709 assertEquals(0, q.getWaitingConsumerCount()); 710 assertFalse(q.hasWaitingConsumer()); 711 }}); 712 713 threadStarted.await(); 714 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS); 715 assertEquals(1, q.getWaitingConsumerCount()); 716 assertTrue(q.hasWaitingConsumer()); 717 718 assertTrue(q.offer(one)); 719 assertEquals(0, q.getWaitingConsumerCount()); 720 assertFalse(q.hasWaitingConsumer()); 721 722 awaitTermination(t, MEDIUM_DELAY_MS); 723 } 724 725 /** 726 * transfer(null) throws NullPointerException 727 */ testTransfer1()728 public void testTransfer1() throws InterruptedException { 729 try { 730 LinkedTransferQueue q = new LinkedTransferQueue(); 731 q.transfer(null); 732 shouldThrow(); 733 } catch (NullPointerException success) {} 734 } 735 736 /** 737 * transfer waits until a poll occurs. The transfered element 738 * is returned by this associated poll. 739 */ testTransfer2()740 public void testTransfer2() throws InterruptedException { 741 final LinkedTransferQueue<Integer> q 742 = new LinkedTransferQueue<Integer>(); 743 final CountDownLatch threadStarted = new CountDownLatch(1); 744 745 Thread t = newStartedThread(new CheckedRunnable() { 746 public void realRun() throws InterruptedException { 747 threadStarted.countDown(); 748 q.transfer(five); 749 checkEmpty(q); 750 }}); 751 752 threadStarted.await(); 753 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS); 754 assertEquals(1, q.size()); 755 assertSame(five, q.poll()); 756 checkEmpty(q); 757 awaitTermination(t, MEDIUM_DELAY_MS); 758 } 759 760 /** 761 * transfer waits until a poll occurs, and then transfers in fifo order 762 */ testTransfer3()763 public void testTransfer3() throws InterruptedException { 764 final LinkedTransferQueue<Integer> q 765 = new LinkedTransferQueue<Integer>(); 766 767 Thread first = newStartedThread(new CheckedRunnable() { 768 public void realRun() throws InterruptedException { 769 q.transfer(four); 770 assertTrue(!q.contains(four)); 771 assertEquals(1, q.size()); 772 }}); 773 774 Thread interruptedThread = newStartedThread( 775 new CheckedInterruptedRunnable() { 776 public void realRun() throws InterruptedException { 777 while (q.isEmpty()) 778 Thread.yield(); 779 q.transfer(five); 780 }}); 781 782 while (q.size() < 2) 783 Thread.yield(); 784 assertEquals(2, q.size()); 785 assertSame(four, q.poll()); 786 first.join(); 787 assertEquals(1, q.size()); 788 interruptedThread.interrupt(); 789 interruptedThread.join(); 790 checkEmpty(q); 791 } 792 793 /** 794 * transfer waits until a poll occurs, at which point the polling 795 * thread returns the element 796 */ testTransfer4()797 public void testTransfer4() throws InterruptedException { 798 final LinkedTransferQueue q = new LinkedTransferQueue(); 799 800 Thread t = newStartedThread(new CheckedRunnable() { 801 public void realRun() throws InterruptedException { 802 q.transfer(four); 803 assertFalse(q.contains(four)); 804 assertSame(three, q.poll()); 805 }}); 806 807 while (q.isEmpty()) 808 Thread.yield(); 809 assertFalse(q.isEmpty()); 810 assertEquals(1, q.size()); 811 assertTrue(q.offer(three)); 812 assertSame(four, q.poll()); 813 awaitTermination(t, MEDIUM_DELAY_MS); 814 } 815 816 /** 817 * transfer waits until a take occurs. The transfered element 818 * is returned by this associated take. 819 */ testTransfer5()820 public void testTransfer5() throws InterruptedException { 821 final LinkedTransferQueue<Integer> q 822 = new LinkedTransferQueue<Integer>(); 823 824 Thread t = newStartedThread(new CheckedRunnable() { 825 public void realRun() throws InterruptedException { 826 q.transfer(four); 827 checkEmpty(q); 828 }}); 829 830 while (q.isEmpty()) 831 Thread.yield(); 832 assertFalse(q.isEmpty()); 833 assertEquals(1, q.size()); 834 assertSame(four, q.take()); 835 checkEmpty(q); 836 awaitTermination(t, MEDIUM_DELAY_MS); 837 } 838 839 /** 840 * tryTransfer(null) throws NullPointerException 841 */ testTryTransfer1()842 public void testTryTransfer1() { 843 try { 844 final LinkedTransferQueue q = new LinkedTransferQueue(); 845 q.tryTransfer(null); 846 shouldThrow(); 847 } catch (NullPointerException success) {} 848 } 849 850 /** 851 * tryTransfer returns false and does not enqueue if there are no 852 * consumers waiting to poll or take. 853 */ testTryTransfer2()854 public void testTryTransfer2() throws InterruptedException { 855 final LinkedTransferQueue q = new LinkedTransferQueue(); 856 assertFalse(q.tryTransfer(new Object())); 857 assertFalse(q.hasWaitingConsumer()); 858 checkEmpty(q); 859 } 860 861 /** 862 * If there is a consumer waiting in timed poll, tryTransfer 863 * returns true while successfully transfering object. 864 */ testTryTransfer3()865 public void testTryTransfer3() throws InterruptedException { 866 final Object hotPotato = new Object(); 867 final LinkedTransferQueue q = new LinkedTransferQueue(); 868 869 Thread t = newStartedThread(new CheckedRunnable() { 870 public void realRun() { 871 while (! q.hasWaitingConsumer()) 872 Thread.yield(); 873 assertTrue(q.hasWaitingConsumer()); 874 checkEmpty(q); 875 assertTrue(q.tryTransfer(hotPotato)); 876 }}); 877 878 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS)); 879 checkEmpty(q); 880 awaitTermination(t, MEDIUM_DELAY_MS); 881 } 882 883 /** 884 * If there is a consumer waiting in take, tryTransfer returns 885 * true while successfully transfering object. 886 */ testTryTransfer4()887 public void testTryTransfer4() throws InterruptedException { 888 final Object hotPotato = new Object(); 889 final LinkedTransferQueue q = new LinkedTransferQueue(); 890 891 Thread t = newStartedThread(new CheckedRunnable() { 892 public void realRun() { 893 while (! q.hasWaitingConsumer()) 894 Thread.yield(); 895 assertTrue(q.hasWaitingConsumer()); 896 checkEmpty(q); 897 assertTrue(q.tryTransfer(hotPotato)); 898 }}); 899 900 assertSame(q.take(), hotPotato); 901 checkEmpty(q); 902 awaitTermination(t, MEDIUM_DELAY_MS); 903 } 904 905 /** 906 * tryTransfer blocks interruptibly if no takers 907 */ testTryTransfer5()908 public void testTryTransfer5() throws InterruptedException { 909 final LinkedTransferQueue q = new LinkedTransferQueue(); 910 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 911 assertTrue(q.isEmpty()); 912 913 Thread t = newStartedThread(new CheckedRunnable() { 914 public void realRun() throws InterruptedException { 915 Thread.currentThread().interrupt(); 916 try { 917 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); 918 shouldThrow(); 919 } catch (InterruptedException success) {} 920 assertFalse(Thread.interrupted()); 921 922 pleaseInterrupt.countDown(); 923 try { 924 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS); 925 shouldThrow(); 926 } catch (InterruptedException success) {} 927 assertFalse(Thread.interrupted()); 928 }}); 929 930 await(pleaseInterrupt); 931 assertThreadStaysAlive(t); 932 t.interrupt(); 933 awaitTermination(t); 934 checkEmpty(q); 935 } 936 937 /** 938 * tryTransfer gives up after the timeout and returns false 939 */ testTryTransfer6()940 public void testTryTransfer6() throws InterruptedException { 941 final LinkedTransferQueue q = new LinkedTransferQueue(); 942 943 Thread t = newStartedThread(new CheckedRunnable() { 944 public void realRun() throws InterruptedException { 945 long t0 = System.nanoTime(); 946 assertFalse(q.tryTransfer(new Object(), 947 timeoutMillis(), MILLISECONDS)); 948 assertTrue(millisElapsedSince(t0) >= timeoutMillis()); 949 checkEmpty(q); 950 }}); 951 952 awaitTermination(t); 953 checkEmpty(q); 954 } 955 956 /** 957 * tryTransfer waits for any elements previously in to be removed 958 * before transfering to a poll or take 959 */ testTryTransfer7()960 public void testTryTransfer7() throws InterruptedException { 961 final LinkedTransferQueue q = new LinkedTransferQueue(); 962 assertTrue(q.offer(four)); 963 964 Thread t = newStartedThread(new CheckedRunnable() { 965 public void realRun() throws InterruptedException { 966 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS)); 967 checkEmpty(q); 968 }}); 969 970 while (q.size() != 2) 971 Thread.yield(); 972 assertEquals(2, q.size()); 973 assertSame(four, q.poll()); 974 assertSame(five, q.poll()); 975 checkEmpty(q); 976 awaitTermination(t, MEDIUM_DELAY_MS); 977 } 978 979 /** 980 * tryTransfer attempts to enqueue into the queue and fails 981 * returning false not enqueueing and the successive poll is null 982 */ testTryTransfer8()983 public void testTryTransfer8() throws InterruptedException { 984 final LinkedTransferQueue q = new LinkedTransferQueue(); 985 assertTrue(q.offer(four)); 986 assertEquals(1, q.size()); 987 long t0 = System.nanoTime(); 988 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS)); 989 assertTrue(millisElapsedSince(t0) >= timeoutMillis()); 990 assertEquals(1, q.size()); 991 assertSame(four, q.poll()); 992 assertNull(q.poll()); 993 checkEmpty(q); 994 } 995 populatedQueue(int n)996 private LinkedTransferQueue<Integer> populatedQueue(int n) { 997 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>(); 998 checkEmpty(q); 999 for (int i = 0; i < n; i++) { 1000 assertEquals(i, q.size()); 1001 assertTrue(q.offer(i)); 1002 assertEquals(Integer.MAX_VALUE, q.remainingCapacity()); 1003 } 1004 assertFalse(q.isEmpty()); 1005 return q; 1006 } 1007 } 1008