/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 * Other contributors include Andrew Wright, Jeffrey Hayes,
 * Pat Fisher, Mike Judd.
 */

package jsr166;

import junit.framework.*;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Tests for {@link PriorityBlockingQueue}: construction, insertion,
 * removal in priority order, bulk operations, iteration, draining,
 * serialization, and the blocking/timed variants of take and poll.
 */
public class PriorityBlockingQueueTest extends JSR166TestCase {

    /** Value the tests expect remainingCapacity() to report at all times. */
    private static final int NOCAP = Integer.MAX_VALUE;

    /** Sample Comparator that reverses the natural ordering */
    static class MyReverseComparator implements Comparator {
        public int compare(Object x, Object y) {
            return ((Comparable)y).compareTo(x);
        }
    }

    /**
     * Returns a new queue of given size containing consecutive
     * Integers 0 ... n - 1.
     */
    private PriorityBlockingQueue<Integer> populatedQueue(int n) {
        PriorityBlockingQueue<Integer> q =
            new PriorityBlockingQueue<Integer>(n);
        assertTrue(q.isEmpty());
        // Elements are offered out of sorted order (one parity class
        // descending, then the other ascending), so the ordering seen
        // by callers comes from the queue, not from insertion order.
        for (int i = n-1; i >= 0; i-=2)
            assertTrue(q.offer(new Integer(i)));
        for (int i = (n & 1); i < n; i+=2)
            assertTrue(q.offer(new Integer(i)));
        assertFalse(q.isEmpty());
        assertEquals(NOCAP, q.remainingCapacity());
        assertEquals(n, q.size());
        return q;
    }

    /**
     * A new queue has unbounded capacity
     */
    public void testConstructor1() {
        assertEquals(NOCAP, new PriorityBlockingQueue(SIZE).remainingCapacity());
    }

    /**
     * Constructor throws IAE if capacity argument nonpositive
     */
    public void testConstructor2() {
        try {
            new PriorityBlockingQueue(0);
            shouldThrow();
        } catch (IllegalArgumentException success) {}
    }

    /**
     * Initializing from null Collection throws NPE
     */
    public void testConstructor3() {
        try {
            new PriorityBlockingQueue(null);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Initializing from Collection of null elements throws NPE
     */
    public void testConstructor4() {
        Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
        try {
            new PriorityBlockingQueue(elements);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Initializing from Collection with some null elements throws NPE
     */
    public void testConstructor5() {
        Integer[] ints = new Integer[SIZE];
        // The last slot is intentionally left null.
        for (int i = 0; i < SIZE-1; ++i)
            ints[i] = i;
        Collection<Integer> elements = Arrays.asList(ints);
        try {
            new PriorityBlockingQueue(elements);
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Queue contains all elements of collection used to initialize
     */
    public void testConstructor6() {
        Integer[] ints = new Integer[SIZE];
        for (int i = 0; i < SIZE; ++i)
            ints[i] = i;
        PriorityBlockingQueue q = new PriorityBlockingQueue(Arrays.asList(ints));
        for (int i = 0; i < SIZE; ++i)
            assertEquals(ints[i], q.poll());
    }

    /**
     * The comparator used in constructor is used
     */
    public void testConstructor7() {
        MyReverseComparator cmp = new MyReverseComparator();
        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE, cmp);
        assertEquals(cmp, q.comparator());
        Integer[] ints = new Integer[SIZE];
        for (int i = 0; i < SIZE; ++i)
            ints[i] = new Integer(i);
        q.addAll(Arrays.asList(ints));
        // Reverse comparator: largest element is polled first.
        for (int i = SIZE-1; i >= 0; --i)
            assertEquals(ints[i], q.poll());
    }

    /**
     * isEmpty is true before add, false after
     */
    public void testEmpty() {
        PriorityBlockingQueue q = new PriorityBlockingQueue(2);
        assertTrue(q.isEmpty());
        assertEquals(NOCAP, q.remainingCapacity());
        q.add(one);
        assertFalse(q.isEmpty());
        q.add(two);
        q.remove();
        q.remove();
        assertTrue(q.isEmpty());
    }

    /**
     * remainingCapacity does not change when elements added or removed,
     * but size does
     */
    public void testRemainingCapacity() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(NOCAP, q.remainingCapacity());
            assertEquals(SIZE-i, q.size());
            q.remove();
        }
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(NOCAP, q.remainingCapacity());
            assertEquals(i, q.size());
            q.add(new Integer(i));
        }
    }

    /**
     * Offer of comparable element succeeds
     */
    public void testOffer() {
        PriorityBlockingQueue q = new PriorityBlockingQueue(1);
        assertTrue(q.offer(zero));
        assertTrue(q.offer(one));
    }

    /**
     * Offer of non-Comparable throws CCE
     */
    public void testOfferNonComparable() {
        try {
            PriorityBlockingQueue q = new PriorityBlockingQueue(1);
            q.offer(new Object());
            q.offer(new Object());
            q.offer(new Object());
            shouldThrow();
        } catch (ClassCastException success) {}
    }

    /**
     * add of comparable succeeds
     */
    public void testAdd() {
        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.size());
            assertTrue(q.add(new Integer(i)));
        }
    }

    /**
     * addAll(this) throws IAE
     */
    public void testAddAllSelf() {
        try {
            PriorityBlockingQueue q = populatedQueue(SIZE);
            q.addAll(q);
            shouldThrow();
        } catch (IllegalArgumentException success) {}
    }

    /**
     * addAll of a collection with any null elements throws NPE after
     * possibly adding some elements
     */
    public void testAddAll3() {
        try {
            PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
            Integer[] ints = new Integer[SIZE];
            // The last slot is intentionally left null.
            for (int i = 0; i < SIZE-1; ++i)
                ints[i] = new Integer(i);
            q.addAll(Arrays.asList(ints));
            shouldThrow();
        } catch (NullPointerException success) {}
    }

    /**
     * Queue contains all elements of successful addAll
     */
    public void testAddAll5() {
        Integer[] empty = new Integer[0];
        Integer[] ints = new Integer[SIZE];
        for (int i = SIZE-1; i >= 0; --i)
            ints[i] = new Integer(i);
        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
        assertFalse(q.addAll(Arrays.asList(empty)));
        assertTrue(q.addAll(Arrays.asList(ints)));
        for (int i = 0; i < SIZE; ++i)
            assertEquals(ints[i], q.poll());
    }

    /**
     * all elements successfully put are contained
     */
    public void testPut() {
        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            Integer I = new Integer(i);
            q.put(I);
            assertTrue(q.contains(I));
        }
        assertEquals(SIZE, q.size());
    }

    /**
     * put doesn't block waiting for take
     */
    public void testPutWithTake() throws InterruptedException {
        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
        // More puts than the initial capacity passed to the constructor.
        final int size = 4;
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() {
                for (int i = 0; i < size; i++)
                    q.put(new Integer(0));
            }});

        awaitTermination(t);
        assertEquals(size, q.size());
        q.take();
    }

    /**
     * timed offer does not time out
     */
    public void testTimedOffer() throws InterruptedException {
        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() {
                q.put(new Integer(0));
                q.put(new Integer(0));
                assertTrue(q.offer(new Integer(0), SHORT_DELAY_MS, MILLISECONDS));
                assertTrue(q.offer(new Integer(0), LONG_DELAY_MS, MILLISECONDS));
            }});

        awaitTermination(t);
    }

    /**
     * take retrieves elements in priority order
     */
    public void testTake() throws InterruptedException {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.take());
        }
    }

    /**
     * Take removes existing elements until empty, then blocks interruptibly
     */
    public void testBlockingTake() throws InterruptedException {
        final PriorityBlockingQueue q = populatedQueue(SIZE);
        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                for (int i = 0; i < SIZE; ++i) {
                    assertEquals(i, q.take());
                }

                // Case 1: interrupt status already set before take().
                Thread.currentThread().interrupt();
                try {
                    q.take();
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());

                // Case 2: interrupted by the main thread while in take().
                pleaseInterrupt.countDown();
                try {
                    q.take();
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());
            }});

        await(pleaseInterrupt);
        assertThreadStaysAlive(t);
        t.interrupt();
        awaitTermination(t);
    }

    /**
     * poll succeeds unless empty
     */
    public void testPoll() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.poll());
        }
        assertNull(q.poll());
    }

    /**
     * timed poll with zero timeout succeeds when non-empty, else times out
     */
    public void testTimedPoll0() throws InterruptedException {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.poll(0, MILLISECONDS));
        }
        assertNull(q.poll(0, MILLISECONDS));
    }

    /**
     * timed poll with nonzero timeout succeeds when non-empty, else times out
     */
    public void testTimedPoll() throws InterruptedException {
        PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            long startTime = System.nanoTime();
            // The (int) cast selects assertEquals(int, int) for the
            // Integer returned by the typed queue.
            assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
            assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
        }
        long startTime = System.nanoTime();
        assertNull(q.poll(timeoutMillis(), MILLISECONDS));
        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
        checkEmpty(q);
    }

    /**
     * Interrupted timed poll throws InterruptedException instead of
     * returning timeout status
     */
    public void testInterruptedTimedPoll() throws InterruptedException {
        final BlockingQueue<Integer> q = populatedQueue(SIZE);
        final CountDownLatch aboutToWait = new CountDownLatch(1);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                for (int i = 0; i < SIZE; ++i) {
                    long t0 = System.nanoTime();
                    assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
                    assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
                }
                long t0 = System.nanoTime();
                aboutToWait.countDown();
                try {
                    q.poll(LONG_DELAY_MS, MILLISECONDS);
                    shouldThrow();
                } catch (InterruptedException success) {
                    assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
                }
            }});

        // Same latch helper as testBlockingTake, rather than a bare
        // CountDownLatch.await(); presumably bounded by the harness
        // timeout -- confirm against JSR166TestCase.
        await(aboutToWait);
        waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
        t.interrupt();
        awaitTermination(t, MEDIUM_DELAY_MS);
    }

    /**
     * peek returns next element, or null if empty
     */
    public void testPeek() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.peek());
            assertEquals(i, q.poll());
            assertTrue(q.peek() == null ||
                       !q.peek().equals(i));
        }
        assertNull(q.peek());
    }

    /**
     * element returns next element, or throws NSEE if empty
     */
    public void testElement() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.element());
            assertEquals(i, q.poll());
        }
        try {
            q.element();
            shouldThrow();
        } catch (NoSuchElementException success) {}
    }

    /**
     * remove removes next element, or throws NSEE if empty
     */
    public void testRemove() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertEquals(i, q.remove());
        }
        try {
            q.remove();
            shouldThrow();
        } catch (NoSuchElementException success) {}
    }

    /**
     * contains(x) reports true when elements added but not yet removed
     */
    public void testContains() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertTrue(q.contains(new Integer(i)));
            q.poll();
            assertFalse(q.contains(new Integer(i)));
        }
    }

    /**
     * clear removes all elements
     */
    public void testClear() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        q.clear();
        assertTrue(q.isEmpty());
        assertEquals(0, q.size());
        q.add(one);
        assertFalse(q.isEmpty());
        assertTrue(q.contains(one));
        q.clear();
        assertTrue(q.isEmpty());
    }

    /**
     * containsAll(c) is true when c contains a subset of elements
     */
    public void testContainsAll() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        PriorityBlockingQueue p = new PriorityBlockingQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            assertTrue(q.containsAll(p));
            assertFalse(p.containsAll(q));
            p.add(new Integer(i));
        }
        assertTrue(p.containsAll(q));
    }

    /**
     * retainAll(c) retains only those elements of c and reports true if changed
     */
    public void testRetainAll() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        PriorityBlockingQueue p = populatedQueue(SIZE);
        for (int i = 0; i < SIZE; ++i) {
            boolean changed = q.retainAll(p);
            // On the first pass p and q hold the same elements, so
            // nothing is removed; afterwards p shrinks by one each pass.
            if (i == 0)
                assertFalse(changed);
            else
                assertTrue(changed);

            assertTrue(q.containsAll(p));
            assertEquals(SIZE-i, q.size());
            p.remove();
        }
    }

    /**
     * removeAll(c) removes only those elements of c and reports true if changed
     */
    public void testRemoveAll() {
        for (int i = 1; i < SIZE; ++i) {
            PriorityBlockingQueue q = populatedQueue(SIZE);
            PriorityBlockingQueue p = populatedQueue(i);
            assertTrue(q.removeAll(p));
            assertEquals(SIZE-i, q.size());
            for (int j = 0; j < i; ++j) {
                Integer I = (Integer)(p.remove());
                assertFalse(q.contains(I));
            }
        }
    }

    /**
     * toArray contains all elements
     */
    public void testToArray() throws InterruptedException {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        Object[] o = q.toArray();
        // Sort the snapshot so it can be compared against take() order.
        Arrays.sort(o);
        for (int i = 0; i < o.length; i++)
            assertSame(o[i], q.take());
    }

    /**
     * toArray(a) contains all elements
     */
    public void testToArray2() throws InterruptedException {
        PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
        Integer[] ints = new Integer[SIZE];
        Integer[] array = q.toArray(ints);
        assertSame(ints, array);
        Arrays.sort(ints);
        for (int i = 0; i < ints.length; i++)
            assertSame(ints[i], q.take());
    }

    /**
     * toArray(incompatible array type) throws ArrayStoreException
     */
    public void testToArray1_BadArg() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        try {
            q.toArray(new String[10]);
            shouldThrow();
        } catch (ArrayStoreException success) {}
    }

    /**
     * iterator iterates through all elements
     */
    public void testIterator() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        int i = 0;
        Iterator it = q.iterator();
        while (it.hasNext()) {
            assertTrue(q.contains(it.next()));
            ++i;
        }
        assertEquals(SIZE, i);
    }

    /**
     * iterator.remove removes current element
     */
    public void testIteratorRemove() {
        final PriorityBlockingQueue q = new PriorityBlockingQueue(3);
        q.add(new Integer(2));
        q.add(new Integer(1));
        q.add(new Integer(3));

        Iterator it = q.iterator();
        it.next();
        it.remove();

        it = q.iterator();
        assertEquals(new Integer(2), it.next());
        assertEquals(new Integer(3), it.next());
        assertFalse(it.hasNext());
    }

    /**
     * toString contains toStrings of elements
     */
    public void testToString() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        String s = q.toString();
        for (int i = 0; i < SIZE; ++i) {
            assertTrue(s.contains(String.valueOf(i)));
        }
    }

    /**
     * timed poll transfers elements across Executor tasks
     */
    public void testPollInExecutor() {
        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Consumer: sees an empty queue, then waits for the producer.
        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                assertNull(q.poll());
                threadsStarted.await();
                assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
                checkEmpty(q);
            }});

        // Producer: puts only after both tasks have reached the barrier.
        executor.execute(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                threadsStarted.await();
                q.put(one);
            }});

        joinPool(executor);
    }

    /**
     * A deserialized serialized queue has same elements
     */
    public void testSerialization() throws Exception {
        Queue x = populatedQueue(SIZE);
        Queue y = serialClone(x);

        assertNotSame(x, y);
        assertEquals(x.size(), y.size());
        while (!x.isEmpty()) {
            assertFalse(y.isEmpty());
            assertEquals(x.remove(), y.remove());
        }
        assertTrue(y.isEmpty());
    }

    /**
     * drainTo(c) empties queue into another collection c
     */
    public void testDrainTo() {
        PriorityBlockingQueue q = populatedQueue(SIZE);
        ArrayList l = new ArrayList();
        q.drainTo(l);
        assertEquals(0, q.size());
        assertEquals(SIZE, l.size());
        for (int i = 0; i < SIZE; ++i)
            assertEquals(new Integer(i), l.get(i));
        q.add(zero);
        q.add(one);
        assertFalse(q.isEmpty());
        assertTrue(q.contains(zero));
        assertTrue(q.contains(one));
        l.clear();
        q.drainTo(l);
        assertEquals(0, q.size());
        assertEquals(2, l.size());
        for (int i = 0; i < 2; ++i)
            assertEquals(new Integer(i), l.get(i));
    }

    /**
     * drainTo empties queue
     */
    public void testDrainToWithActivePut() throws InterruptedException {
        final PriorityBlockingQueue q = populatedQueue(SIZE);
        Thread t = new Thread(new CheckedRunnable() {
            public void realRun() {
                q.put(new Integer(SIZE+1));
            }});

        t.start();
        ArrayList l = new ArrayList();
        q.drainTo(l);
        // The concurrent put may or may not land before the drain, so
        // only a lower bound on the drained count is asserted.
        assertTrue(l.size() >= SIZE);
        for (int i = 0; i < SIZE; ++i)
            assertEquals(new Integer(i), l.get(i));
        t.join();
        assertTrue(q.size() + l.size() >= SIZE);
    }

    /**
     * drainTo(c, n) empties first min(n, size) elements of queue into c
     */
    public void testDrainToN() {
        PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE*2);
        for (int i = 0; i < SIZE + 2; ++i) {
            for (int j = 0; j < SIZE; j++)
                assertTrue(q.offer(new Integer(j)));
            ArrayList l = new ArrayList();
            q.drainTo(l, i);
            int k = (i < SIZE) ? i : SIZE;
            assertEquals(k, l.size());
            assertEquals(SIZE-k, q.size());
            for (int j = 0; j < k; ++j)
                assertEquals(new Integer(j), l.get(j));
            // Discard whatever is left before the next round.
            while (q.poll() != null) ;
        }
    }

}