1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
27 import static java.util.concurrent.Executors.newFixedThreadPool;
28 import static java.util.concurrent.TimeUnit.MILLISECONDS;
29 import static java.util.concurrent.TimeUnit.SECONDS;
30 
31 import com.google.common.base.Preconditions;
32 import com.google.common.base.Stopwatch;
33 import com.google.common.testing.NullPointerTester;
34 import com.google.common.testing.TearDown;
35 import com.google.common.testing.TearDownStack;
36 import com.google.errorprone.annotations.CanIgnoreReturnValue;
37 import java.time.Duration;
38 import java.util.Date;
39 import java.util.concurrent.ArrayBlockingQueue;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.Semaphore;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.locks.Condition;
49 import java.util.concurrent.locks.Lock;
50 import java.util.concurrent.locks.ReentrantLock;
51 import junit.framework.TestCase;
52 
53 /**
54  * Tests for {@link Uninterruptibles}.
55  *
56  * @author Anthony Zana
57  */
58 
59 public class UninterruptiblesTest extends TestCase {
60   private static final String EXPECTED_TAKE = "expectedTake";
61 
62   /** Timeout to use when we don't expect the timeout to expire. */
63   private static final long LONG_DELAY_MS = 2500;
64 
65   private static final long SLEEP_SLACK = 2;
66 
67   private final TearDownStack tearDownStack = new TearDownStack();
68 
69   // NOTE: All durations in these tests are expressed in milliseconds
70   @Override
setUp()71   protected void setUp() {
72     // Clear any previous interrupt before running the test.
73     if (Thread.currentThread().isInterrupted()) {
74       throw new AssertionError(
75           "Thread interrupted on test entry. "
76               + "Some test probably didn't clear the interrupt state");
77     }
78 
79     tearDownStack.addTearDown(
80         new TearDown() {
81           @Override
82           public void tearDown() {
83             Thread.interrupted();
84           }
85         });
86   }
87 
88   @Override
tearDown()89   protected void tearDown() {
90     tearDownStack.runTearDown();
91   }
92 
testNull()93   public void testNull() throws Exception {
94     new NullPointerTester()
95         .setDefault(CountDownLatch.class, new CountDownLatch(0))
96         .setDefault(Semaphore.class, new Semaphore(999))
97         .testAllPublicStaticMethods(Uninterruptibles.class);
98   }
99 
100   // IncrementableCountDownLatch.await() tests
101 
102   // CountDownLatch.await() tests
103 
104   // Condition.await() tests
testConditionAwaitTimeoutExceeded()105   public void testConditionAwaitTimeoutExceeded() {
106     Stopwatch stopwatch = Stopwatch.createStarted();
107     Condition condition = TestCondition.create();
108 
109     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS);
110 
111     assertFalse(signaledBeforeTimeout);
112     assertAtLeastTimePassed(stopwatch, 500);
113     assertNotInterrupted();
114   }
115 
testConditionAwaitTimeoutNotExceeded()116   public void testConditionAwaitTimeoutNotExceeded() {
117     Stopwatch stopwatch = Stopwatch.createStarted();
118     Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS);
119 
120     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
121 
122     assertTrue(signaledBeforeTimeout);
123     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
124     assertNotInterrupted();
125   }
126 
testConditionAwaitInterruptedTimeoutExceeded()127   public void testConditionAwaitInterruptedTimeoutExceeded() {
128     Stopwatch stopwatch = Stopwatch.createStarted();
129     Condition condition = TestCondition.create();
130     requestInterruptIn(500);
131 
132     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS);
133 
134     assertFalse(signaledBeforeTimeout);
135     assertAtLeastTimePassed(stopwatch, 1000);
136     assertInterrupted();
137   }
138 
testConditionAwaitInterruptedTimeoutNotExceeded()139   public void testConditionAwaitInterruptedTimeoutNotExceeded() {
140     Stopwatch stopwatch = Stopwatch.createStarted();
141     Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS);
142     requestInterruptIn(500);
143 
144     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
145 
146     assertTrue(signaledBeforeTimeout);
147     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
148     assertInterrupted();
149   }
150 
151   // Lock.tryLock() tests
testTryLockTimeoutExceeded()152   public void testTryLockTimeoutExceeded() {
153     Stopwatch stopwatch = Stopwatch.createStarted();
154     Lock lock = new ReentrantLock();
155     Thread lockThread = acquireFor(lock, 5, SECONDS);
156 
157     boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS);
158 
159     assertFalse(lockAcquired);
160     assertAtLeastTimePassed(stopwatch, 500);
161     assertNotInterrupted();
162 
163     // finish locking thread
164     lockThread.interrupt();
165   }
166 
testTryLockTimeoutNotExceeded()167   public void testTryLockTimeoutNotExceeded() {
168     Stopwatch stopwatch = Stopwatch.createStarted();
169     Lock lock = new ReentrantLock();
170     acquireFor(lock, 500, MILLISECONDS);
171 
172     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
173 
174     assertTrue(signaledBeforeTimeout);
175     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
176     assertNotInterrupted();
177   }
178 
testTryLockInterruptedTimeoutExceeded()179   public void testTryLockInterruptedTimeoutExceeded() {
180     Stopwatch stopwatch = Stopwatch.createStarted();
181     Lock lock = new ReentrantLock();
182     Thread lockThread = acquireFor(lock, 5, SECONDS);
183     requestInterruptIn(500);
184 
185     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS);
186 
187     assertFalse(signaledBeforeTimeout);
188     assertAtLeastTimePassed(stopwatch, 1000);
189     assertInterrupted();
190 
191     // finish locking thread
192     lockThread.interrupt();
193   }
194 
testTryLockInterruptedTimeoutNotExceeded()195   public void testTryLockInterruptedTimeoutNotExceeded() {
196     Stopwatch stopwatch = Stopwatch.createStarted();
197     Lock lock = new ReentrantLock();
198     acquireFor(lock, 1000, MILLISECONDS);
199     requestInterruptIn(500);
200 
201     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
202 
203     assertTrue(signaledBeforeTimeout);
204     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
205     assertInterrupted();
206   }
207 
208   // BlockingQueue.put() tests
testPutWithNoWait()209   public void testPutWithNoWait() {
210     Stopwatch stopwatch = Stopwatch.createStarted();
211     BlockingQueue<String> queue = new ArrayBlockingQueue<>(999);
212     putUninterruptibly(queue, "");
213     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
214     assertEquals("", queue.peek());
215   }
216 
testPutNoInterrupt()217   public void testPutNoInterrupt() {
218     TimedPutQueue queue = TimedPutQueue.createWithDelay(20);
219     queue.putSuccessfully();
220     assertNotInterrupted();
221   }
222 
testPutSingleInterrupt()223   public void testPutSingleInterrupt() {
224     TimedPutQueue queue = TimedPutQueue.createWithDelay(50);
225     requestInterruptIn(10);
226     queue.putSuccessfully();
227     assertInterrupted();
228   }
229 
testPutMultiInterrupt()230   public void testPutMultiInterrupt() {
231     TimedPutQueue queue = TimedPutQueue.createWithDelay(100);
232     repeatedlyInterruptTestThread(20, tearDownStack);
233     queue.putSuccessfully();
234     assertInterrupted();
235   }
236 
237   // BlockingQueue.take() tests
testTakeWithNoWait()238   public void testTakeWithNoWait() {
239     Stopwatch stopwatch = Stopwatch.createStarted();
240     BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
241     assertTrue(queue.offer(""));
242     assertEquals("", takeUninterruptibly(queue));
243     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
244   }
245 
testTakeNoInterrupt()246   public void testTakeNoInterrupt() {
247     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20);
248     queue.takeSuccessfully();
249     assertNotInterrupted();
250   }
251 
testTakeSingleInterrupt()252   public void testTakeSingleInterrupt() {
253     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50);
254     requestInterruptIn(10);
255     queue.takeSuccessfully();
256     assertInterrupted();
257   }
258 
testTakeMultiInterrupt()259   public void testTakeMultiInterrupt() {
260     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100);
261     repeatedlyInterruptTestThread(20, tearDownStack);
262     queue.takeSuccessfully();
263     assertInterrupted();
264   }
265 
266   // join() tests
testJoinWithNoWait()267   public void testJoinWithNoWait() throws InterruptedException {
268     Stopwatch stopwatch = Stopwatch.createStarted();
269     Thread thread = new Thread(new JoinTarget(15));
270     thread.start();
271     thread.join();
272     assertFalse(thread.isAlive());
273 
274     joinUninterruptibly(thread);
275     joinUninterruptibly(thread, 0, MILLISECONDS);
276     joinUninterruptibly(thread, -42, MILLISECONDS);
277     joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS);
278     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
279   }
280 
testJoinNoInterrupt()281   public void testJoinNoInterrupt() {
282     TimedThread thread = TimedThread.createWithDelay(20);
283     thread.joinSuccessfully();
284     assertNotInterrupted();
285   }
286 
testJoinTimeoutNoInterruptNotExpired()287   public void testJoinTimeoutNoInterruptNotExpired() {
288     TimedThread thread = TimedThread.createWithDelay(20);
289     thread.joinSuccessfully(LONG_DELAY_MS);
290     assertNotInterrupted();
291   }
292 
testJoinTimeoutNoInterruptExpired()293   public void testJoinTimeoutNoInterruptExpired() {
294     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
295     thread.joinUnsuccessfully(30);
296     assertNotInterrupted();
297   }
298 
testJoinSingleInterrupt()299   public void testJoinSingleInterrupt() {
300     TimedThread thread = TimedThread.createWithDelay(50);
301     requestInterruptIn(10);
302     thread.joinSuccessfully();
303     assertInterrupted();
304   }
305 
testJoinTimeoutSingleInterruptNoExpire()306   public void testJoinTimeoutSingleInterruptNoExpire() {
307     TimedThread thread = TimedThread.createWithDelay(50);
308     requestInterruptIn(10);
309     thread.joinSuccessfully(LONG_DELAY_MS);
310     assertInterrupted();
311   }
312 
testJoinTimeoutSingleInterruptExpired()313   public void testJoinTimeoutSingleInterruptExpired() {
314     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
315     requestInterruptIn(10);
316     thread.joinUnsuccessfully(50);
317     assertInterrupted();
318   }
319 
testJoinMultiInterrupt()320   public void testJoinMultiInterrupt() {
321     TimedThread thread = TimedThread.createWithDelay(100);
322     repeatedlyInterruptTestThread(20, tearDownStack);
323     thread.joinSuccessfully();
324     assertInterrupted();
325   }
326 
testJoinTimeoutMultiInterruptNoExpire()327   public void testJoinTimeoutMultiInterruptNoExpire() {
328     TimedThread thread = TimedThread.createWithDelay(100);
329     repeatedlyInterruptTestThread(20, tearDownStack);
330     thread.joinSuccessfully(LONG_DELAY_MS);
331     assertInterrupted();
332   }
333 
testJoinTimeoutMultiInterruptExpired()334   public void testJoinTimeoutMultiInterruptExpired() {
335     /*
336      * We don't "need" to schedule a thread completion at all here, but by doing
337      * so, we come the closest we can to testing that the wait time is
338      * appropriately decreased on each progressive join() call.
339      */
340     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
341     repeatedlyInterruptTestThread(20, tearDownStack);
342     thread.joinUnsuccessfully(70);
343     assertInterrupted();
344   }
345 
346   // sleep() Tests
testSleepNoInterrupt()347   public void testSleepNoInterrupt() {
348     sleepSuccessfully(10);
349   }
350 
testSleepSingleInterrupt()351   public void testSleepSingleInterrupt() {
352     requestInterruptIn(10);
353     sleepSuccessfully(50);
354     assertInterrupted();
355   }
356 
testSleepMultiInterrupt()357   public void testSleepMultiInterrupt() {
358     repeatedlyInterruptTestThread(10, tearDownStack);
359     sleepSuccessfully(100);
360     assertInterrupted();
361   }
362 
363   // Semaphore.tryAcquire() tests
testTryAcquireWithNoWait()364   public void testTryAcquireWithNoWait() {
365     Stopwatch stopwatch = Stopwatch.createStarted();
366     Semaphore semaphore = new Semaphore(99);
367     assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS));
368     assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS));
369     assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS));
370     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
371   }
372 
testTryAcquireTimeoutNoInterruptNotExpired()373   public void testTryAcquireTimeoutNoInterruptNotExpired() {
374     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
375     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
376     assertNotInterrupted();
377   }
378 
testTryAcquireTimeoutNoInterruptExpired()379   public void testTryAcquireTimeoutNoInterruptExpired() {
380     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
381     semaphore.tryAcquireUnsuccessfully(30);
382     assertNotInterrupted();
383   }
384 
testTryAcquireTimeoutSingleInterruptNoExpire()385   public void testTryAcquireTimeoutSingleInterruptNoExpire() {
386     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
387     requestInterruptIn(10);
388     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
389     assertInterrupted();
390   }
391 
testTryAcquireTimeoutSingleInterruptExpired()392   public void testTryAcquireTimeoutSingleInterruptExpired() {
393     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
394     requestInterruptIn(10);
395     semaphore.tryAcquireUnsuccessfully(50);
396     assertInterrupted();
397   }
398 
testTryAcquireTimeoutMultiInterruptNoExpire()399   public void testTryAcquireTimeoutMultiInterruptNoExpire() {
400     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
401     repeatedlyInterruptTestThread(20, tearDownStack);
402     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
403     assertInterrupted();
404   }
405 
testTryAcquireTimeoutMultiInterruptExpired()406   public void testTryAcquireTimeoutMultiInterruptExpired() {
407     /*
408      * We don't "need" to schedule a release() call at all here, but by doing
409      * so, we come the closest we can to testing that the wait time is
410      * appropriately decreased on each progressive tryAcquire() call.
411      */
412     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
413     repeatedlyInterruptTestThread(20, tearDownStack);
414     semaphore.tryAcquireUnsuccessfully(70);
415     assertInterrupted();
416   }
417 
testTryAcquireWithNoWaitMultiPermit()418   public void testTryAcquireWithNoWaitMultiPermit() {
419     Stopwatch stopwatch = Stopwatch.createStarted();
420     Semaphore semaphore = new Semaphore(99);
421     assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS));
422     assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS));
423     assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS));
424     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
425   }
426 
testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()427   public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() {
428     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
429     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
430     assertNotInterrupted();
431   }
432 
testTryAcquireTimeoutNoInterruptExpiredMultiPermit()433   public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() {
434     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
435     semaphore.tryAcquireUnsuccessfully(10, 30);
436     assertNotInterrupted();
437   }
438 
testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()439   public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() {
440     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
441     requestInterruptIn(10);
442     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
443     assertInterrupted();
444   }
445 
testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()446   public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() {
447     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
448     requestInterruptIn(10);
449     semaphore.tryAcquireUnsuccessfully(10, 50);
450     assertInterrupted();
451   }
452 
testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()453   public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() {
454     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
455     repeatedlyInterruptTestThread(20, tearDownStack);
456     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
457     assertInterrupted();
458   }
459 
testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()460   public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
461     /*
462      * We don't "need" to schedule a release() call at all here, but by doing
463      * so, we come the closest we can to testing that the wait time is
464      * appropriately decreased on each progressive tryAcquire() call.
465      */
466     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
467     repeatedlyInterruptTestThread(20, tearDownStack);
468     semaphore.tryAcquireUnsuccessfully(10, 70);
469     assertInterrupted();
470   }
471 
472   // executor.awaitTermination Testcases
testTryAwaitTerminationUninterruptiblyDuration_success()473   public void testTryAwaitTerminationUninterruptiblyDuration_success() {
474     ExecutorService executor = newFixedThreadPool(1);
475     requestInterruptIn(500);
476     executor.execute(new SleepTask(1000));
477     executor.shutdown();
478     assertTrue(awaitTerminationUninterruptibly(executor, Duration.ofMillis(LONG_DELAY_MS)));
479     assertTrue(executor.isTerminated());
480     assertInterrupted();
481   }
482 
testTryAwaitTerminationUninterruptiblyDuration_failure()483   public void testTryAwaitTerminationUninterruptiblyDuration_failure() {
484     ExecutorService executor = newFixedThreadPool(1);
485     requestInterruptIn(500);
486     executor.execute(new SleepTask(10000));
487     executor.shutdown();
488     assertFalse(awaitTerminationUninterruptibly(executor, Duration.ofMillis(1000)));
489     assertFalse(executor.isTerminated());
490     assertInterrupted();
491   }
492 
testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()493   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() {
494     ExecutorService executor = newFixedThreadPool(1);
495     requestInterruptIn(500);
496     executor.execute(new SleepTask(1000));
497     executor.shutdown();
498     assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS));
499     assertTrue(executor.isTerminated());
500     assertInterrupted();
501   }
502 
testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()503   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() {
504     ExecutorService executor = newFixedThreadPool(1);
505     requestInterruptIn(500);
506     executor.execute(new SleepTask(10000));
507     executor.shutdown();
508     assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS));
509     assertFalse(executor.isTerminated());
510     assertInterrupted();
511   }
512 
testTryAwaitTerminationInfiniteTimeout()513   public void testTryAwaitTerminationInfiniteTimeout() {
514     ExecutorService executor = newFixedThreadPool(1);
515     requestInterruptIn(500);
516     executor.execute(new SleepTask(1000));
517     executor.shutdown();
518     awaitTerminationUninterruptibly(executor);
519     assertTrue(executor.isTerminated());
520     assertInterrupted();
521   }
522 
523   /**
524    * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
525    * {@code Completion} starts the underlying stopwatch.
526    */
527   private static final class Completion {
528     final Stopwatch stopwatch;
529     final long expectedCompletionWaitMillis;
530 
Completion(long expectedCompletionWaitMillis)531     Completion(long expectedCompletionWaitMillis) {
532       this.expectedCompletionWaitMillis = expectedCompletionWaitMillis;
533       stopwatch = Stopwatch.createStarted();
534     }
535 
536     /**
537      * Asserts that the expected completion time has passed (and not "too much" time beyond that).
538      */
assertCompletionExpected()539     void assertCompletionExpected() {
540       assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis);
541       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS);
542     }
543 
544     /**
545      * Asserts that at least {@code timeout} has passed but the expected completion time has not.
546      */
assertCompletionNotExpected(long timeout)547     void assertCompletionNotExpected(long timeout) {
548       Preconditions.checkArgument(timeout < expectedCompletionWaitMillis);
549       assertAtLeastTimePassed(stopwatch, timeout);
550       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis);
551     }
552   }
553 
554   private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) {
555     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
556     /*
557      * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such
558      * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more
559      * generous than it needs to be.
560      */
561     assertTrue(
562         "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis,
563         elapsedMillis + 5 >= expectedMillis);
564   }
565 
566   // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes.
567 
568   /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */
569   private static final class TimedPutQueue {
570     final BlockingQueue<String> queue;
571     final Completion completed;
572 
573     /**
574      * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code
575      * countdownInMillis}.
576      */
createWithDelay(long countdownInMillis)577     static TimedPutQueue createWithDelay(long countdownInMillis) {
578       return new TimedPutQueue(countdownInMillis);
579     }
580 
TimedPutQueue(long countdownInMillis)581     private TimedPutQueue(long countdownInMillis) {
582       this.queue = new ArrayBlockingQueue<>(1);
583       assertTrue(queue.offer("blocksPutCallsUntilRemoved"));
584       this.completed = new Completion(countdownInMillis);
585       scheduleEnableWrites(this.queue, countdownInMillis);
586     }
587 
588     /** Perform a {@code put} and assert that operation completed in the expected timeframe. */
putSuccessfully()589     void putSuccessfully() {
590       putUninterruptibly(queue, "");
591       completed.assertCompletionExpected();
592       assertEquals("", queue.peek());
593     }
594 
scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)595     private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) {
596       Runnable toRun = new EnableWrites(queue, countdownInMillis);
597       // TODO(cpovirk): automatically fail the test if this thread throws
598       Thread enablerThread = new Thread(toRun);
599       enablerThread.start();
600     }
601   }
602 
603   /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */
604   private static final class TimedTakeQueue {
605     final BlockingQueue<String> queue;
606     final Completion completed;
607 
608     /**
609      * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code
610      * countdownInMillis}.
611      */
createWithDelay(long countdownInMillis)612     static TimedTakeQueue createWithDelay(long countdownInMillis) {
613       return new TimedTakeQueue(countdownInMillis);
614     }
615 
TimedTakeQueue(long countdownInMillis)616     private TimedTakeQueue(long countdownInMillis) {
617       this.queue = new ArrayBlockingQueue<>(1);
618       this.completed = new Completion(countdownInMillis);
619       scheduleEnableReads(this.queue, countdownInMillis);
620     }
621 
622     /** Perform a {@code take} and assert that operation completed in the expected timeframe. */
takeSuccessfully()623     void takeSuccessfully() {
624       assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue));
625       completed.assertCompletionExpected();
626       assertTrue(queue.isEmpty());
627     }
628 
scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)629     private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) {
630       Runnable toRun = new EnableReads(queue, countdownInMillis);
631       // TODO(cpovirk): automatically fail the test if this thread throws
632       Thread enablerThread = new Thread(toRun);
633       enablerThread.start();
634     }
635   }
636 
637   /** Manages a {@link Semaphore} and associated timings. */
638   private static final class TimedSemaphore {
639     final Semaphore semaphore;
640     final Completion completed;
641 
642     /**
643      * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}.
644      */
createWithDelay(long countdownInMillis)645     static TimedSemaphore createWithDelay(long countdownInMillis) {
646       return new TimedSemaphore(countdownInMillis);
647     }
648 
TimedSemaphore(long countdownInMillis)649     private TimedSemaphore(long countdownInMillis) {
650       this.semaphore = new Semaphore(0);
651       this.completed = new Completion(countdownInMillis);
652       scheduleRelease(countdownInMillis);
653     }
654 
655     /**
656      * Requests a permit from the semaphore with a timeout and asserts that operation completed in
657      * the expected timeframe.
658      */
tryAcquireSuccessfully(long timeoutMillis)659     void tryAcquireSuccessfully(long timeoutMillis) {
660       assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
661       completed.assertCompletionExpected();
662     }
663 
tryAcquireSuccessfully(int permits, long timeoutMillis)664     void tryAcquireSuccessfully(int permits, long timeoutMillis) {
665       assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
666       completed.assertCompletionExpected();
667     }
668 
669     /**
670      * Requests a permit from the semaphore with a timeout and asserts that the wait returned within
671      * the expected timeout.
672      */
tryAcquireUnsuccessfully(long timeoutMillis)673     private void tryAcquireUnsuccessfully(long timeoutMillis) {
674       assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
675       completed.assertCompletionNotExpected(timeoutMillis);
676     }
677 
tryAcquireUnsuccessfully(int permits, long timeoutMillis)678     private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) {
679       assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
680       completed.assertCompletionNotExpected(timeoutMillis);
681     }
682 
scheduleRelease(long countdownInMillis)683     private void scheduleRelease(long countdownInMillis) {
684       DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis);
685       // TODO(cpovirk): automatically fail the test if this thread throws
686       Thread releaserThread = new Thread(toRun);
687       releaserThread.start();
688     }
689   }
690 
691   private abstract static class DelayedActionRunnable implements Runnable {
692     private final long tMinus;
693 
DelayedActionRunnable(long tMinus)694     protected DelayedActionRunnable(long tMinus) {
695       this.tMinus = tMinus;
696     }
697 
698     @Override
run()699     public final void run() {
700       try {
701         Thread.sleep(tMinus);
702       } catch (InterruptedException e) {
703         throw new AssertionError(e);
704       }
705       doAction();
706     }
707 
doAction()708     protected abstract void doAction();
709   }
710 
711   private static class CountDown extends DelayedActionRunnable {
712     private final CountDownLatch latch;
713 
CountDown(CountDownLatch latch, long tMinus)714     public CountDown(CountDownLatch latch, long tMinus) {
715       super(tMinus);
716       this.latch = latch;
717     }
718 
719     @Override
doAction()720     protected void doAction() {
721       latch.countDown();
722     }
723   }
724 
725   private static class EnableWrites extends DelayedActionRunnable {
726     private final BlockingQueue<String> queue;
727 
EnableWrites(BlockingQueue<String> queue, long tMinus)728     public EnableWrites(BlockingQueue<String> queue, long tMinus) {
729       super(tMinus);
730       assertFalse(queue.isEmpty());
731       assertFalse(queue.offer("shouldBeRejected"));
732       this.queue = queue;
733     }
734 
735     @Override
doAction()736     protected void doAction() {
737       assertNotNull(queue.remove());
738     }
739   }
740 
741   private static class EnableReads extends DelayedActionRunnable {
742     private final BlockingQueue<String> queue;
743 
EnableReads(BlockingQueue<String> queue, long tMinus)744     public EnableReads(BlockingQueue<String> queue, long tMinus) {
745       super(tMinus);
746       assertTrue(queue.isEmpty());
747       this.queue = queue;
748     }
749 
750     @Override
doAction()751     protected void doAction() {
752       assertTrue(queue.offer(EXPECTED_TAKE));
753     }
754   }
755 
756   private static final class TimedThread {
757     private final Thread thread;
758     private final Completion completed;
759 
createWithDelay(long countdownInMillis)760     static TimedThread createWithDelay(long countdownInMillis) {
761       return new TimedThread(countdownInMillis);
762     }
763 
TimedThread(long expectedCompletionWaitMillis)764     private TimedThread(long expectedCompletionWaitMillis) {
765       completed = new Completion(expectedCompletionWaitMillis);
766       thread = new Thread(new JoinTarget(expectedCompletionWaitMillis));
767       thread.start();
768     }
769 
joinSuccessfully()770     void joinSuccessfully() {
771       Uninterruptibles.joinUninterruptibly(thread);
772       completed.assertCompletionExpected();
773       assertEquals(Thread.State.TERMINATED, thread.getState());
774     }
775 
joinSuccessfully(long timeoutMillis)776     void joinSuccessfully(long timeoutMillis) {
777       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
778       completed.assertCompletionExpected();
779       assertEquals(Thread.State.TERMINATED, thread.getState());
780     }
781 
joinUnsuccessfully(long timeoutMillis)782     void joinUnsuccessfully(long timeoutMillis) {
783       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
784       completed.assertCompletionNotExpected(timeoutMillis);
785       assertFalse(Thread.State.TERMINATED.equals(thread.getState()));
786     }
787   }
788 
789   private static class JoinTarget extends DelayedActionRunnable {
JoinTarget(long tMinus)790     public JoinTarget(long tMinus) {
791       super(tMinus);
792     }
793 
794     @Override
doAction()795     protected void doAction() {}
796   }
797 
798   private static class Release extends DelayedActionRunnable {
799     private final Semaphore semaphore;
800 
Release(Semaphore semaphore, long tMinus)801     public Release(Semaphore semaphore, long tMinus) {
802       super(tMinus);
803       this.semaphore = semaphore;
804     }
805 
806     @Override
doAction()807     protected void doAction() {
808       semaphore.release(10);
809     }
810   }
811 
812   private static final class SleepTask extends DelayedActionRunnable {
SleepTask(long tMinus)813     SleepTask(long tMinus) {
814       super(tMinus);
815     }
816 
817     @Override
doAction()818     protected void doAction() {}
819   }
820 
sleepSuccessfully(long sleepMillis)821   private static void sleepSuccessfully(long sleepMillis) {
822     Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
823     Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
824     completed.assertCompletionExpected();
825   }
826 
assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)827   private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) {
828     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
829     assertTrue(elapsedMillis < timelimitMillis);
830   }
831 
832   /**
833    * Await an interrupt, then clear the interrupt status. Similar to {@code
834    * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts.
835    */
836   private static void assertInterrupted() {
837     try {
838       /*
839        * The sleep() will end immediately if we've already been interrupted or
840        * wait patiently for the interrupt if not.
841        */
842       Thread.sleep(LONG_DELAY_MS);
843       fail("Dude, where's my interrupt?");
844     } catch (InterruptedException expected) {
845     }
846   }
847 
848   private static void assertNotInterrupted() {
849     assertFalse(Thread.interrupted());
850   }
851 
852   private static void requestInterruptIn(long millis) {
853     InterruptionUtil.requestInterruptIn(millis, MILLISECONDS);
854   }
855 
856   @CanIgnoreReturnValue
857   private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) {
858     final CountDownLatch latch = new CountDownLatch(1);
859     Thread thread =
860         new Thread() {
861           @Override
862           public void run() {
863             lock.lock();
864             latch.countDown();
865             try {
866               Thread.sleep(unit.toMillis(duration));
867             } catch (InterruptedException e) {
868               // simply finish execution
869             } finally {
870               lock.unlock();
871             }
872           }
873         };
874     thread.setDaemon(true);
875     thread.start();
876     awaitUninterruptibly(latch);
877     return thread;
878   }
879 
880   private static class TestCondition implements Condition {
881     private final Lock lock;
882     private final Condition condition;
883 
884     private TestCondition(Lock lock, Condition condition) {
885       this.lock = lock;
886       this.condition = condition;
887     }
888 
889     static TestCondition createAndSignalAfter(long delay, TimeUnit unit) {
890       final TestCondition testCondition = create();
891 
892       ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
893       // If signal() fails somehow, we should see a failed test, even without looking at the Future.
894       Future<?> unused =
895           scheduledPool.schedule(
896               new Runnable() {
897                 @Override
898                 public void run() {
899                   testCondition.signal();
900                 }
901               },
902               delay,
903               unit);
904 
905       return testCondition;
906     }
907 
908     static TestCondition create() {
909       Lock lock = new ReentrantLock();
910       Condition condition = lock.newCondition();
911       return new TestCondition(lock, condition);
912     }
913 
914     @Override
915     public void await() throws InterruptedException {
916       lock.lock();
917       try {
918         condition.await();
919       } finally {
920         lock.unlock();
921       }
922     }
923 
924     @Override
925     public boolean await(long time, TimeUnit unit) throws InterruptedException {
926       lock.lock();
927       try {
928         return condition.await(time, unit);
929       } finally {
930         lock.unlock();
931       }
932     }
933 
934     @Override
935     public void awaitUninterruptibly() {
936       lock.lock();
937       try {
938         condition.awaitUninterruptibly();
939       } finally {
940         lock.unlock();
941       }
942     }
943 
944     @Override
945     public long awaitNanos(long nanosTimeout) throws InterruptedException {
946       lock.lock();
947       try {
948         return condition.awaitNanos(nanosTimeout);
949       } finally {
950         lock.unlock();
951       }
952     }
953 
954     @Override
955     public boolean awaitUntil(Date deadline) throws InterruptedException {
956       lock.lock();
957       try {
958         return condition.awaitUntil(deadline);
959       } finally {
960         lock.unlock();
961       }
962     }
963 
964     @Override
965     public void signal() {
966       lock.lock();
967       try {
968         condition.signal();
969       } finally {
970         lock.unlock();
971       }
972     }
973 
974     @Override
975     public void signalAll() {
976       lock.lock();
977       try {
978         condition.signalAll();
979       } finally {
980         lock.unlock();
981       }
982     }
983   }
984 }
985