1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 * Other contributors include Andrew Wright, Jeffrey Hayes, 6 * Pat Fisher, Mike Judd. 7 */ 8 9 package jsr166; 10 11 import static java.util.concurrent.TimeUnit.MILLISECONDS; 12 13 import java.util.ArrayList; 14 import java.util.Arrays; 15 import java.util.Collection; 16 import java.util.Iterator; 17 import java.util.NoSuchElementException; 18 import java.util.concurrent.BlockingQueue; 19 import java.util.concurrent.CountDownLatch; 20 import java.util.concurrent.Executors; 21 import java.util.concurrent.ExecutorService; 22 import java.util.concurrent.SynchronousQueue; 23 24 import junit.framework.Test; 25 26 public class SynchronousQueueTest extends JSR166TestCase { 27 28 // android-note: These tests have been moved into their own separate 29 // classes to work around CTS issues. 30 // 31 // public static class Fair extends BlockingQueueTest { 32 // protected BlockingQueue emptyCollection() { 33 // return new SynchronousQueue(true); 34 // } 35 // } 36 37 // public static class NonFair extends BlockingQueueTest { 38 // protected BlockingQueue emptyCollection() { 39 // return new SynchronousQueue(false); 40 // } 41 // } 42 43 // android-note: Removed because the CTS runner does a bad job of 44 // retrying tests that have suite() declarations. 45 // 46 // public static void main(String[] args) { 47 // main(suite(), args); 48 // } 49 // public static Test suite() { 50 // return newTestSuite(SynchronousQueueTest.class, 51 // new Fair().testSuite(), 52 // new NonFair().testSuite()); 53 // } 54 55 /** 56 * Any SynchronousQueue is both empty and full 57 */ 58 public void testEmptyFull() { testEmptyFull(false); } 59 public void testEmptyFull_fair() { testEmptyFull(true); } 60 public void testEmptyFull(boolean fair) { 61 final SynchronousQueue q = new SynchronousQueue(fair); 62 assertTrue(q.isEmpty()); 63 assertEquals(0, q.size()); 64 assertEquals(0, q.remainingCapacity()); 65 assertFalse(q.offer(zero)); 66 } 67 68 /** 69 * offer fails if no active taker 70 */ 71 public void testOffer() { testOffer(false); } 72 public void testOffer_fair() { testOffer(true); } 73 public void testOffer(boolean fair) { 74 SynchronousQueue q = new SynchronousQueue(fair); 75 assertFalse(q.offer(one)); 76 } 77 78 /** 79 * add throws IllegalStateException if no active taker 80 */ 81 public void testAdd() { testAdd(false); } 82 public void testAdd_fair() { testAdd(true); } 83 public void testAdd(boolean fair) { 84 SynchronousQueue q = new SynchronousQueue(fair); 85 assertEquals(0, q.remainingCapacity()); 86 try { 87 q.add(one); 88 shouldThrow(); 89 } catch (IllegalStateException success) {} 90 } 91 92 /** 93 * addAll(this) throws IllegalArgumentException 94 */ 95 public void testAddAll_self() { testAddAll_self(false); } 96 public void testAddAll_self_fair() { testAddAll_self(true); } 97 public void testAddAll_self(boolean fair) { 98 SynchronousQueue q = new SynchronousQueue(fair); 99 try { 100 q.addAll(q); 101 shouldThrow(); 102 } catch (IllegalArgumentException success) {} 103 } 104 105 /** 106 * addAll throws ISE if no active taker 107 */ 108 public void testAddAll_ISE() { testAddAll_ISE(false); } 109 public void testAddAll_ISE_fair() { testAddAll_ISE(true); } 110 public void testAddAll_ISE(boolean fair) { 111 SynchronousQueue q = new SynchronousQueue(fair); 112 Integer[] ints = new Integer[1]; 113 for (int i = 0; i < ints.length; i++) 114 ints[i] = i; 115 Collection<Integer> coll = Arrays.asList(ints); 116 try { 117 q.addAll(coll); 118 shouldThrow(); 119 } catch (IllegalStateException success) {} 120 } 121 122 /** 123 * put blocks interruptibly if no active taker 124 */ 125 public void testBlockingPut() { testBlockingPut(false); } 126 public void testBlockingPut_fair() { testBlockingPut(true); } 127 public void testBlockingPut(boolean fair) { 128 final SynchronousQueue q = new SynchronousQueue(fair); 129 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 130 Thread t = newStartedThread(new CheckedRunnable() { 131 public void realRun() throws InterruptedException { 132 Thread.currentThread().interrupt(); 133 try { 134 q.put(99); 135 shouldThrow(); 136 } catch (InterruptedException success) {} 137 assertFalse(Thread.interrupted()); 138 139 pleaseInterrupt.countDown(); 140 try { 141 q.put(99); 142 shouldThrow(); 143 } catch (InterruptedException success) {} 144 assertFalse(Thread.interrupted()); 145 }}); 146 147 await(pleaseInterrupt); 148 assertThreadStaysAlive(t); 149 t.interrupt(); 150 awaitTermination(t); 151 assertEquals(0, q.remainingCapacity()); 152 } 153 154 /** 155 * put blocks interruptibly waiting for take 156 */ 157 public void testPutWithTake() { testPutWithTake(false); } 158 public void testPutWithTake_fair() { testPutWithTake(true); } 159 public void testPutWithTake(boolean fair) { 160 final SynchronousQueue q = new SynchronousQueue(fair); 161 final CountDownLatch pleaseTake = new CountDownLatch(1); 162 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 163 Thread t = newStartedThread(new CheckedRunnable() { 164 public void realRun() throws InterruptedException { 165 pleaseTake.countDown(); 166 q.put(one); 167 168 pleaseInterrupt.countDown(); 169 try { 170 q.put(99); 171 shouldThrow(); 172 } catch (InterruptedException success) {} 173 assertFalse(Thread.interrupted()); 174 }}); 175 176 await(pleaseTake); 177 assertEquals(0, q.remainingCapacity()); 178 try { assertSame(one, q.take()); } 179 catch (InterruptedException e) { threadUnexpectedException(e); } 180 181 await(pleaseInterrupt); 182 assertThreadStaysAlive(t); 183 t.interrupt(); 184 awaitTermination(t); 185 assertEquals(0, q.remainingCapacity()); 186 } 187 188 /** 189 * timed offer times out if elements not taken 190 */ 191 public void testTimedOffer() { testTimedOffer(false); } 192 public void testTimedOffer_fair() { testTimedOffer(true); } 193 public void testTimedOffer(boolean fair) { 194 final SynchronousQueue q = new SynchronousQueue(fair); 195 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 196 Thread t = newStartedThread(new CheckedRunnable() { 197 public void realRun() throws InterruptedException { 198 long startTime = System.nanoTime(); 199 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); 200 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 201 pleaseInterrupt.countDown(); 202 try { 203 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); 204 shouldThrow(); 205 } catch (InterruptedException success) {} 206 }}); 207 208 await(pleaseInterrupt); 209 assertThreadStaysAlive(t); 210 t.interrupt(); 211 awaitTermination(t); 212 } 213 214 /** 215 * poll return null if no active putter 216 */ 217 public void testPoll() { testPoll(false); } 218 public void testPoll_fair() { testPoll(true); } 219 public void testPoll(boolean fair) { 220 final SynchronousQueue q = new SynchronousQueue(fair); 221 assertNull(q.poll()); 222 } 223 224 /** 225 * timed poll with zero timeout times out if no active putter 226 */ 227 public void testTimedPoll0() { testTimedPoll0(false); } 228 public void testTimedPoll0_fair() { testTimedPoll0(true); } 229 public void testTimedPoll0(boolean fair) { 230 final SynchronousQueue q = new SynchronousQueue(fair); 231 try { assertNull(q.poll(0, MILLISECONDS)); } 232 catch (InterruptedException e) { threadUnexpectedException(e); } 233 } 234 235 /** 236 * timed poll with nonzero timeout times out if no active putter 237 */ 238 public void testTimedPoll() { testTimedPoll(false); } 239 public void testTimedPoll_fair() { testTimedPoll(true); } 240 public void testTimedPoll(boolean fair) { 241 final SynchronousQueue q = new SynchronousQueue(fair); 242 long startTime = System.nanoTime(); 243 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } 244 catch (InterruptedException e) { threadUnexpectedException(e); } 245 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 246 } 247 248 /** 249 * timed poll before a delayed offer times out, returning null; 250 * after offer succeeds; on interruption throws 251 */ 252 public void testTimedPollWithOffer() { testTimedPollWithOffer(false); } 253 public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); } 254 public void testTimedPollWithOffer(boolean fair) { 255 final SynchronousQueue q = new SynchronousQueue(fair); 256 final CountDownLatch pleaseOffer = new CountDownLatch(1); 257 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 258 Thread t = newStartedThread(new CheckedRunnable() { 259 public void realRun() throws InterruptedException { 260 long startTime = System.nanoTime(); 261 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 262 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 263 264 pleaseOffer.countDown(); 265 startTime = System.nanoTime(); 266 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); 267 268 Thread.currentThread().interrupt(); 269 try { 270 q.poll(LONG_DELAY_MS, MILLISECONDS); 271 shouldThrow(); 272 } catch (InterruptedException success) {} 273 assertFalse(Thread.interrupted()); 274 275 pleaseInterrupt.countDown(); 276 try { 277 q.poll(LONG_DELAY_MS, MILLISECONDS); 278 shouldThrow(); 279 } catch (InterruptedException success) {} 280 assertFalse(Thread.interrupted()); 281 282 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 283 }}); 284 285 await(pleaseOffer); 286 long startTime = System.nanoTime(); 287 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } 288 catch (InterruptedException e) { threadUnexpectedException(e); } 289 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 290 291 await(pleaseInterrupt); 292 assertThreadStaysAlive(t); 293 t.interrupt(); 294 awaitTermination(t); 295 } 296 297 /** 298 * peek() returns null if no active putter 299 */ 300 public void testPeek() { testPeek(false); } 301 public void testPeek_fair() { testPeek(true); } 302 public void testPeek(boolean fair) { 303 final SynchronousQueue q = new SynchronousQueue(fair); 304 assertNull(q.peek()); 305 } 306 307 /** 308 * element() throws NoSuchElementException if no active putter 309 */ 310 public void testElement() { testElement(false); } 311 public void testElement_fair() { testElement(true); } 312 public void testElement(boolean fair) { 313 final SynchronousQueue q = new SynchronousQueue(fair); 314 try { 315 q.element(); 316 shouldThrow(); 317 } catch (NoSuchElementException success) {} 318 } 319 320 /** 321 * remove() throws NoSuchElementException if no active putter 322 */ 323 public void testRemove() { testRemove(false); } 324 public void testRemove_fair() { testRemove(true); } 325 public void testRemove(boolean fair) { 326 final SynchronousQueue q = new SynchronousQueue(fair); 327 try { 328 q.remove(); 329 shouldThrow(); 330 } catch (NoSuchElementException success) {} 331 } 332 333 /** 334 * contains returns false 335 */ 336 public void testContains() { testContains(false); } 337 public void testContains_fair() { testContains(true); } 338 public void testContains(boolean fair) { 339 final SynchronousQueue q = new SynchronousQueue(fair); 340 assertFalse(q.contains(zero)); 341 } 342 343 /** 344 * clear ensures isEmpty 345 */ 346 public void testClear() { testClear(false); } 347 public void testClear_fair() { testClear(true); } 348 public void testClear(boolean fair) { 349 final SynchronousQueue q = new SynchronousQueue(fair); 350 q.clear(); 351 assertTrue(q.isEmpty()); 352 } 353 354 /** 355 * containsAll returns false unless empty 356 */ 357 public void testContainsAll() { testContainsAll(false); } 358 public void testContainsAll_fair() { testContainsAll(true); } 359 public void testContainsAll(boolean fair) { 360 final SynchronousQueue q = new SynchronousQueue(fair); 361 Integer[] empty = new Integer[0]; 362 assertTrue(q.containsAll(Arrays.asList(empty))); 363 Integer[] ints = new Integer[1]; ints[0] = zero; 364 assertFalse(q.containsAll(Arrays.asList(ints))); 365 } 366 367 /** 368 * retainAll returns false 369 */ 370 public void testRetainAll() { testRetainAll(false); } 371 public void testRetainAll_fair() { testRetainAll(true); } 372 public void testRetainAll(boolean fair) { 373 final SynchronousQueue q = new SynchronousQueue(fair); 374 Integer[] empty = new Integer[0]; 375 assertFalse(q.retainAll(Arrays.asList(empty))); 376 Integer[] ints = new Integer[1]; ints[0] = zero; 377 assertFalse(q.retainAll(Arrays.asList(ints))); 378 } 379 380 /** 381 * removeAll returns false 382 */ 383 public void testRemoveAll() { testRemoveAll(false); } 384 public void testRemoveAll_fair() { testRemoveAll(true); } 385 public void testRemoveAll(boolean fair) { 386 final SynchronousQueue q = new SynchronousQueue(fair); 387 Integer[] empty = new Integer[0]; 388 assertFalse(q.removeAll(Arrays.asList(empty))); 389 Integer[] ints = new Integer[1]; ints[0] = zero; 390 assertFalse(q.containsAll(Arrays.asList(ints))); 391 } 392 393 /** 394 * toArray is empty 395 */ 396 public void testToArray() { testToArray(false); } 397 public void testToArray_fair() { testToArray(true); } 398 public void testToArray(boolean fair) { 399 final SynchronousQueue q = new SynchronousQueue(fair); 400 Object[] o = q.toArray(); 401 assertEquals(0, o.length); 402 } 403 404 /** 405 * toArray(Integer array) returns its argument with the first 406 * element (if present) nulled out 407 */ 408 public void testToArray2() { testToArray2(false); } 409 public void testToArray2_fair() { testToArray2(true); } 410 public void testToArray2(boolean fair) { 411 final SynchronousQueue<Integer> q 412 = new SynchronousQueue<Integer>(fair); 413 Integer[] a; 414 415 a = new Integer[0]; 416 assertSame(a, q.toArray(a)); 417 418 a = new Integer[3]; 419 Arrays.fill(a, 42); 420 assertSame(a, q.toArray(a)); 421 assertNull(a[0]); 422 for (int i = 1; i < a.length; i++) 423 assertEquals(42, (int) a[i]); 424 } 425 426 /** 427 * toArray(null) throws NPE 428 */ 429 public void testToArray_null() { testToArray_null(false); } 430 public void testToArray_null_fair() { testToArray_null(true); } 431 public void testToArray_null(boolean fair) { 432 final SynchronousQueue q = new SynchronousQueue(fair); 433 try { 434 Object[] o = q.toArray(null); 435 shouldThrow(); 436 } catch (NullPointerException success) {} 437 } 438 439 /** 440 * iterator does not traverse any elements 441 */ 442 public void testIterator() { testIterator(false); } 443 public void testIterator_fair() { testIterator(true); } 444 public void testIterator(boolean fair) { 445 assertIteratorExhausted(new SynchronousQueue(fair).iterator()); 446 } 447 448 /** 449 * iterator remove throws ISE 450 */ 451 public void testIteratorRemove() { testIteratorRemove(false); } 452 public void testIteratorRemove_fair() { testIteratorRemove(true); } 453 public void testIteratorRemove(boolean fair) { 454 final SynchronousQueue q = new SynchronousQueue(fair); 455 Iterator it = q.iterator(); 456 try { 457 it.remove(); 458 shouldThrow(); 459 } catch (IllegalStateException success) {} 460 } 461 462 /** 463 * toString returns a non-null string 464 */ 465 public void testToString() { testToString(false); } 466 public void testToString_fair() { testToString(true); } 467 public void testToString(boolean fair) { 468 final SynchronousQueue q = new SynchronousQueue(fair); 469 String s = q.toString(); 470 assertNotNull(s); 471 } 472 473 /** 474 * offer transfers elements across Executor tasks 475 */ 476 public void testOfferInExecutor() { testOfferInExecutor(false); } 477 public void testOfferInExecutor_fair() { testOfferInExecutor(true); } 478 public void testOfferInExecutor(boolean fair) { 479 final SynchronousQueue q = new SynchronousQueue(fair); 480 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 481 final ExecutorService executor = Executors.newFixedThreadPool(2); 482 try (PoolCleaner cleaner = cleaner(executor)) { 483 484 executor.execute(new CheckedRunnable() { 485 public void realRun() throws InterruptedException { 486 assertFalse(q.offer(one)); 487 threadsStarted.await(); 488 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 489 assertEquals(0, q.remainingCapacity()); 490 }}); 491 492 executor.execute(new CheckedRunnable() { 493 public void realRun() throws InterruptedException { 494 threadsStarted.await(); 495 assertSame(one, q.take()); 496 }}); 497 } 498 } 499 500 /** 501 * timed poll retrieves elements across Executor threads 502 */ 503 public void testPollInExecutor() { testPollInExecutor(false); } 504 public void testPollInExecutor_fair() { testPollInExecutor(true); } 505 public void testPollInExecutor(boolean fair) { 506 final SynchronousQueue q = new SynchronousQueue(fair); 507 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 508 final ExecutorService executor = Executors.newFixedThreadPool(2); 509 try (PoolCleaner cleaner = cleaner(executor)) { 510 executor.execute(new CheckedRunnable() { 511 public void realRun() throws InterruptedException { 512 assertNull(q.poll()); 513 threadsStarted.await(); 514 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 515 assertTrue(q.isEmpty()); 516 }}); 517 518 executor.execute(new CheckedRunnable() { 519 public void realRun() throws InterruptedException { 520 threadsStarted.await(); 521 q.put(one); 522 }}); 523 } 524 } 525 526 /** 527 * a deserialized serialized queue is usable 528 */ 529 public void testSerialization() { 530 final SynchronousQueue x = new SynchronousQueue(); 531 final SynchronousQueue y = new SynchronousQueue(false); 532 final SynchronousQueue z = new SynchronousQueue(true); 533 assertSerialEquals(x, y); 534 assertNotSerialEquals(x, z); 535 SynchronousQueue[] qs = { x, y, z }; 536 for (SynchronousQueue q : qs) { 537 SynchronousQueue clone = serialClone(q); 538 assertNotSame(q, clone); 539 assertSerialEquals(q, clone); 540 assertTrue(clone.isEmpty()); 541 assertEquals(0, clone.size()); 542 assertEquals(0, clone.remainingCapacity()); 543 assertFalse(clone.offer(zero)); 544 } 545 } 546 547 /** 548 * drainTo(c) of empty queue doesn't transfer elements 549 */ 550 public void testDrainTo() { testDrainTo(false); } 551 public void testDrainTo_fair() { testDrainTo(true); } 552 public void testDrainTo(boolean fair) { 553 final SynchronousQueue q = new SynchronousQueue(fair); 554 ArrayList l = new ArrayList(); 555 q.drainTo(l); 556 assertEquals(0, q.size()); 557 assertEquals(0, l.size()); 558 } 559 560 /** 561 * drainTo empties queue, unblocking a waiting put. 562 */ 563 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); } 564 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); } 565 public void testDrainToWithActivePut(boolean fair) { 566 final SynchronousQueue q = new SynchronousQueue(fair); 567 Thread t = newStartedThread(new CheckedRunnable() { 568 public void realRun() throws InterruptedException { 569 q.put(one); 570 }}); 571 572 ArrayList l = new ArrayList(); 573 long startTime = System.nanoTime(); 574 while (l.isEmpty()) { 575 q.drainTo(l); 576 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 577 fail("timed out"); 578 Thread.yield(); 579 } 580 assertTrue(l.size() == 1); 581 assertSame(one, l.get(0)); 582 awaitTermination(t); 583 } 584 585 /** 586 * drainTo(c, n) empties up to n elements of queue into c 587 */ 588 public void testDrainToN() throws InterruptedException { 589 final SynchronousQueue q = new SynchronousQueue(); 590 Thread t1 = newStartedThread(new CheckedRunnable() { 591 public void realRun() throws InterruptedException { 592 q.put(one); 593 }}); 594 595 Thread t2 = newStartedThread(new CheckedRunnable() { 596 public void realRun() throws InterruptedException { 597 q.put(two); 598 }}); 599 600 ArrayList l = new ArrayList(); 601 int drained; 602 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 603 assertEquals(1, drained); 604 assertEquals(1, l.size()); 605 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 606 assertEquals(1, drained); 607 assertEquals(2, l.size()); 608 assertTrue(l.contains(one)); 609 assertTrue(l.contains(two)); 610 awaitTermination(t1); 611 awaitTermination(t2); 612 } 613 614 /** 615 * remove(null), contains(null) always return false 616 */ 617 public void testNeverContainsNull() { 618 Collection<?> q = new SynchronousQueue(); 619 assertFalse(q.contains(null)); 620 assertFalse(q.remove(null)); 621 } 622 623 } 624