1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 5 * use this file except in compliance with the License. You may obtain a copy of 6 * the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations under 14 * the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread; 20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly; 21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; 22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly; 23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly; 26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly; 27 import static java.util.concurrent.Executors.newFixedThreadPool; 28 import static java.util.concurrent.TimeUnit.MILLISECONDS; 29 import static java.util.concurrent.TimeUnit.SECONDS; 30 31 import com.google.common.base.Preconditions; 32 import com.google.common.base.Stopwatch; 33 import com.google.common.testing.NullPointerTester; 34 import com.google.common.testing.TearDown; 35 import com.google.common.testing.TearDownStack; 36 import com.google.errorprone.annotations.CanIgnoreReturnValue; 37 import java.time.Duration; 38 import java.util.Date; 39 import java.util.concurrent.ArrayBlockingQueue; 40 import java.util.concurrent.BlockingQueue; 41 import java.util.concurrent.CountDownLatch; 42 import java.util.concurrent.ExecutorService; 43 import java.util.concurrent.Executors; 44 import java.util.concurrent.Future; 45 import java.util.concurrent.ScheduledExecutorService; 46 import java.util.concurrent.Semaphore; 47 import java.util.concurrent.TimeUnit; 48 import java.util.concurrent.locks.Condition; 49 import java.util.concurrent.locks.Lock; 50 import java.util.concurrent.locks.ReentrantLock; 51 import junit.framework.TestCase; 52 53 /** 54 * Tests for {@link Uninterruptibles}. 55 * 56 * @author Anthony Zana 57 */ 58 59 public class UninterruptiblesTest extends TestCase { 60 private static final String EXPECTED_TAKE = "expectedTake"; 61 62 /** Timeout to use when we don't expect the timeout to expire. */ 63 private static final long LONG_DELAY_MS = 2500; 64 65 private static final long SLEEP_SLACK = 2; 66 67 private final TearDownStack tearDownStack = new TearDownStack(); 68 69 // NOTE: All durations in these tests are expressed in milliseconds 70 @Override setUp()71 protected void setUp() { 72 // Clear any previous interrupt before running the test. 73 if (Thread.currentThread().isInterrupted()) { 74 throw new AssertionError( 75 "Thread interrupted on test entry. " 76 + "Some test probably didn't clear the interrupt state"); 77 } 78 79 tearDownStack.addTearDown( 80 new TearDown() { 81 @Override 82 public void tearDown() { 83 Thread.interrupted(); 84 } 85 }); 86 } 87 88 @Override tearDown()89 protected void tearDown() { 90 tearDownStack.runTearDown(); 91 } 92 testNull()93 public void testNull() throws Exception { 94 new NullPointerTester() 95 .setDefault(CountDownLatch.class, new CountDownLatch(0)) 96 .setDefault(Semaphore.class, new Semaphore(999)) 97 .testAllPublicStaticMethods(Uninterruptibles.class); 98 } 99 100 // IncrementableCountDownLatch.await() tests 101 102 // CountDownLatch.await() tests 103 104 // Condition.await() tests testConditionAwaitTimeoutExceeded()105 public void testConditionAwaitTimeoutExceeded() { 106 Stopwatch stopwatch = Stopwatch.createStarted(); 107 Condition condition = TestCondition.create(); 108 109 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS); 110 111 assertFalse(signaledBeforeTimeout); 112 assertAtLeastTimePassed(stopwatch, 500); 113 assertNotInterrupted(); 114 } 115 testConditionAwaitTimeoutNotExceeded()116 public void testConditionAwaitTimeoutNotExceeded() { 117 Stopwatch stopwatch = Stopwatch.createStarted(); 118 Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS); 119 120 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 121 122 assertTrue(signaledBeforeTimeout); 123 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 124 assertNotInterrupted(); 125 } 126 testConditionAwaitInterruptedTimeoutExceeded()127 public void testConditionAwaitInterruptedTimeoutExceeded() { 128 Stopwatch stopwatch = Stopwatch.createStarted(); 129 Condition condition = TestCondition.create(); 130 requestInterruptIn(500); 131 132 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS); 133 134 assertFalse(signaledBeforeTimeout); 135 assertAtLeastTimePassed(stopwatch, 1000); 136 assertInterrupted(); 137 } 138 testConditionAwaitInterruptedTimeoutNotExceeded()139 public void testConditionAwaitInterruptedTimeoutNotExceeded() { 140 Stopwatch stopwatch = Stopwatch.createStarted(); 141 Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS); 142 requestInterruptIn(500); 143 144 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 145 146 assertTrue(signaledBeforeTimeout); 147 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 148 assertInterrupted(); 149 } 150 151 // Lock.tryLock() tests testTryLockTimeoutExceeded()152 public void testTryLockTimeoutExceeded() { 153 Stopwatch stopwatch = Stopwatch.createStarted(); 154 Lock lock = new ReentrantLock(); 155 Thread lockThread = acquireFor(lock, 5, SECONDS); 156 157 boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS); 158 159 assertFalse(lockAcquired); 160 assertAtLeastTimePassed(stopwatch, 500); 161 assertNotInterrupted(); 162 163 // finish locking thread 164 lockThread.interrupt(); 165 } 166 testTryLockTimeoutNotExceeded()167 public void testTryLockTimeoutNotExceeded() { 168 Stopwatch stopwatch = Stopwatch.createStarted(); 169 Lock lock = new ReentrantLock(); 170 acquireFor(lock, 500, MILLISECONDS); 171 172 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 173 174 assertTrue(signaledBeforeTimeout); 175 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 176 assertNotInterrupted(); 177 } 178 testTryLockInterruptedTimeoutExceeded()179 public void testTryLockInterruptedTimeoutExceeded() { 180 Stopwatch stopwatch = Stopwatch.createStarted(); 181 Lock lock = new ReentrantLock(); 182 Thread lockThread = acquireFor(lock, 5, SECONDS); 183 requestInterruptIn(500); 184 185 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS); 186 187 assertFalse(signaledBeforeTimeout); 188 assertAtLeastTimePassed(stopwatch, 1000); 189 assertInterrupted(); 190 191 // finish locking thread 192 lockThread.interrupt(); 193 } 194 testTryLockInterruptedTimeoutNotExceeded()195 public void testTryLockInterruptedTimeoutNotExceeded() { 196 Stopwatch stopwatch = Stopwatch.createStarted(); 197 Lock lock = new ReentrantLock(); 198 acquireFor(lock, 1000, MILLISECONDS); 199 requestInterruptIn(500); 200 201 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 202 203 assertTrue(signaledBeforeTimeout); 204 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 205 assertInterrupted(); 206 } 207 208 // BlockingQueue.put() tests testPutWithNoWait()209 public void testPutWithNoWait() { 210 Stopwatch stopwatch = Stopwatch.createStarted(); 211 BlockingQueue<String> queue = new ArrayBlockingQueue<>(999); 212 putUninterruptibly(queue, ""); 213 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 214 assertEquals("", queue.peek()); 215 } 216 testPutNoInterrupt()217 public void testPutNoInterrupt() { 218 TimedPutQueue queue = TimedPutQueue.createWithDelay(20); 219 queue.putSuccessfully(); 220 assertNotInterrupted(); 221 } 222 testPutSingleInterrupt()223 public void testPutSingleInterrupt() { 224 TimedPutQueue queue = TimedPutQueue.createWithDelay(50); 225 requestInterruptIn(10); 226 queue.putSuccessfully(); 227 assertInterrupted(); 228 } 229 testPutMultiInterrupt()230 public void testPutMultiInterrupt() { 231 TimedPutQueue queue = TimedPutQueue.createWithDelay(100); 232 repeatedlyInterruptTestThread(20, tearDownStack); 233 queue.putSuccessfully(); 234 assertInterrupted(); 235 } 236 237 // BlockingQueue.take() tests testTakeWithNoWait()238 public void testTakeWithNoWait() { 239 Stopwatch stopwatch = Stopwatch.createStarted(); 240 BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); 241 assertTrue(queue.offer("")); 242 assertEquals("", takeUninterruptibly(queue)); 243 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 244 } 245 testTakeNoInterrupt()246 public void testTakeNoInterrupt() { 247 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20); 248 queue.takeSuccessfully(); 249 assertNotInterrupted(); 250 } 251 testTakeSingleInterrupt()252 public void testTakeSingleInterrupt() { 253 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50); 254 requestInterruptIn(10); 255 queue.takeSuccessfully(); 256 assertInterrupted(); 257 } 258 testTakeMultiInterrupt()259 public void testTakeMultiInterrupt() { 260 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100); 261 repeatedlyInterruptTestThread(20, tearDownStack); 262 queue.takeSuccessfully(); 263 assertInterrupted(); 264 } 265 266 // join() tests testJoinWithNoWait()267 public void testJoinWithNoWait() throws InterruptedException { 268 Stopwatch stopwatch = Stopwatch.createStarted(); 269 Thread thread = new Thread(new JoinTarget(15)); 270 thread.start(); 271 thread.join(); 272 assertFalse(thread.isAlive()); 273 274 joinUninterruptibly(thread); 275 joinUninterruptibly(thread, 0, MILLISECONDS); 276 joinUninterruptibly(thread, -42, MILLISECONDS); 277 joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS); 278 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 279 } 280 testJoinNoInterrupt()281 public void testJoinNoInterrupt() { 282 TimedThread thread = TimedThread.createWithDelay(20); 283 thread.joinSuccessfully(); 284 assertNotInterrupted(); 285 } 286 testJoinTimeoutNoInterruptNotExpired()287 public void testJoinTimeoutNoInterruptNotExpired() { 288 TimedThread thread = TimedThread.createWithDelay(20); 289 thread.joinSuccessfully(LONG_DELAY_MS); 290 assertNotInterrupted(); 291 } 292 testJoinTimeoutNoInterruptExpired()293 public void testJoinTimeoutNoInterruptExpired() { 294 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 295 thread.joinUnsuccessfully(30); 296 assertNotInterrupted(); 297 } 298 testJoinSingleInterrupt()299 public void testJoinSingleInterrupt() { 300 TimedThread thread = TimedThread.createWithDelay(50); 301 requestInterruptIn(10); 302 thread.joinSuccessfully(); 303 assertInterrupted(); 304 } 305 testJoinTimeoutSingleInterruptNoExpire()306 public void testJoinTimeoutSingleInterruptNoExpire() { 307 TimedThread thread = TimedThread.createWithDelay(50); 308 requestInterruptIn(10); 309 thread.joinSuccessfully(LONG_DELAY_MS); 310 assertInterrupted(); 311 } 312 testJoinTimeoutSingleInterruptExpired()313 public void testJoinTimeoutSingleInterruptExpired() { 314 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 315 requestInterruptIn(10); 316 thread.joinUnsuccessfully(50); 317 assertInterrupted(); 318 } 319 testJoinMultiInterrupt()320 public void testJoinMultiInterrupt() { 321 TimedThread thread = TimedThread.createWithDelay(100); 322 repeatedlyInterruptTestThread(20, tearDownStack); 323 thread.joinSuccessfully(); 324 assertInterrupted(); 325 } 326 testJoinTimeoutMultiInterruptNoExpire()327 public void testJoinTimeoutMultiInterruptNoExpire() { 328 TimedThread thread = TimedThread.createWithDelay(100); 329 repeatedlyInterruptTestThread(20, tearDownStack); 330 thread.joinSuccessfully(LONG_DELAY_MS); 331 assertInterrupted(); 332 } 333 testJoinTimeoutMultiInterruptExpired()334 public void testJoinTimeoutMultiInterruptExpired() { 335 /* 336 * We don't "need" to schedule a thread completion at all here, but by doing 337 * so, we come the closest we can to testing that the wait time is 338 * appropriately decreased on each progressive join() call. 339 */ 340 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 341 repeatedlyInterruptTestThread(20, tearDownStack); 342 thread.joinUnsuccessfully(70); 343 assertInterrupted(); 344 } 345 346 // sleep() Tests testSleepNoInterrupt()347 public void testSleepNoInterrupt() { 348 sleepSuccessfully(10); 349 } 350 testSleepSingleInterrupt()351 public void testSleepSingleInterrupt() { 352 requestInterruptIn(10); 353 sleepSuccessfully(50); 354 assertInterrupted(); 355 } 356 testSleepMultiInterrupt()357 public void testSleepMultiInterrupt() { 358 repeatedlyInterruptTestThread(10, tearDownStack); 359 sleepSuccessfully(100); 360 assertInterrupted(); 361 } 362 363 // Semaphore.tryAcquire() tests testTryAcquireWithNoWait()364 public void testTryAcquireWithNoWait() { 365 Stopwatch stopwatch = Stopwatch.createStarted(); 366 Semaphore semaphore = new Semaphore(99); 367 assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS)); 368 assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS)); 369 assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS)); 370 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 371 } 372 testTryAcquireTimeoutNoInterruptNotExpired()373 public void testTryAcquireTimeoutNoInterruptNotExpired() { 374 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 375 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 376 assertNotInterrupted(); 377 } 378 testTryAcquireTimeoutNoInterruptExpired()379 public void testTryAcquireTimeoutNoInterruptExpired() { 380 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 381 semaphore.tryAcquireUnsuccessfully(30); 382 assertNotInterrupted(); 383 } 384 testTryAcquireTimeoutSingleInterruptNoExpire()385 public void testTryAcquireTimeoutSingleInterruptNoExpire() { 386 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 387 requestInterruptIn(10); 388 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 389 assertInterrupted(); 390 } 391 testTryAcquireTimeoutSingleInterruptExpired()392 public void testTryAcquireTimeoutSingleInterruptExpired() { 393 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 394 requestInterruptIn(10); 395 semaphore.tryAcquireUnsuccessfully(50); 396 assertInterrupted(); 397 } 398 testTryAcquireTimeoutMultiInterruptNoExpire()399 public void testTryAcquireTimeoutMultiInterruptNoExpire() { 400 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 401 repeatedlyInterruptTestThread(20, tearDownStack); 402 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 403 assertInterrupted(); 404 } 405 testTryAcquireTimeoutMultiInterruptExpired()406 public void testTryAcquireTimeoutMultiInterruptExpired() { 407 /* 408 * We don't "need" to schedule a release() call at all here, but by doing 409 * so, we come the closest we can to testing that the wait time is 410 * appropriately decreased on each progressive tryAcquire() call. 411 */ 412 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 413 repeatedlyInterruptTestThread(20, tearDownStack); 414 semaphore.tryAcquireUnsuccessfully(70); 415 assertInterrupted(); 416 } 417 testTryAcquireWithNoWaitMultiPermit()418 public void testTryAcquireWithNoWaitMultiPermit() { 419 Stopwatch stopwatch = Stopwatch.createStarted(); 420 Semaphore semaphore = new Semaphore(99); 421 assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS)); 422 assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS)); 423 assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS)); 424 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 425 } 426 testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()427 public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() { 428 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 429 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 430 assertNotInterrupted(); 431 } 432 testTryAcquireTimeoutNoInterruptExpiredMultiPermit()433 public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() { 434 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 435 semaphore.tryAcquireUnsuccessfully(10, 30); 436 assertNotInterrupted(); 437 } 438 testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()439 public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() { 440 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 441 requestInterruptIn(10); 442 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 443 assertInterrupted(); 444 } 445 testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()446 public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() { 447 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 448 requestInterruptIn(10); 449 semaphore.tryAcquireUnsuccessfully(10, 50); 450 assertInterrupted(); 451 } 452 testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()453 public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() { 454 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 455 repeatedlyInterruptTestThread(20, tearDownStack); 456 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 457 assertInterrupted(); 458 } 459 testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()460 public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() { 461 /* 462 * We don't "need" to schedule a release() call at all here, but by doing 463 * so, we come the closest we can to testing that the wait time is 464 * appropriately decreased on each progressive tryAcquire() call. 465 */ 466 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 467 repeatedlyInterruptTestThread(20, tearDownStack); 468 semaphore.tryAcquireUnsuccessfully(10, 70); 469 assertInterrupted(); 470 } 471 472 // executor.awaitTermination Testcases testTryAwaitTerminationUninterruptiblyDuration_success()473 public void testTryAwaitTerminationUninterruptiblyDuration_success() { 474 ExecutorService executor = newFixedThreadPool(1); 475 requestInterruptIn(500); 476 executor.execute(new SleepTask(1000)); 477 executor.shutdown(); 478 assertTrue(awaitTerminationUninterruptibly(executor, Duration.ofMillis(LONG_DELAY_MS))); 479 assertTrue(executor.isTerminated()); 480 assertInterrupted(); 481 } 482 testTryAwaitTerminationUninterruptiblyDuration_failure()483 public void testTryAwaitTerminationUninterruptiblyDuration_failure() { 484 ExecutorService executor = newFixedThreadPool(1); 485 requestInterruptIn(500); 486 executor.execute(new SleepTask(10000)); 487 executor.shutdown(); 488 assertFalse(awaitTerminationUninterruptibly(executor, Duration.ofMillis(1000))); 489 assertFalse(executor.isTerminated()); 490 assertInterrupted(); 491 } 492 testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()493 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() { 494 ExecutorService executor = newFixedThreadPool(1); 495 requestInterruptIn(500); 496 executor.execute(new SleepTask(1000)); 497 executor.shutdown(); 498 assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS)); 499 assertTrue(executor.isTerminated()); 500 assertInterrupted(); 501 } 502 testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()503 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() { 504 ExecutorService executor = newFixedThreadPool(1); 505 requestInterruptIn(500); 506 executor.execute(new SleepTask(10000)); 507 executor.shutdown(); 508 assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS)); 509 assertFalse(executor.isTerminated()); 510 assertInterrupted(); 511 } 512 testTryAwaitTerminationInfiniteTimeout()513 public void testTryAwaitTerminationInfiniteTimeout() { 514 ExecutorService executor = newFixedThreadPool(1); 515 requestInterruptIn(500); 516 executor.execute(new SleepTask(1000)); 517 executor.shutdown(); 518 awaitTerminationUninterruptibly(executor); 519 assertTrue(executor.isTerminated()); 520 assertInterrupted(); 521 } 522 523 /** 524 * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a 525 * {@code Completion} starts the underlying stopwatch. 526 */ 527 private static final class Completion { 528 final Stopwatch stopwatch; 529 final long expectedCompletionWaitMillis; 530 Completion(long expectedCompletionWaitMillis)531 Completion(long expectedCompletionWaitMillis) { 532 this.expectedCompletionWaitMillis = expectedCompletionWaitMillis; 533 stopwatch = Stopwatch.createStarted(); 534 } 535 536 /** 537 * Asserts that the expected completion time has passed (and not "too much" time beyond that). 538 */ assertCompletionExpected()539 void assertCompletionExpected() { 540 assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis); 541 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS); 542 } 543 544 /** 545 * Asserts that at least {@code timeout} has passed but the expected completion time has not. 546 */ assertCompletionNotExpected(long timeout)547 void assertCompletionNotExpected(long timeout) { 548 Preconditions.checkArgument(timeout < expectedCompletionWaitMillis); 549 assertAtLeastTimePassed(stopwatch, timeout); 550 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis); 551 } 552 } 553 554 private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) { 555 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 556 /* 557 * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such 558 * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more 559 * generous than it needs to be. 560 */ 561 assertTrue( 562 "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis, 563 elapsedMillis + 5 >= expectedMillis); 564 } 565 566 // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes. 567 568 /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */ 569 private static final class TimedPutQueue { 570 final BlockingQueue<String> queue; 571 final Completion completed; 572 573 /** 574 * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code 575 * countdownInMillis}. 576 */ createWithDelay(long countdownInMillis)577 static TimedPutQueue createWithDelay(long countdownInMillis) { 578 return new TimedPutQueue(countdownInMillis); 579 } 580 TimedPutQueue(long countdownInMillis)581 private TimedPutQueue(long countdownInMillis) { 582 this.queue = new ArrayBlockingQueue<>(1); 583 assertTrue(queue.offer("blocksPutCallsUntilRemoved")); 584 this.completed = new Completion(countdownInMillis); 585 scheduleEnableWrites(this.queue, countdownInMillis); 586 } 587 588 /** Perform a {@code put} and assert that operation completed in the expected timeframe. */ putSuccessfully()589 void putSuccessfully() { 590 putUninterruptibly(queue, ""); 591 completed.assertCompletionExpected(); 592 assertEquals("", queue.peek()); 593 } 594 scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)595 private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) { 596 Runnable toRun = new EnableWrites(queue, countdownInMillis); 597 // TODO(cpovirk): automatically fail the test if this thread throws 598 Thread enablerThread = new Thread(toRun); 599 enablerThread.start(); 600 } 601 } 602 603 /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */ 604 private static final class TimedTakeQueue { 605 final BlockingQueue<String> queue; 606 final Completion completed; 607 608 /** 609 * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code 610 * countdownInMillis}. 611 */ createWithDelay(long countdownInMillis)612 static TimedTakeQueue createWithDelay(long countdownInMillis) { 613 return new TimedTakeQueue(countdownInMillis); 614 } 615 TimedTakeQueue(long countdownInMillis)616 private TimedTakeQueue(long countdownInMillis) { 617 this.queue = new ArrayBlockingQueue<>(1); 618 this.completed = new Completion(countdownInMillis); 619 scheduleEnableReads(this.queue, countdownInMillis); 620 } 621 622 /** Perform a {@code take} and assert that operation completed in the expected timeframe. */ takeSuccessfully()623 void takeSuccessfully() { 624 assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue)); 625 completed.assertCompletionExpected(); 626 assertTrue(queue.isEmpty()); 627 } 628 scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)629 private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) { 630 Runnable toRun = new EnableReads(queue, countdownInMillis); 631 // TODO(cpovirk): automatically fail the test if this thread throws 632 Thread enablerThread = new Thread(toRun); 633 enablerThread.start(); 634 } 635 } 636 637 /** Manages a {@link Semaphore} and associated timings. */ 638 private static final class TimedSemaphore { 639 final Semaphore semaphore; 640 final Completion completed; 641 642 /** 643 * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}. 644 */ createWithDelay(long countdownInMillis)645 static TimedSemaphore createWithDelay(long countdownInMillis) { 646 return new TimedSemaphore(countdownInMillis); 647 } 648 TimedSemaphore(long countdownInMillis)649 private TimedSemaphore(long countdownInMillis) { 650 this.semaphore = new Semaphore(0); 651 this.completed = new Completion(countdownInMillis); 652 scheduleRelease(countdownInMillis); 653 } 654 655 /** 656 * Requests a permit from the semaphore with a timeout and asserts that operation completed in 657 * the expected timeframe. 658 */ tryAcquireSuccessfully(long timeoutMillis)659 void tryAcquireSuccessfully(long timeoutMillis) { 660 assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 661 completed.assertCompletionExpected(); 662 } 663 tryAcquireSuccessfully(int permits, long timeoutMillis)664 void tryAcquireSuccessfully(int permits, long timeoutMillis) { 665 assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 666 completed.assertCompletionExpected(); 667 } 668 669 /** 670 * Requests a permit from the semaphore with a timeout and asserts that the wait returned within 671 * the expected timeout. 672 */ tryAcquireUnsuccessfully(long timeoutMillis)673 private void tryAcquireUnsuccessfully(long timeoutMillis) { 674 assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 675 completed.assertCompletionNotExpected(timeoutMillis); 676 } 677 tryAcquireUnsuccessfully(int permits, long timeoutMillis)678 private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) { 679 assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 680 completed.assertCompletionNotExpected(timeoutMillis); 681 } 682 scheduleRelease(long countdownInMillis)683 private void scheduleRelease(long countdownInMillis) { 684 DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis); 685 // TODO(cpovirk): automatically fail the test if this thread throws 686 Thread releaserThread = new Thread(toRun); 687 releaserThread.start(); 688 } 689 } 690 691 private abstract static class DelayedActionRunnable implements Runnable { 692 private final long tMinus; 693 DelayedActionRunnable(long tMinus)694 protected DelayedActionRunnable(long tMinus) { 695 this.tMinus = tMinus; 696 } 697 698 @Override run()699 public final void run() { 700 try { 701 Thread.sleep(tMinus); 702 } catch (InterruptedException e) { 703 throw new AssertionError(e); 704 } 705 doAction(); 706 } 707 doAction()708 protected abstract void doAction(); 709 } 710 711 private static class CountDown extends DelayedActionRunnable { 712 private final CountDownLatch latch; 713 CountDown(CountDownLatch latch, long tMinus)714 public CountDown(CountDownLatch latch, long tMinus) { 715 super(tMinus); 716 this.latch = latch; 717 } 718 719 @Override doAction()720 protected void doAction() { 721 latch.countDown(); 722 } 723 } 724 725 private static class EnableWrites extends DelayedActionRunnable { 726 private final BlockingQueue<String> queue; 727 EnableWrites(BlockingQueue<String> queue, long tMinus)728 public EnableWrites(BlockingQueue<String> queue, long tMinus) { 729 super(tMinus); 730 assertFalse(queue.isEmpty()); 731 assertFalse(queue.offer("shouldBeRejected")); 732 this.queue = queue; 733 } 734 735 @Override doAction()736 protected void doAction() { 737 assertNotNull(queue.remove()); 738 } 739 } 740 741 private static class EnableReads extends DelayedActionRunnable { 742 private final BlockingQueue<String> queue; 743 EnableReads(BlockingQueue<String> queue, long tMinus)744 public EnableReads(BlockingQueue<String> queue, long tMinus) { 745 super(tMinus); 746 assertTrue(queue.isEmpty()); 747 this.queue = queue; 748 } 749 750 @Override doAction()751 protected void doAction() { 752 assertTrue(queue.offer(EXPECTED_TAKE)); 753 } 754 } 755 756 private static final class TimedThread { 757 private final Thread thread; 758 private final Completion completed; 759 createWithDelay(long countdownInMillis)760 static TimedThread createWithDelay(long countdownInMillis) { 761 return new TimedThread(countdownInMillis); 762 } 763 TimedThread(long expectedCompletionWaitMillis)764 private TimedThread(long expectedCompletionWaitMillis) { 765 completed = new Completion(expectedCompletionWaitMillis); 766 thread = new Thread(new JoinTarget(expectedCompletionWaitMillis)); 767 thread.start(); 768 } 769 joinSuccessfully()770 void joinSuccessfully() { 771 Uninterruptibles.joinUninterruptibly(thread); 772 completed.assertCompletionExpected(); 773 assertEquals(Thread.State.TERMINATED, thread.getState()); 774 } 775 joinSuccessfully(long timeoutMillis)776 void joinSuccessfully(long timeoutMillis) { 777 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 778 completed.assertCompletionExpected(); 779 assertEquals(Thread.State.TERMINATED, thread.getState()); 780 } 781 joinUnsuccessfully(long timeoutMillis)782 void joinUnsuccessfully(long timeoutMillis) { 783 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 784 completed.assertCompletionNotExpected(timeoutMillis); 785 assertFalse(Thread.State.TERMINATED.equals(thread.getState())); 786 } 787 } 788 789 private static class JoinTarget extends DelayedActionRunnable { JoinTarget(long tMinus)790 public JoinTarget(long tMinus) { 791 super(tMinus); 792 } 793 794 @Override doAction()795 protected void doAction() {} 796 } 797 798 private static class Release extends DelayedActionRunnable { 799 private final Semaphore semaphore; 800 Release(Semaphore semaphore, long tMinus)801 public Release(Semaphore semaphore, long tMinus) { 802 super(tMinus); 803 this.semaphore = semaphore; 804 } 805 806 @Override doAction()807 protected void doAction() { 808 semaphore.release(10); 809 } 810 } 811 812 private static final class SleepTask extends DelayedActionRunnable { SleepTask(long tMinus)813 SleepTask(long tMinus) { 814 super(tMinus); 815 } 816 817 @Override doAction()818 protected void doAction() {} 819 } 820 sleepSuccessfully(long sleepMillis)821 private static void sleepSuccessfully(long sleepMillis) { 822 Completion completed = new Completion(sleepMillis - SLEEP_SLACK); 823 Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS); 824 completed.assertCompletionExpected(); 825 } 826 assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)827 private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) { 828 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 829 assertTrue(elapsedMillis < timelimitMillis); 830 } 831 832 /** 833 * Await an interrupt, then clear the interrupt status. Similar to {@code 834 * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts. 835 */ 836 private static void assertInterrupted() { 837 try { 838 /* 839 * The sleep() will end immediately if we've already been interrupted or 840 * wait patiently for the interrupt if not. 841 */ 842 Thread.sleep(LONG_DELAY_MS); 843 fail("Dude, where's my interrupt?"); 844 } catch (InterruptedException expected) { 845 } 846 } 847 848 private static void assertNotInterrupted() { 849 assertFalse(Thread.interrupted()); 850 } 851 852 private static void requestInterruptIn(long millis) { 853 InterruptionUtil.requestInterruptIn(millis, MILLISECONDS); 854 } 855 856 @CanIgnoreReturnValue 857 private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) { 858 final CountDownLatch latch = new CountDownLatch(1); 859 Thread thread = 860 new Thread() { 861 @Override 862 public void run() { 863 lock.lock(); 864 latch.countDown(); 865 try { 866 Thread.sleep(unit.toMillis(duration)); 867 } catch (InterruptedException e) { 868 // simply finish execution 869 } finally { 870 lock.unlock(); 871 } 872 } 873 }; 874 thread.setDaemon(true); 875 thread.start(); 876 awaitUninterruptibly(latch); 877 return thread; 878 } 879 880 private static class TestCondition implements Condition { 881 private final Lock lock; 882 private final Condition condition; 883 884 private TestCondition(Lock lock, Condition condition) { 885 this.lock = lock; 886 this.condition = condition; 887 } 888 889 static TestCondition createAndSignalAfter(long delay, TimeUnit unit) { 890 final TestCondition testCondition = create(); 891 892 ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); 893 // If signal() fails somehow, we should see a failed test, even without looking at the Future. 894 Future<?> unused = 895 scheduledPool.schedule( 896 new Runnable() { 897 @Override 898 public void run() { 899 testCondition.signal(); 900 } 901 }, 902 delay, 903 unit); 904 905 return testCondition; 906 } 907 908 static TestCondition create() { 909 Lock lock = new ReentrantLock(); 910 Condition condition = lock.newCondition(); 911 return new TestCondition(lock, condition); 912 } 913 914 @Override 915 public void await() throws InterruptedException { 916 lock.lock(); 917 try { 918 condition.await(); 919 } finally { 920 lock.unlock(); 921 } 922 } 923 924 @Override 925 public boolean await(long time, TimeUnit unit) throws InterruptedException { 926 lock.lock(); 927 try { 928 return condition.await(time, unit); 929 } finally { 930 lock.unlock(); 931 } 932 } 933 934 @Override 935 public void awaitUninterruptibly() { 936 lock.lock(); 937 try { 938 condition.awaitUninterruptibly(); 939 } finally { 940 lock.unlock(); 941 } 942 } 943 944 @Override 945 public long awaitNanos(long nanosTimeout) throws InterruptedException { 946 lock.lock(); 947 try { 948 return condition.awaitNanos(nanosTimeout); 949 } finally { 950 lock.unlock(); 951 } 952 } 953 954 @Override 955 public boolean awaitUntil(Date deadline) throws InterruptedException { 956 lock.lock(); 957 try { 958 return condition.awaitUntil(deadline); 959 } finally { 960 lock.unlock(); 961 } 962 } 963 964 @Override 965 public void signal() { 966 lock.lock(); 967 try { 968 condition.signal(); 969 } finally { 970 lock.unlock(); 971 } 972 } 973 974 @Override 975 public void signalAll() { 976 lock.lock(); 977 try { 978 condition.signalAll(); 979 } finally { 980 lock.unlock(); 981 } 982 } 983 } 984 } 985