1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 * Other contributors include Andrew Wright, Jeffrey Hayes, 6 * Pat Fisher, Mike Judd. 7 */ 8 9 package jsr166; 10 11 import junit.framework.*; 12 import java.util.Arrays; 13 import java.util.ArrayList; 14 import java.util.Collection; 15 import java.util.Iterator; 16 import java.util.NoSuchElementException; 17 import java.util.Queue; 18 import java.util.concurrent.BlockingQueue; 19 import java.util.concurrent.CountDownLatch; 20 import java.util.concurrent.Executors; 21 import java.util.concurrent.ExecutorService; 22 import java.util.concurrent.SynchronousQueue; 23 import static java.util.concurrent.TimeUnit.MILLISECONDS; 24 25 public class SynchronousQueueTest extends JSR166TestCase { 26 27 /** 28 * Any SynchronousQueue is both empty and full 29 */ testEmptyFull()30 public void testEmptyFull() { testEmptyFull(false); } testEmptyFull_fair()31 public void testEmptyFull_fair() { testEmptyFull(true); } testEmptyFull(boolean fair)32 public void testEmptyFull(boolean fair) { 33 final SynchronousQueue q = new SynchronousQueue(fair); 34 assertTrue(q.isEmpty()); 35 assertEquals(0, q.size()); 36 assertEquals(0, q.remainingCapacity()); 37 assertFalse(q.offer(zero)); 38 } 39 40 /** 41 * offer fails if no active taker 42 */ testOffer()43 public void testOffer() { testOffer(false); } testOffer_fair()44 public void testOffer_fair() { testOffer(true); } testOffer(boolean fair)45 public void testOffer(boolean fair) { 46 SynchronousQueue q = new SynchronousQueue(fair); 47 assertFalse(q.offer(one)); 48 } 49 50 /** 51 * add throws IllegalStateException if no active taker 52 */ testAdd()53 public void testAdd() { testAdd(false); } testAdd_fair()54 public void testAdd_fair() { testAdd(true); } testAdd(boolean fair)55 public void testAdd(boolean fair) { 56 SynchronousQueue q = new SynchronousQueue(fair); 57 assertEquals(0, q.remainingCapacity()); 58 try { 59 q.add(one); 60 shouldThrow(); 61 } catch (IllegalStateException success) {} 62 } 63 64 /** 65 * addAll(this) throws IllegalArgumentException 66 */ testAddAll_self()67 public void testAddAll_self() { testAddAll_self(false); } testAddAll_self_fair()68 public void testAddAll_self_fair() { testAddAll_self(true); } testAddAll_self(boolean fair)69 public void testAddAll_self(boolean fair) { 70 SynchronousQueue q = new SynchronousQueue(fair); 71 try { 72 q.addAll(q); 73 shouldThrow(); 74 } catch (IllegalArgumentException success) {} 75 } 76 77 /** 78 * addAll throws ISE if no active taker 79 */ testAddAll_ISE()80 public void testAddAll_ISE() { testAddAll_ISE(false); } testAddAll_ISE_fair()81 public void testAddAll_ISE_fair() { testAddAll_ISE(true); } testAddAll_ISE(boolean fair)82 public void testAddAll_ISE(boolean fair) { 83 SynchronousQueue q = new SynchronousQueue(fair); 84 Integer[] ints = new Integer[1]; 85 for (int i = 0; i < ints.length; i++) 86 ints[i] = i; 87 Collection<Integer> coll = Arrays.asList(ints); 88 try { 89 q.addAll(coll); 90 shouldThrow(); 91 } catch (IllegalStateException success) {} 92 } 93 94 /** 95 * put blocks interruptibly if no active taker 96 */ testBlockingPut()97 public void testBlockingPut() { testBlockingPut(false); } testBlockingPut_fair()98 public void testBlockingPut_fair() { testBlockingPut(true); } testBlockingPut(boolean fair)99 public void testBlockingPut(boolean fair) { 100 final SynchronousQueue q = new SynchronousQueue(fair); 101 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 102 Thread t = newStartedThread(new CheckedRunnable() { 103 public void realRun() throws InterruptedException { 104 Thread.currentThread().interrupt(); 105 try { 106 q.put(99); 107 shouldThrow(); 108 } catch (InterruptedException success) {} 109 assertFalse(Thread.interrupted()); 110 111 pleaseInterrupt.countDown(); 112 try { 113 q.put(99); 114 shouldThrow(); 115 } catch (InterruptedException success) {} 116 assertFalse(Thread.interrupted()); 117 }}); 118 119 await(pleaseInterrupt); 120 assertThreadStaysAlive(t); 121 t.interrupt(); 122 awaitTermination(t); 123 assertEquals(0, q.remainingCapacity()); 124 } 125 126 /** 127 * put blocks interruptibly waiting for take 128 */ testPutWithTake()129 public void testPutWithTake() { testPutWithTake(false); } testPutWithTake_fair()130 public void testPutWithTake_fair() { testPutWithTake(true); } testPutWithTake(boolean fair)131 public void testPutWithTake(boolean fair) { 132 final SynchronousQueue q = new SynchronousQueue(fair); 133 final CountDownLatch pleaseTake = new CountDownLatch(1); 134 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 135 Thread t = newStartedThread(new CheckedRunnable() { 136 public void realRun() throws InterruptedException { 137 pleaseTake.countDown(); 138 q.put(one); 139 140 pleaseInterrupt.countDown(); 141 try { 142 q.put(99); 143 shouldThrow(); 144 } catch (InterruptedException success) {} 145 assertFalse(Thread.interrupted()); 146 }}); 147 148 await(pleaseTake); 149 assertEquals(0, q.remainingCapacity()); 150 try { assertSame(one, q.take()); } 151 catch (InterruptedException e) { threadUnexpectedException(e); } 152 153 await(pleaseInterrupt); 154 assertThreadStaysAlive(t); 155 t.interrupt(); 156 awaitTermination(t); 157 assertEquals(0, q.remainingCapacity()); 158 } 159 160 /** 161 * timed offer times out if elements not taken 162 */ testTimedOffer()163 public void testTimedOffer() { testTimedOffer(false); } testTimedOffer_fair()164 public void testTimedOffer_fair() { testTimedOffer(true); } testTimedOffer(boolean fair)165 public void testTimedOffer(boolean fair) { 166 final SynchronousQueue q = new SynchronousQueue(fair); 167 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 168 Thread t = newStartedThread(new CheckedRunnable() { 169 public void realRun() throws InterruptedException { 170 long startTime = System.nanoTime(); 171 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); 172 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 173 pleaseInterrupt.countDown(); 174 try { 175 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); 176 shouldThrow(); 177 } catch (InterruptedException success) {} 178 }}); 179 180 await(pleaseInterrupt); 181 assertThreadStaysAlive(t); 182 t.interrupt(); 183 awaitTermination(t); 184 } 185 186 /** 187 * poll return null if no active putter 188 */ testPoll()189 public void testPoll() { testPoll(false); } testPoll_fair()190 public void testPoll_fair() { testPoll(true); } testPoll(boolean fair)191 public void testPoll(boolean fair) { 192 final SynchronousQueue q = new SynchronousQueue(fair); 193 assertNull(q.poll()); 194 } 195 196 /** 197 * timed poll with zero timeout times out if no active putter 198 */ testTimedPoll0()199 public void testTimedPoll0() { testTimedPoll0(false); } testTimedPoll0_fair()200 public void testTimedPoll0_fair() { testTimedPoll0(true); } testTimedPoll0(boolean fair)201 public void testTimedPoll0(boolean fair) { 202 final SynchronousQueue q = new SynchronousQueue(fair); 203 try { assertNull(q.poll(0, MILLISECONDS)); } 204 catch (InterruptedException e) { threadUnexpectedException(e); } 205 } 206 207 /** 208 * timed poll with nonzero timeout times out if no active putter 209 */ testTimedPoll()210 public void testTimedPoll() { testTimedPoll(false); } testTimedPoll_fair()211 public void testTimedPoll_fair() { testTimedPoll(true); } testTimedPoll(boolean fair)212 public void testTimedPoll(boolean fair) { 213 final SynchronousQueue q = new SynchronousQueue(fair); 214 long startTime = System.nanoTime(); 215 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } 216 catch (InterruptedException e) { threadUnexpectedException(e); } 217 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 218 } 219 220 /** 221 * timed poll before a delayed offer times out, returning null; 222 * after offer succeeds; on interruption throws 223 */ testTimedPollWithOffer()224 public void testTimedPollWithOffer() { testTimedPollWithOffer(false); } testTimedPollWithOffer_fair()225 public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); } testTimedPollWithOffer(boolean fair)226 public void testTimedPollWithOffer(boolean fair) { 227 final SynchronousQueue q = new SynchronousQueue(fair); 228 final CountDownLatch pleaseOffer = new CountDownLatch(1); 229 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 230 Thread t = newStartedThread(new CheckedRunnable() { 231 public void realRun() throws InterruptedException { 232 long startTime = System.nanoTime(); 233 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 234 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 235 236 pleaseOffer.countDown(); 237 startTime = System.nanoTime(); 238 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); 239 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS); 240 241 Thread.currentThread().interrupt(); 242 try { 243 q.poll(LONG_DELAY_MS, MILLISECONDS); 244 shouldThrow(); 245 } catch (InterruptedException success) {} 246 assertFalse(Thread.interrupted()); 247 248 pleaseInterrupt.countDown(); 249 try { 250 q.poll(LONG_DELAY_MS, MILLISECONDS); 251 shouldThrow(); 252 } catch (InterruptedException success) {} 253 assertFalse(Thread.interrupted()); 254 }}); 255 256 await(pleaseOffer); 257 long startTime = System.nanoTime(); 258 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } 259 catch (InterruptedException e) { threadUnexpectedException(e); } 260 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS); 261 262 await(pleaseInterrupt); 263 assertThreadStaysAlive(t); 264 t.interrupt(); 265 awaitTermination(t); 266 } 267 268 /** 269 * peek() returns null if no active putter 270 */ 271 public void testPeek() { testPeek(false); } 272 public void testPeek_fair() { testPeek(true); } 273 public void testPeek(boolean fair) { 274 final SynchronousQueue q = new SynchronousQueue(fair); 275 assertNull(q.peek()); 276 } 277 278 /** 279 * element() throws NoSuchElementException if no active putter 280 */ 281 public void testElement() { testElement(false); } 282 public void testElement_fair() { testElement(true); } 283 public void testElement(boolean fair) { 284 final SynchronousQueue q = new SynchronousQueue(fair); 285 try { 286 q.element(); 287 shouldThrow(); 288 } catch (NoSuchElementException success) {} 289 } 290 291 /** 292 * remove() throws NoSuchElementException if no active putter 293 */ 294 public void testRemove() { testRemove(false); } 295 public void testRemove_fair() { testRemove(true); } 296 public void testRemove(boolean fair) { 297 final SynchronousQueue q = new SynchronousQueue(fair); 298 try { 299 q.remove(); 300 shouldThrow(); 301 } catch (NoSuchElementException success) {} 302 } 303 304 /** 305 * contains returns false 306 */ 307 public void testContains() { testContains(false); } 308 public void testContains_fair() { testContains(true); } 309 public void testContains(boolean fair) { 310 final SynchronousQueue q = new SynchronousQueue(fair); 311 assertFalse(q.contains(zero)); 312 } 313 314 /** 315 * clear ensures isEmpty 316 */ 317 public void testClear() { testClear(false); } 318 public void testClear_fair() { testClear(true); } 319 public void testClear(boolean fair) { 320 final SynchronousQueue q = new SynchronousQueue(fair); 321 q.clear(); 322 assertTrue(q.isEmpty()); 323 } 324 325 /** 326 * containsAll returns false unless empty 327 */ 328 public void testContainsAll() { testContainsAll(false); } 329 public void testContainsAll_fair() { testContainsAll(true); } 330 public void testContainsAll(boolean fair) { 331 final SynchronousQueue q = new SynchronousQueue(fair); 332 Integer[] empty = new Integer[0]; 333 assertTrue(q.containsAll(Arrays.asList(empty))); 334 Integer[] ints = new Integer[1]; ints[0] = zero; 335 assertFalse(q.containsAll(Arrays.asList(ints))); 336 } 337 338 /** 339 * retainAll returns false 340 */ 341 public void testRetainAll() { testRetainAll(false); } 342 public void testRetainAll_fair() { testRetainAll(true); } 343 public void testRetainAll(boolean fair) { 344 final SynchronousQueue q = new SynchronousQueue(fair); 345 Integer[] empty = new Integer[0]; 346 assertFalse(q.retainAll(Arrays.asList(empty))); 347 Integer[] ints = new Integer[1]; ints[0] = zero; 348 assertFalse(q.retainAll(Arrays.asList(ints))); 349 } 350 351 /** 352 * removeAll returns false 353 */ 354 public void testRemoveAll() { testRemoveAll(false); } 355 public void testRemoveAll_fair() { testRemoveAll(true); } 356 public void testRemoveAll(boolean fair) { 357 final SynchronousQueue q = new SynchronousQueue(fair); 358 Integer[] empty = new Integer[0]; 359 assertFalse(q.removeAll(Arrays.asList(empty))); 360 Integer[] ints = new Integer[1]; ints[0] = zero; 361 assertFalse(q.containsAll(Arrays.asList(ints))); 362 } 363 364 /** 365 * toArray is empty 366 */ 367 public void testToArray() { testToArray(false); } 368 public void testToArray_fair() { testToArray(true); } 369 public void testToArray(boolean fair) { 370 final SynchronousQueue q = new SynchronousQueue(fair); 371 Object[] o = q.toArray(); 372 assertEquals(0, o.length); 373 } 374 375 /** 376 * toArray(Integer array) returns its argument with the first 377 * element (if present) nulled out 378 */ 379 public void testToArray2() { testToArray2(false); } 380 public void testToArray2_fair() { testToArray2(true); } 381 public void testToArray2(boolean fair) { 382 final SynchronousQueue<Integer> q 383 = new SynchronousQueue<Integer>(fair); 384 Integer[] a; 385 386 a = new Integer[0]; 387 assertSame(a, q.toArray(a)); 388 389 a = new Integer[3]; 390 Arrays.fill(a, 42); 391 assertSame(a, q.toArray(a)); 392 assertNull(a[0]); 393 for (int i = 1; i < a.length; i++) 394 assertEquals(42, (int) a[i]); 395 } 396 397 /** 398 * toArray(null) throws NPE 399 */ 400 public void testToArray_null() { testToArray_null(false); } 401 public void testToArray_null_fair() { testToArray_null(true); } 402 public void testToArray_null(boolean fair) { 403 final SynchronousQueue q = new SynchronousQueue(fair); 404 try { 405 Object o[] = q.toArray(null); 406 shouldThrow(); 407 } catch (NullPointerException success) {} 408 } 409 410 /** 411 * iterator does not traverse any elements 412 */ 413 public void testIterator() { testIterator(false); } 414 public void testIterator_fair() { testIterator(true); } 415 public void testIterator(boolean fair) { 416 final SynchronousQueue q = new SynchronousQueue(fair); 417 Iterator it = q.iterator(); 418 assertFalse(it.hasNext()); 419 try { 420 Object x = it.next(); 421 shouldThrow(); 422 } catch (NoSuchElementException success) {} 423 } 424 425 /** 426 * iterator remove throws ISE 427 */ 428 public void testIteratorRemove() { testIteratorRemove(false); } 429 public void testIteratorRemove_fair() { testIteratorRemove(true); } 430 public void testIteratorRemove(boolean fair) { 431 final SynchronousQueue q = new SynchronousQueue(fair); 432 Iterator it = q.iterator(); 433 try { 434 it.remove(); 435 shouldThrow(); 436 } catch (IllegalStateException success) {} 437 } 438 439 /** 440 * toString returns a non-null string 441 */ 442 public void testToString() { testToString(false); } 443 public void testToString_fair() { testToString(true); } 444 public void testToString(boolean fair) { 445 final SynchronousQueue q = new SynchronousQueue(fair); 446 String s = q.toString(); 447 assertNotNull(s); 448 } 449 450 /** 451 * offer transfers elements across Executor tasks 452 */ 453 public void testOfferInExecutor() { testOfferInExecutor(false); } 454 public void testOfferInExecutor_fair() { testOfferInExecutor(true); } 455 public void testOfferInExecutor(boolean fair) { 456 final SynchronousQueue q = new SynchronousQueue(fair); 457 ExecutorService executor = Executors.newFixedThreadPool(2); 458 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 459 460 executor.execute(new CheckedRunnable() { 461 public void realRun() throws InterruptedException { 462 assertFalse(q.offer(one)); 463 threadsStarted.await(); 464 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 465 assertEquals(0, q.remainingCapacity()); 466 }}); 467 468 executor.execute(new CheckedRunnable() { 469 public void realRun() throws InterruptedException { 470 threadsStarted.await(); 471 assertSame(one, q.take()); 472 }}); 473 474 joinPool(executor); 475 } 476 477 /** 478 * timed poll retrieves elements across Executor threads 479 */ 480 public void testPollInExecutor() { testPollInExecutor(false); } 481 public void testPollInExecutor_fair() { testPollInExecutor(true); } 482 public void testPollInExecutor(boolean fair) { 483 final SynchronousQueue q = new SynchronousQueue(fair); 484 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 485 ExecutorService executor = Executors.newFixedThreadPool(2); 486 executor.execute(new CheckedRunnable() { 487 public void realRun() throws InterruptedException { 488 assertNull(q.poll()); 489 threadsStarted.await(); 490 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 491 assertTrue(q.isEmpty()); 492 }}); 493 494 executor.execute(new CheckedRunnable() { 495 public void realRun() throws InterruptedException { 496 threadsStarted.await(); 497 q.put(one); 498 }}); 499 500 joinPool(executor); 501 } 502 503 /** 504 * a deserialized serialized queue is usable 505 */ 506 public void testSerialization() { 507 final SynchronousQueue x = new SynchronousQueue(); 508 final SynchronousQueue y = new SynchronousQueue(false); 509 final SynchronousQueue z = new SynchronousQueue(true); 510 assertSerialEquals(x, y); 511 assertNotSerialEquals(x, z); 512 SynchronousQueue[] qs = { x, y, z }; 513 for (SynchronousQueue q : qs) { 514 SynchronousQueue clone = serialClone(q); 515 assertNotSame(q, clone); 516 assertSerialEquals(q, clone); 517 assertTrue(clone.isEmpty()); 518 assertEquals(0, clone.size()); 519 assertEquals(0, clone.remainingCapacity()); 520 assertFalse(clone.offer(zero)); 521 } 522 } 523 524 /** 525 * drainTo(c) of empty queue doesn't transfer elements 526 */ 527 public void testDrainTo() { testDrainTo(false); } 528 public void testDrainTo_fair() { testDrainTo(true); } 529 public void testDrainTo(boolean fair) { 530 final SynchronousQueue q = new SynchronousQueue(fair); 531 ArrayList l = new ArrayList(); 532 q.drainTo(l); 533 assertEquals(0, q.size()); 534 assertEquals(0, l.size()); 535 } 536 537 /** 538 * drainTo empties queue, unblocking a waiting put. 539 */ 540 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); } 541 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); } 542 public void testDrainToWithActivePut(boolean fair) { 543 final SynchronousQueue q = new SynchronousQueue(fair); 544 Thread t = newStartedThread(new CheckedRunnable() { 545 public void realRun() throws InterruptedException { 546 q.put(one); 547 }}); 548 549 ArrayList l = new ArrayList(); 550 long startTime = System.nanoTime(); 551 while (l.isEmpty()) { 552 q.drainTo(l); 553 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 554 fail("timed out"); 555 Thread.yield(); 556 } 557 assertTrue(l.size() == 1); 558 assertSame(one, l.get(0)); 559 awaitTermination(t); 560 } 561 562 /** 563 * drainTo(c, n) empties up to n elements of queue into c 564 */ 565 public void testDrainToN() throws InterruptedException { 566 final SynchronousQueue q = new SynchronousQueue(); 567 Thread t1 = newStartedThread(new CheckedRunnable() { 568 public void realRun() throws InterruptedException { 569 q.put(one); 570 }}); 571 572 Thread t2 = newStartedThread(new CheckedRunnable() { 573 public void realRun() throws InterruptedException { 574 q.put(two); 575 }}); 576 577 ArrayList l = new ArrayList(); 578 delay(SHORT_DELAY_MS); 579 q.drainTo(l, 1); 580 assertEquals(1, l.size()); 581 q.drainTo(l, 1); 582 assertEquals(2, l.size()); 583 assertTrue(l.contains(one)); 584 assertTrue(l.contains(two)); 585 awaitTermination(t1); 586 awaitTermination(t2); 587 } 588 589 } 590