/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.TearDown;
import com.google.common.testing.TearDownStack;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; 40 import java.util.concurrent.CountDownLatch; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.Future; 44 import java.util.concurrent.ScheduledExecutorService; 45 import java.util.concurrent.Semaphore; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.locks.Condition; 48 import java.util.concurrent.locks.Lock; 49 import java.util.concurrent.locks.ReentrantLock; 50 import junit.framework.TestCase; 51 52 /** 53 * Tests for {@link Uninterruptibles}. 54 * 55 * @author Anthony Zana 56 */ 57 58 public class UninterruptiblesTest extends TestCase { 59 private static final String EXPECTED_TAKE = "expectedTake"; 60 61 /** Timeout to use when we don't expect the timeout to expire. */ 62 private static final long LONG_DELAY_MS = 2500; 63 64 private static final long SLEEP_SLACK = 2; 65 66 private final TearDownStack tearDownStack = new TearDownStack(); 67 68 // NOTE: All durations in these tests are expressed in milliseconds 69 @Override setUp()70 protected void setUp() { 71 // Clear any previous interrupt before running the test. 72 if (Thread.currentThread().isInterrupted()) { 73 throw new AssertionError( 74 "Thread interrupted on test entry. 
" 75 + "Some test probably didn't clear the interrupt state"); 76 } 77 78 tearDownStack.addTearDown( 79 new TearDown() { 80 @Override 81 public void tearDown() { 82 Thread.interrupted(); 83 } 84 }); 85 } 86 87 @Override tearDown()88 protected void tearDown() { 89 tearDownStack.runTearDown(); 90 } 91 testNull()92 public void testNull() throws Exception { 93 new NullPointerTester() 94 .setDefault(CountDownLatch.class, new CountDownLatch(0)) 95 .setDefault(Semaphore.class, new Semaphore(999)) 96 .testAllPublicStaticMethods(Uninterruptibles.class); 97 } 98 99 // IncrementableCountDownLatch.await() tests 100 101 // CountDownLatch.await() tests 102 103 // Condition.await() tests testConditionAwaitTimeoutExceeded()104 public void testConditionAwaitTimeoutExceeded() { 105 Stopwatch stopwatch = Stopwatch.createStarted(); 106 Condition condition = TestCondition.create(); 107 108 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS); 109 110 assertFalse(signaledBeforeTimeout); 111 assertAtLeastTimePassed(stopwatch, 500); 112 assertNotInterrupted(); 113 } 114 testConditionAwaitTimeoutNotExceeded()115 public void testConditionAwaitTimeoutNotExceeded() { 116 Stopwatch stopwatch = Stopwatch.createStarted(); 117 Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS); 118 119 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 120 121 assertTrue(signaledBeforeTimeout); 122 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 123 assertNotInterrupted(); 124 } 125 testConditionAwaitInterruptedTimeoutExceeded()126 public void testConditionAwaitInterruptedTimeoutExceeded() { 127 Stopwatch stopwatch = Stopwatch.createStarted(); 128 Condition condition = TestCondition.create(); 129 requestInterruptIn(500); 130 131 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS); 132 133 assertFalse(signaledBeforeTimeout); 134 assertAtLeastTimePassed(stopwatch, 1000); 135 assertInterrupted(); 
136 } 137 testConditionAwaitInterruptedTimeoutNotExceeded()138 public void testConditionAwaitInterruptedTimeoutNotExceeded() { 139 Stopwatch stopwatch = Stopwatch.createStarted(); 140 Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS); 141 requestInterruptIn(500); 142 143 boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS); 144 145 assertTrue(signaledBeforeTimeout); 146 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 147 assertInterrupted(); 148 } 149 150 // Lock.tryLock() tests testTryLockTimeoutExceeded()151 public void testTryLockTimeoutExceeded() { 152 Stopwatch stopwatch = Stopwatch.createStarted(); 153 Lock lock = new ReentrantLock(); 154 Thread lockThread = acquireFor(lock, 5, SECONDS); 155 156 boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS); 157 158 assertFalse(lockAcquired); 159 assertAtLeastTimePassed(stopwatch, 500); 160 assertNotInterrupted(); 161 162 // finish locking thread 163 lockThread.interrupt(); 164 } 165 testTryLockTimeoutNotExceeded()166 public void testTryLockTimeoutNotExceeded() { 167 Stopwatch stopwatch = Stopwatch.createStarted(); 168 Lock lock = new ReentrantLock(); 169 acquireFor(lock, 500, MILLISECONDS); 170 171 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 172 173 assertTrue(signaledBeforeTimeout); 174 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 175 assertNotInterrupted(); 176 } 177 testTryLockInterruptedTimeoutExceeded()178 public void testTryLockInterruptedTimeoutExceeded() { 179 Stopwatch stopwatch = Stopwatch.createStarted(); 180 Lock lock = new ReentrantLock(); 181 Thread lockThread = acquireFor(lock, 5, SECONDS); 182 requestInterruptIn(500); 183 184 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS); 185 186 assertFalse(signaledBeforeTimeout); 187 assertAtLeastTimePassed(stopwatch, 1000); 188 assertInterrupted(); 189 190 // finish locking thread 191 lockThread.interrupt(); 192 } 193 
testTryLockInterruptedTimeoutNotExceeded()194 public void testTryLockInterruptedTimeoutNotExceeded() { 195 Stopwatch stopwatch = Stopwatch.createStarted(); 196 Lock lock = new ReentrantLock(); 197 acquireFor(lock, 1000, MILLISECONDS); 198 requestInterruptIn(500); 199 200 boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS); 201 202 assertTrue(signaledBeforeTimeout); 203 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 204 assertInterrupted(); 205 } 206 207 // BlockingQueue.put() tests testPutWithNoWait()208 public void testPutWithNoWait() { 209 Stopwatch stopwatch = Stopwatch.createStarted(); 210 BlockingQueue<String> queue = new ArrayBlockingQueue<>(999); 211 putUninterruptibly(queue, ""); 212 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 213 assertEquals("", queue.peek()); 214 } 215 testPutNoInterrupt()216 public void testPutNoInterrupt() { 217 TimedPutQueue queue = TimedPutQueue.createWithDelay(20); 218 queue.putSuccessfully(); 219 assertNotInterrupted(); 220 } 221 testPutSingleInterrupt()222 public void testPutSingleInterrupt() { 223 TimedPutQueue queue = TimedPutQueue.createWithDelay(50); 224 requestInterruptIn(10); 225 queue.putSuccessfully(); 226 assertInterrupted(); 227 } 228 testPutMultiInterrupt()229 public void testPutMultiInterrupt() { 230 TimedPutQueue queue = TimedPutQueue.createWithDelay(100); 231 repeatedlyInterruptTestThread(20, tearDownStack); 232 queue.putSuccessfully(); 233 assertInterrupted(); 234 } 235 236 // BlockingQueue.take() tests testTakeWithNoWait()237 public void testTakeWithNoWait() { 238 Stopwatch stopwatch = Stopwatch.createStarted(); 239 BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); 240 assertTrue(queue.offer("")); 241 assertEquals("", takeUninterruptibly(queue)); 242 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 243 } 244 testTakeNoInterrupt()245 public void testTakeNoInterrupt() { 246 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20); 247 queue.takeSuccessfully(); 248 
assertNotInterrupted(); 249 } 250 testTakeSingleInterrupt()251 public void testTakeSingleInterrupt() { 252 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50); 253 requestInterruptIn(10); 254 queue.takeSuccessfully(); 255 assertInterrupted(); 256 } 257 testTakeMultiInterrupt()258 public void testTakeMultiInterrupt() { 259 TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100); 260 repeatedlyInterruptTestThread(20, tearDownStack); 261 queue.takeSuccessfully(); 262 assertInterrupted(); 263 } 264 265 // join() tests testJoinWithNoWait()266 public void testJoinWithNoWait() throws InterruptedException { 267 Stopwatch stopwatch = Stopwatch.createStarted(); 268 Thread thread = new Thread(new JoinTarget(15)); 269 thread.start(); 270 thread.join(); 271 assertFalse(thread.isAlive()); 272 273 joinUninterruptibly(thread); 274 joinUninterruptibly(thread, 0, MILLISECONDS); 275 joinUninterruptibly(thread, -42, MILLISECONDS); 276 joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS); 277 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 278 } 279 testJoinNoInterrupt()280 public void testJoinNoInterrupt() { 281 TimedThread thread = TimedThread.createWithDelay(20); 282 thread.joinSuccessfully(); 283 assertNotInterrupted(); 284 } 285 testJoinTimeoutNoInterruptNotExpired()286 public void testJoinTimeoutNoInterruptNotExpired() { 287 TimedThread thread = TimedThread.createWithDelay(20); 288 thread.joinSuccessfully(LONG_DELAY_MS); 289 assertNotInterrupted(); 290 } 291 testJoinTimeoutNoInterruptExpired()292 public void testJoinTimeoutNoInterruptExpired() { 293 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 294 thread.joinUnsuccessfully(30); 295 assertNotInterrupted(); 296 } 297 testJoinSingleInterrupt()298 public void testJoinSingleInterrupt() { 299 TimedThread thread = TimedThread.createWithDelay(50); 300 requestInterruptIn(10); 301 thread.joinSuccessfully(); 302 assertInterrupted(); 303 } 304 testJoinTimeoutSingleInterruptNoExpire()305 public void 
testJoinTimeoutSingleInterruptNoExpire() { 306 TimedThread thread = TimedThread.createWithDelay(50); 307 requestInterruptIn(10); 308 thread.joinSuccessfully(LONG_DELAY_MS); 309 assertInterrupted(); 310 } 311 testJoinTimeoutSingleInterruptExpired()312 public void testJoinTimeoutSingleInterruptExpired() { 313 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 314 requestInterruptIn(10); 315 thread.joinUnsuccessfully(50); 316 assertInterrupted(); 317 } 318 testJoinMultiInterrupt()319 public void testJoinMultiInterrupt() { 320 TimedThread thread = TimedThread.createWithDelay(100); 321 repeatedlyInterruptTestThread(20, tearDownStack); 322 thread.joinSuccessfully(); 323 assertInterrupted(); 324 } 325 testJoinTimeoutMultiInterruptNoExpire()326 public void testJoinTimeoutMultiInterruptNoExpire() { 327 TimedThread thread = TimedThread.createWithDelay(100); 328 repeatedlyInterruptTestThread(20, tearDownStack); 329 thread.joinSuccessfully(LONG_DELAY_MS); 330 assertInterrupted(); 331 } 332 testJoinTimeoutMultiInterruptExpired()333 public void testJoinTimeoutMultiInterruptExpired() { 334 /* 335 * We don't "need" to schedule a thread completion at all here, but by doing 336 * so, we come the closest we can to testing that the wait time is 337 * appropriately decreased on each progressive join() call. 
338 */ 339 TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS); 340 repeatedlyInterruptTestThread(20, tearDownStack); 341 thread.joinUnsuccessfully(70); 342 assertInterrupted(); 343 } 344 345 // sleep() Tests testSleepNoInterrupt()346 public void testSleepNoInterrupt() { 347 sleepSuccessfully(10); 348 } 349 testSleepSingleInterrupt()350 public void testSleepSingleInterrupt() { 351 requestInterruptIn(10); 352 sleepSuccessfully(50); 353 assertInterrupted(); 354 } 355 testSleepMultiInterrupt()356 public void testSleepMultiInterrupt() { 357 repeatedlyInterruptTestThread(10, tearDownStack); 358 sleepSuccessfully(100); 359 assertInterrupted(); 360 } 361 362 // Semaphore.tryAcquire() tests testTryAcquireWithNoWait()363 public void testTryAcquireWithNoWait() { 364 Stopwatch stopwatch = Stopwatch.createStarted(); 365 Semaphore semaphore = new Semaphore(99); 366 assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS)); 367 assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS)); 368 assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS)); 369 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 370 } 371 testTryAcquireTimeoutNoInterruptNotExpired()372 public void testTryAcquireTimeoutNoInterruptNotExpired() { 373 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 374 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 375 assertNotInterrupted(); 376 } 377 testTryAcquireTimeoutNoInterruptExpired()378 public void testTryAcquireTimeoutNoInterruptExpired() { 379 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 380 semaphore.tryAcquireUnsuccessfully(30); 381 assertNotInterrupted(); 382 } 383 testTryAcquireTimeoutSingleInterruptNoExpire()384 public void testTryAcquireTimeoutSingleInterruptNoExpire() { 385 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 386 requestInterruptIn(10); 387 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 388 assertInterrupted(); 389 } 390 
testTryAcquireTimeoutSingleInterruptExpired()391 public void testTryAcquireTimeoutSingleInterruptExpired() { 392 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 393 requestInterruptIn(10); 394 semaphore.tryAcquireUnsuccessfully(50); 395 assertInterrupted(); 396 } 397 testTryAcquireTimeoutMultiInterruptNoExpire()398 public void testTryAcquireTimeoutMultiInterruptNoExpire() { 399 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 400 repeatedlyInterruptTestThread(20, tearDownStack); 401 semaphore.tryAcquireSuccessfully(LONG_DELAY_MS); 402 assertInterrupted(); 403 } 404 testTryAcquireTimeoutMultiInterruptExpired()405 public void testTryAcquireTimeoutMultiInterruptExpired() { 406 /* 407 * We don't "need" to schedule a release() call at all here, but by doing 408 * so, we come the closest we can to testing that the wait time is 409 * appropriately decreased on each progressive tryAcquire() call. 410 */ 411 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 412 repeatedlyInterruptTestThread(20, tearDownStack); 413 semaphore.tryAcquireUnsuccessfully(70); 414 assertInterrupted(); 415 } 416 testTryAcquireWithNoWaitMultiPermit()417 public void testTryAcquireWithNoWaitMultiPermit() { 418 Stopwatch stopwatch = Stopwatch.createStarted(); 419 Semaphore semaphore = new Semaphore(99); 420 assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS)); 421 assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS)); 422 assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS)); 423 assertTimeNotPassed(stopwatch, LONG_DELAY_MS); 424 } 425 testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()426 public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() { 427 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20); 428 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 429 assertNotInterrupted(); 430 } 431 testTryAcquireTimeoutNoInterruptExpiredMultiPermit()432 
public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() { 433 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 434 semaphore.tryAcquireUnsuccessfully(10, 30); 435 assertNotInterrupted(); 436 } 437 testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()438 public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() { 439 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50); 440 requestInterruptIn(10); 441 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 442 assertInterrupted(); 443 } 444 testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()445 public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() { 446 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 447 requestInterruptIn(10); 448 semaphore.tryAcquireUnsuccessfully(10, 50); 449 assertInterrupted(); 450 } 451 testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()452 public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() { 453 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100); 454 repeatedlyInterruptTestThread(20, tearDownStack); 455 semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS); 456 assertInterrupted(); 457 } 458 testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()459 public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() { 460 /* 461 * We don't "need" to schedule a release() call at all here, but by doing 462 * so, we come the closest we can to testing that the wait time is 463 * appropriately decreased on each progressive tryAcquire() call. 
464 */ 465 TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS); 466 repeatedlyInterruptTestThread(20, tearDownStack); 467 semaphore.tryAcquireUnsuccessfully(10, 70); 468 assertInterrupted(); 469 } 470 471 // executor.awaitTermination Testcases testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()472 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() { 473 ExecutorService executor = newFixedThreadPool(1); 474 requestInterruptIn(500); 475 executor.execute(new SleepTask(1000)); 476 executor.shutdown(); 477 assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS)); 478 assertTrue(executor.isTerminated()); 479 assertInterrupted(); 480 } 481 testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()482 public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() { 483 ExecutorService executor = newFixedThreadPool(1); 484 requestInterruptIn(500); 485 executor.execute(new SleepTask(10000)); 486 executor.shutdown(); 487 assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS)); 488 assertFalse(executor.isTerminated()); 489 assertInterrupted(); 490 } 491 testTryAwaitTerminationInfiniteTimeout()492 public void testTryAwaitTerminationInfiniteTimeout() { 493 ExecutorService executor = newFixedThreadPool(1); 494 requestInterruptIn(500); 495 executor.execute(new SleepTask(1000)); 496 executor.shutdown(); 497 awaitTerminationUninterruptibly(executor); 498 assertTrue(executor.isTerminated()); 499 assertInterrupted(); 500 } 501 502 /** 503 * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a 504 * {@code Completion} starts the underlying stopwatch. 
505 */ 506 private static final class Completion { 507 final Stopwatch stopwatch; 508 final long expectedCompletionWaitMillis; 509 Completion(long expectedCompletionWaitMillis)510 Completion(long expectedCompletionWaitMillis) { 511 this.expectedCompletionWaitMillis = expectedCompletionWaitMillis; 512 stopwatch = Stopwatch.createStarted(); 513 } 514 515 /** 516 * Asserts that the expected completion time has passed (and not "too much" time beyond that). 517 */ assertCompletionExpected()518 void assertCompletionExpected() { 519 assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis); 520 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS); 521 } 522 523 /** 524 * Asserts that at least {@code timeout} has passed but the expected completion time has not. 525 */ assertCompletionNotExpected(long timeout)526 void assertCompletionNotExpected(long timeout) { 527 Preconditions.checkArgument(timeout < expectedCompletionWaitMillis); 528 assertAtLeastTimePassed(stopwatch, timeout); 529 assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis); 530 } 531 } 532 533 private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) { 534 long elapsedMillis = stopwatch.elapsed(MILLISECONDS); 535 /* 536 * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such 537 * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more 538 * generous than it needs to be. 539 */ 540 assertTrue( 541 "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis, 542 elapsedMillis + 5 >= expectedMillis); 543 } 544 545 // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes. 546 547 /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. 
*/ 548 private static final class TimedPutQueue { 549 final BlockingQueue<String> queue; 550 final Completion completed; 551 552 /** 553 * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code 554 * countdownInMillis}. 555 */ createWithDelay(long countdownInMillis)556 static TimedPutQueue createWithDelay(long countdownInMillis) { 557 return new TimedPutQueue(countdownInMillis); 558 } 559 TimedPutQueue(long countdownInMillis)560 private TimedPutQueue(long countdownInMillis) { 561 this.queue = new ArrayBlockingQueue<>(1); 562 assertTrue(queue.offer("blocksPutCallsUntilRemoved")); 563 this.completed = new Completion(countdownInMillis); 564 scheduleEnableWrites(this.queue, countdownInMillis); 565 } 566 567 /** Perform a {@code put} and assert that operation completed in the expected timeframe. */ putSuccessfully()568 void putSuccessfully() { 569 putUninterruptibly(queue, ""); 570 completed.assertCompletionExpected(); 571 assertEquals("", queue.peek()); 572 } 573 scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)574 private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) { 575 Runnable toRun = new EnableWrites(queue, countdownInMillis); 576 // TODO(cpovirk): automatically fail the test if this thread throws 577 Thread enablerThread = new Thread(toRun); 578 enablerThread.start(); 579 } 580 } 581 582 /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */ 583 private static final class TimedTakeQueue { 584 final BlockingQueue<String> queue; 585 final Completion completed; 586 587 /** 588 * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code 589 * countdownInMillis}. 
590 */ createWithDelay(long countdownInMillis)591 static TimedTakeQueue createWithDelay(long countdownInMillis) { 592 return new TimedTakeQueue(countdownInMillis); 593 } 594 TimedTakeQueue(long countdownInMillis)595 private TimedTakeQueue(long countdownInMillis) { 596 this.queue = new ArrayBlockingQueue<>(1); 597 this.completed = new Completion(countdownInMillis); 598 scheduleEnableReads(this.queue, countdownInMillis); 599 } 600 601 /** Perform a {@code take} and assert that operation completed in the expected timeframe. */ takeSuccessfully()602 void takeSuccessfully() { 603 assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue)); 604 completed.assertCompletionExpected(); 605 assertTrue(queue.isEmpty()); 606 } 607 scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)608 private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) { 609 Runnable toRun = new EnableReads(queue, countdownInMillis); 610 // TODO(cpovirk): automatically fail the test if this thread throws 611 Thread enablerThread = new Thread(toRun); 612 enablerThread.start(); 613 } 614 } 615 616 /** Manages a {@link Semaphore} and associated timings. */ 617 private static final class TimedSemaphore { 618 final Semaphore semaphore; 619 final Completion completed; 620 621 /** 622 * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}. 623 */ createWithDelay(long countdownInMillis)624 static TimedSemaphore createWithDelay(long countdownInMillis) { 625 return new TimedSemaphore(countdownInMillis); 626 } 627 TimedSemaphore(long countdownInMillis)628 private TimedSemaphore(long countdownInMillis) { 629 this.semaphore = new Semaphore(0); 630 this.completed = new Completion(countdownInMillis); 631 scheduleRelease(countdownInMillis); 632 } 633 634 /** 635 * Requests a permit from the semaphore with a timeout and asserts that operation completed in 636 * the expected timeframe. 
637 */ tryAcquireSuccessfully(long timeoutMillis)638 void tryAcquireSuccessfully(long timeoutMillis) { 639 assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 640 completed.assertCompletionExpected(); 641 } 642 tryAcquireSuccessfully(int permits, long timeoutMillis)643 void tryAcquireSuccessfully(int permits, long timeoutMillis) { 644 assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 645 completed.assertCompletionExpected(); 646 } 647 648 /** 649 * Requests a permit from the semaphore with a timeout and asserts that the wait returned within 650 * the expected timeout. 651 */ tryAcquireUnsuccessfully(long timeoutMillis)652 private void tryAcquireUnsuccessfully(long timeoutMillis) { 653 assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS)); 654 completed.assertCompletionNotExpected(timeoutMillis); 655 } 656 tryAcquireUnsuccessfully(int permits, long timeoutMillis)657 private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) { 658 assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS)); 659 completed.assertCompletionNotExpected(timeoutMillis); 660 } 661 scheduleRelease(long countdownInMillis)662 private void scheduleRelease(long countdownInMillis) { 663 DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis); 664 // TODO(cpovirk): automatically fail the test if this thread throws 665 Thread releaserThread = new Thread(toRun); 666 releaserThread.start(); 667 } 668 } 669 670 private abstract static class DelayedActionRunnable implements Runnable { 671 private final long tMinus; 672 DelayedActionRunnable(long tMinus)673 protected DelayedActionRunnable(long tMinus) { 674 this.tMinus = tMinus; 675 } 676 677 @Override run()678 public final void run() { 679 try { 680 Thread.sleep(tMinus); 681 } catch (InterruptedException e) { 682 throw new AssertionError(e); 683 } 684 doAction(); 685 } 686 doAction()687 protected abstract 
void doAction(); 688 } 689 690 private static class CountDown extends DelayedActionRunnable { 691 private final CountDownLatch latch; 692 CountDown(CountDownLatch latch, long tMinus)693 public CountDown(CountDownLatch latch, long tMinus) { 694 super(tMinus); 695 this.latch = latch; 696 } 697 698 @Override doAction()699 protected void doAction() { 700 latch.countDown(); 701 } 702 } 703 704 private static class EnableWrites extends DelayedActionRunnable { 705 private final BlockingQueue<String> queue; 706 EnableWrites(BlockingQueue<String> queue, long tMinus)707 public EnableWrites(BlockingQueue<String> queue, long tMinus) { 708 super(tMinus); 709 assertFalse(queue.isEmpty()); 710 assertFalse(queue.offer("shouldBeRejected")); 711 this.queue = queue; 712 } 713 714 @Override doAction()715 protected void doAction() { 716 assertNotNull(queue.remove()); 717 } 718 } 719 720 private static class EnableReads extends DelayedActionRunnable { 721 private final BlockingQueue<String> queue; 722 EnableReads(BlockingQueue<String> queue, long tMinus)723 public EnableReads(BlockingQueue<String> queue, long tMinus) { 724 super(tMinus); 725 assertTrue(queue.isEmpty()); 726 this.queue = queue; 727 } 728 729 @Override doAction()730 protected void doAction() { 731 assertTrue(queue.offer(EXPECTED_TAKE)); 732 } 733 } 734 735 private static final class TimedThread { 736 private final Thread thread; 737 private final Completion completed; 738 createWithDelay(long countdownInMillis)739 static TimedThread createWithDelay(long countdownInMillis) { 740 return new TimedThread(countdownInMillis); 741 } 742 TimedThread(long expectedCompletionWaitMillis)743 private TimedThread(long expectedCompletionWaitMillis) { 744 completed = new Completion(expectedCompletionWaitMillis); 745 thread = new Thread(new JoinTarget(expectedCompletionWaitMillis)); 746 thread.start(); 747 } 748 joinSuccessfully()749 void joinSuccessfully() { 750 Uninterruptibles.joinUninterruptibly(thread); 751 
completed.assertCompletionExpected(); 752 assertEquals(Thread.State.TERMINATED, thread.getState()); 753 } 754 joinSuccessfully(long timeoutMillis)755 void joinSuccessfully(long timeoutMillis) { 756 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 757 completed.assertCompletionExpected(); 758 assertEquals(Thread.State.TERMINATED, thread.getState()); 759 } 760 joinUnsuccessfully(long timeoutMillis)761 void joinUnsuccessfully(long timeoutMillis) { 762 Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS); 763 completed.assertCompletionNotExpected(timeoutMillis); 764 assertFalse(Thread.State.TERMINATED.equals(thread.getState())); 765 } 766 } 767 768 private static class JoinTarget extends DelayedActionRunnable { JoinTarget(long tMinus)769 public JoinTarget(long tMinus) { 770 super(tMinus); 771 } 772 773 @Override doAction()774 protected void doAction() {} 775 } 776 777 private static class Release extends DelayedActionRunnable { 778 private final Semaphore semaphore; 779 Release(Semaphore semaphore, long tMinus)780 public Release(Semaphore semaphore, long tMinus) { 781 super(tMinus); 782 this.semaphore = semaphore; 783 } 784 785 @Override doAction()786 protected void doAction() { 787 semaphore.release(10); 788 } 789 } 790 791 private static final class SleepTask extends DelayedActionRunnable { SleepTask(long tMinus)792 SleepTask(long tMinus) { 793 super(tMinus); 794 } 795 796 @Override doAction()797 protected void doAction() {} 798 } 799 sleepSuccessfully(long sleepMillis)800 private static void sleepSuccessfully(long sleepMillis) { 801 Completion completed = new Completion(sleepMillis - SLEEP_SLACK); 802 Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS); 803 completed.assertCompletionExpected(); 804 } 805 assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)806 private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) { 807 long elapsedMillis = 
stopwatch.elapsed(MILLISECONDS); 808 assertTrue(elapsedMillis < timelimitMillis); 809 } 810 811 /** 812 * Await an interrupt, then clear the interrupt status. Similar to {@code 813 * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts. 814 */ 815 private static void assertInterrupted() { 816 try { 817 /* 818 * The sleep() will end immediately if we've already been interrupted or 819 * wait patiently for the interrupt if not. 820 */ 821 Thread.sleep(LONG_DELAY_MS); 822 fail("Dude, where's my interrupt?"); 823 } catch (InterruptedException expected) { 824 } 825 } 826 827 private static void assertNotInterrupted() { 828 assertFalse(Thread.interrupted()); 829 } 830 831 private static void requestInterruptIn(long millis) { 832 InterruptionUtil.requestInterruptIn(millis, MILLISECONDS); 833 } 834 835 @CanIgnoreReturnValue 836 private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) { 837 final CountDownLatch latch = new CountDownLatch(1); 838 Thread thread = 839 new Thread() { 840 @Override 841 public void run() { 842 lock.lock(); 843 latch.countDown(); 844 try { 845 Thread.sleep(unit.toMillis(duration)); 846 } catch (InterruptedException e) { 847 // simply finish execution 848 } finally { 849 lock.unlock(); 850 } 851 } 852 }; 853 thread.setDaemon(true); 854 thread.start(); 855 awaitUninterruptibly(latch); 856 return thread; 857 } 858 859 private static class TestCondition implements Condition { 860 private final Lock lock; 861 private final Condition condition; 862 863 private TestCondition(Lock lock, Condition condition) { 864 this.lock = lock; 865 this.condition = condition; 866 } 867 868 static TestCondition createAndSignalAfter(long delay, TimeUnit unit) { 869 final TestCondition testCondition = create(); 870 871 ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); 872 // If signal() fails somehow, we should see a failed test, even without looking at the Future. 
873 Future<?> unused = 874 scheduledPool.schedule( 875 new Runnable() { 876 @Override 877 public void run() { 878 testCondition.signal(); 879 } 880 }, 881 delay, 882 unit); 883 884 return testCondition; 885 } 886 887 static TestCondition create() { 888 Lock lock = new ReentrantLock(); 889 Condition condition = lock.newCondition(); 890 return new TestCondition(lock, condition); 891 } 892 893 @Override 894 public void await() throws InterruptedException { 895 lock.lock(); 896 try { 897 condition.await(); 898 } finally { 899 lock.unlock(); 900 } 901 } 902 903 @Override 904 public boolean await(long time, TimeUnit unit) throws InterruptedException { 905 lock.lock(); 906 try { 907 return condition.await(time, unit); 908 } finally { 909 lock.unlock(); 910 } 911 } 912 913 @Override 914 public void awaitUninterruptibly() { 915 lock.lock(); 916 try { 917 condition.awaitUninterruptibly(); 918 } finally { 919 lock.unlock(); 920 } 921 } 922 923 @Override 924 public long awaitNanos(long nanosTimeout) throws InterruptedException { 925 lock.lock(); 926 try { 927 return condition.awaitNanos(nanosTimeout); 928 } finally { 929 lock.unlock(); 930 } 931 } 932 933 @Override 934 public boolean awaitUntil(Date deadline) throws InterruptedException { 935 lock.lock(); 936 try { 937 return condition.awaitUntil(deadline); 938 } finally { 939 lock.unlock(); 940 } 941 } 942 943 @Override 944 public void signal() { 945 lock.lock(); 946 try { 947 condition.signal(); 948 } finally { 949 lock.unlock(); 950 } 951 } 952 953 @Override 954 public void signalAll() { 955 lock.lock(); 956 try { 957 condition.signalAll(); 958 } finally { 959 lock.unlock(); 960 } 961 } 962 } 963 } 964