1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5  * use this file except in compliance with the License. You may obtain a copy of
6  * the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
20 import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
21 import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
22 import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
23 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
24 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
25 import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
26 import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
27 import static java.util.concurrent.Executors.newFixedThreadPool;
28 import static java.util.concurrent.TimeUnit.MILLISECONDS;
29 import static java.util.concurrent.TimeUnit.SECONDS;
30 
31 import com.google.common.base.Preconditions;
32 import com.google.common.base.Stopwatch;
33 import com.google.common.testing.NullPointerTester;
34 import com.google.common.testing.TearDown;
35 import com.google.common.testing.TearDownStack;
36 import com.google.errorprone.annotations.CanIgnoreReturnValue;
37 import java.util.Date;
38 import java.util.concurrent.ArrayBlockingQueue;
39 import java.util.concurrent.BlockingQueue;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.Semaphore;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.locks.Condition;
48 import java.util.concurrent.locks.Lock;
49 import java.util.concurrent.locks.ReentrantLock;
50 import junit.framework.TestCase;
51 
52 /**
53  * Tests for {@link Uninterruptibles}.
54  *
55  * @author Anthony Zana
56  */
57 
58 public class UninterruptiblesTest extends TestCase {
59   private static final String EXPECTED_TAKE = "expectedTake";
60 
61   /** Timeout to use when we don't expect the timeout to expire. */
62   private static final long LONG_DELAY_MS = 2500;
63 
64   private static final long SLEEP_SLACK = 2;
65 
66   private final TearDownStack tearDownStack = new TearDownStack();
67 
68   // NOTE: All durations in these tests are expressed in milliseconds
69   @Override
setUp()70   protected void setUp() {
71     // Clear any previous interrupt before running the test.
72     if (Thread.currentThread().isInterrupted()) {
73       throw new AssertionError(
74           "Thread interrupted on test entry. "
75               + "Some test probably didn't clear the interrupt state");
76     }
77 
78     tearDownStack.addTearDown(
79         new TearDown() {
80           @Override
81           public void tearDown() {
82             Thread.interrupted();
83           }
84         });
85   }
86 
87   @Override
tearDown()88   protected void tearDown() {
89     tearDownStack.runTearDown();
90   }
91 
testNull()92   public void testNull() throws Exception {
93     new NullPointerTester()
94         .setDefault(CountDownLatch.class, new CountDownLatch(0))
95         .setDefault(Semaphore.class, new Semaphore(999))
96         .testAllPublicStaticMethods(Uninterruptibles.class);
97   }
98 
99   // IncrementableCountDownLatch.await() tests
100 
101   // CountDownLatch.await() tests
102 
103   // Condition.await() tests
testConditionAwaitTimeoutExceeded()104   public void testConditionAwaitTimeoutExceeded() {
105     Stopwatch stopwatch = Stopwatch.createStarted();
106     Condition condition = TestCondition.create();
107 
108     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 500, MILLISECONDS);
109 
110     assertFalse(signaledBeforeTimeout);
111     assertAtLeastTimePassed(stopwatch, 500);
112     assertNotInterrupted();
113   }
114 
testConditionAwaitTimeoutNotExceeded()115   public void testConditionAwaitTimeoutNotExceeded() {
116     Stopwatch stopwatch = Stopwatch.createStarted();
117     Condition condition = TestCondition.createAndSignalAfter(500, MILLISECONDS);
118 
119     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
120 
121     assertTrue(signaledBeforeTimeout);
122     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
123     assertNotInterrupted();
124   }
125 
testConditionAwaitInterruptedTimeoutExceeded()126   public void testConditionAwaitInterruptedTimeoutExceeded() {
127     Stopwatch stopwatch = Stopwatch.createStarted();
128     Condition condition = TestCondition.create();
129     requestInterruptIn(500);
130 
131     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1000, MILLISECONDS);
132 
133     assertFalse(signaledBeforeTimeout);
134     assertAtLeastTimePassed(stopwatch, 1000);
135     assertInterrupted();
136   }
137 
testConditionAwaitInterruptedTimeoutNotExceeded()138   public void testConditionAwaitInterruptedTimeoutNotExceeded() {
139     Stopwatch stopwatch = Stopwatch.createStarted();
140     Condition condition = TestCondition.createAndSignalAfter(1000, MILLISECONDS);
141     requestInterruptIn(500);
142 
143     boolean signaledBeforeTimeout = awaitUninterruptibly(condition, 1500, MILLISECONDS);
144 
145     assertTrue(signaledBeforeTimeout);
146     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
147     assertInterrupted();
148   }
149 
150   // Lock.tryLock() tests
testTryLockTimeoutExceeded()151   public void testTryLockTimeoutExceeded() {
152     Stopwatch stopwatch = Stopwatch.createStarted();
153     Lock lock = new ReentrantLock();
154     Thread lockThread = acquireFor(lock, 5, SECONDS);
155 
156     boolean lockAcquired = tryLockUninterruptibly(lock, 500, MILLISECONDS);
157 
158     assertFalse(lockAcquired);
159     assertAtLeastTimePassed(stopwatch, 500);
160     assertNotInterrupted();
161 
162     // finish locking thread
163     lockThread.interrupt();
164   }
165 
testTryLockTimeoutNotExceeded()166   public void testTryLockTimeoutNotExceeded() {
167     Stopwatch stopwatch = Stopwatch.createStarted();
168     Lock lock = new ReentrantLock();
169     acquireFor(lock, 500, MILLISECONDS);
170 
171     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
172 
173     assertTrue(signaledBeforeTimeout);
174     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
175     assertNotInterrupted();
176   }
177 
testTryLockInterruptedTimeoutExceeded()178   public void testTryLockInterruptedTimeoutExceeded() {
179     Stopwatch stopwatch = Stopwatch.createStarted();
180     Lock lock = new ReentrantLock();
181     Thread lockThread = acquireFor(lock, 5, SECONDS);
182     requestInterruptIn(500);
183 
184     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1000, MILLISECONDS);
185 
186     assertFalse(signaledBeforeTimeout);
187     assertAtLeastTimePassed(stopwatch, 1000);
188     assertInterrupted();
189 
190     // finish locking thread
191     lockThread.interrupt();
192   }
193 
testTryLockInterruptedTimeoutNotExceeded()194   public void testTryLockInterruptedTimeoutNotExceeded() {
195     Stopwatch stopwatch = Stopwatch.createStarted();
196     Lock lock = new ReentrantLock();
197     acquireFor(lock, 1000, MILLISECONDS);
198     requestInterruptIn(500);
199 
200     boolean signaledBeforeTimeout = tryLockUninterruptibly(lock, 1500, MILLISECONDS);
201 
202     assertTrue(signaledBeforeTimeout);
203     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
204     assertInterrupted();
205   }
206 
207   // BlockingQueue.put() tests
testPutWithNoWait()208   public void testPutWithNoWait() {
209     Stopwatch stopwatch = Stopwatch.createStarted();
210     BlockingQueue<String> queue = new ArrayBlockingQueue<>(999);
211     putUninterruptibly(queue, "");
212     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
213     assertEquals("", queue.peek());
214   }
215 
testPutNoInterrupt()216   public void testPutNoInterrupt() {
217     TimedPutQueue queue = TimedPutQueue.createWithDelay(20);
218     queue.putSuccessfully();
219     assertNotInterrupted();
220   }
221 
testPutSingleInterrupt()222   public void testPutSingleInterrupt() {
223     TimedPutQueue queue = TimedPutQueue.createWithDelay(50);
224     requestInterruptIn(10);
225     queue.putSuccessfully();
226     assertInterrupted();
227   }
228 
testPutMultiInterrupt()229   public void testPutMultiInterrupt() {
230     TimedPutQueue queue = TimedPutQueue.createWithDelay(100);
231     repeatedlyInterruptTestThread(20, tearDownStack);
232     queue.putSuccessfully();
233     assertInterrupted();
234   }
235 
236   // BlockingQueue.take() tests
testTakeWithNoWait()237   public void testTakeWithNoWait() {
238     Stopwatch stopwatch = Stopwatch.createStarted();
239     BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
240     assertTrue(queue.offer(""));
241     assertEquals("", takeUninterruptibly(queue));
242     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
243   }
244 
testTakeNoInterrupt()245   public void testTakeNoInterrupt() {
246     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(20);
247     queue.takeSuccessfully();
248     assertNotInterrupted();
249   }
250 
testTakeSingleInterrupt()251   public void testTakeSingleInterrupt() {
252     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(50);
253     requestInterruptIn(10);
254     queue.takeSuccessfully();
255     assertInterrupted();
256   }
257 
testTakeMultiInterrupt()258   public void testTakeMultiInterrupt() {
259     TimedTakeQueue queue = TimedTakeQueue.createWithDelay(100);
260     repeatedlyInterruptTestThread(20, tearDownStack);
261     queue.takeSuccessfully();
262     assertInterrupted();
263   }
264 
265   // join() tests
testJoinWithNoWait()266   public void testJoinWithNoWait() throws InterruptedException {
267     Stopwatch stopwatch = Stopwatch.createStarted();
268     Thread thread = new Thread(new JoinTarget(15));
269     thread.start();
270     thread.join();
271     assertFalse(thread.isAlive());
272 
273     joinUninterruptibly(thread);
274     joinUninterruptibly(thread, 0, MILLISECONDS);
275     joinUninterruptibly(thread, -42, MILLISECONDS);
276     joinUninterruptibly(thread, LONG_DELAY_MS, MILLISECONDS);
277     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
278   }
279 
testJoinNoInterrupt()280   public void testJoinNoInterrupt() {
281     TimedThread thread = TimedThread.createWithDelay(20);
282     thread.joinSuccessfully();
283     assertNotInterrupted();
284   }
285 
testJoinTimeoutNoInterruptNotExpired()286   public void testJoinTimeoutNoInterruptNotExpired() {
287     TimedThread thread = TimedThread.createWithDelay(20);
288     thread.joinSuccessfully(LONG_DELAY_MS);
289     assertNotInterrupted();
290   }
291 
testJoinTimeoutNoInterruptExpired()292   public void testJoinTimeoutNoInterruptExpired() {
293     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
294     thread.joinUnsuccessfully(30);
295     assertNotInterrupted();
296   }
297 
testJoinSingleInterrupt()298   public void testJoinSingleInterrupt() {
299     TimedThread thread = TimedThread.createWithDelay(50);
300     requestInterruptIn(10);
301     thread.joinSuccessfully();
302     assertInterrupted();
303   }
304 
testJoinTimeoutSingleInterruptNoExpire()305   public void testJoinTimeoutSingleInterruptNoExpire() {
306     TimedThread thread = TimedThread.createWithDelay(50);
307     requestInterruptIn(10);
308     thread.joinSuccessfully(LONG_DELAY_MS);
309     assertInterrupted();
310   }
311 
testJoinTimeoutSingleInterruptExpired()312   public void testJoinTimeoutSingleInterruptExpired() {
313     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
314     requestInterruptIn(10);
315     thread.joinUnsuccessfully(50);
316     assertInterrupted();
317   }
318 
testJoinMultiInterrupt()319   public void testJoinMultiInterrupt() {
320     TimedThread thread = TimedThread.createWithDelay(100);
321     repeatedlyInterruptTestThread(20, tearDownStack);
322     thread.joinSuccessfully();
323     assertInterrupted();
324   }
325 
testJoinTimeoutMultiInterruptNoExpire()326   public void testJoinTimeoutMultiInterruptNoExpire() {
327     TimedThread thread = TimedThread.createWithDelay(100);
328     repeatedlyInterruptTestThread(20, tearDownStack);
329     thread.joinSuccessfully(LONG_DELAY_MS);
330     assertInterrupted();
331   }
332 
testJoinTimeoutMultiInterruptExpired()333   public void testJoinTimeoutMultiInterruptExpired() {
334     /*
335      * We don't "need" to schedule a thread completion at all here, but by doing
336      * so, we come the closest we can to testing that the wait time is
337      * appropriately decreased on each progressive join() call.
338      */
339     TimedThread thread = TimedThread.createWithDelay(LONG_DELAY_MS);
340     repeatedlyInterruptTestThread(20, tearDownStack);
341     thread.joinUnsuccessfully(70);
342     assertInterrupted();
343   }
344 
345   // sleep() Tests
testSleepNoInterrupt()346   public void testSleepNoInterrupt() {
347     sleepSuccessfully(10);
348   }
349 
testSleepSingleInterrupt()350   public void testSleepSingleInterrupt() {
351     requestInterruptIn(10);
352     sleepSuccessfully(50);
353     assertInterrupted();
354   }
355 
testSleepMultiInterrupt()356   public void testSleepMultiInterrupt() {
357     repeatedlyInterruptTestThread(10, tearDownStack);
358     sleepSuccessfully(100);
359     assertInterrupted();
360   }
361 
362   // Semaphore.tryAcquire() tests
testTryAcquireWithNoWait()363   public void testTryAcquireWithNoWait() {
364     Stopwatch stopwatch = Stopwatch.createStarted();
365     Semaphore semaphore = new Semaphore(99);
366     assertTrue(tryAcquireUninterruptibly(semaphore, 0, MILLISECONDS));
367     assertTrue(tryAcquireUninterruptibly(semaphore, -42, MILLISECONDS));
368     assertTrue(tryAcquireUninterruptibly(semaphore, LONG_DELAY_MS, MILLISECONDS));
369     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
370   }
371 
testTryAcquireTimeoutNoInterruptNotExpired()372   public void testTryAcquireTimeoutNoInterruptNotExpired() {
373     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
374     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
375     assertNotInterrupted();
376   }
377 
testTryAcquireTimeoutNoInterruptExpired()378   public void testTryAcquireTimeoutNoInterruptExpired() {
379     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
380     semaphore.tryAcquireUnsuccessfully(30);
381     assertNotInterrupted();
382   }
383 
testTryAcquireTimeoutSingleInterruptNoExpire()384   public void testTryAcquireTimeoutSingleInterruptNoExpire() {
385     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
386     requestInterruptIn(10);
387     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
388     assertInterrupted();
389   }
390 
testTryAcquireTimeoutSingleInterruptExpired()391   public void testTryAcquireTimeoutSingleInterruptExpired() {
392     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
393     requestInterruptIn(10);
394     semaphore.tryAcquireUnsuccessfully(50);
395     assertInterrupted();
396   }
397 
testTryAcquireTimeoutMultiInterruptNoExpire()398   public void testTryAcquireTimeoutMultiInterruptNoExpire() {
399     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
400     repeatedlyInterruptTestThread(20, tearDownStack);
401     semaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
402     assertInterrupted();
403   }
404 
testTryAcquireTimeoutMultiInterruptExpired()405   public void testTryAcquireTimeoutMultiInterruptExpired() {
406     /*
407      * We don't "need" to schedule a release() call at all here, but by doing
408      * so, we come the closest we can to testing that the wait time is
409      * appropriately decreased on each progressive tryAcquire() call.
410      */
411     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
412     repeatedlyInterruptTestThread(20, tearDownStack);
413     semaphore.tryAcquireUnsuccessfully(70);
414     assertInterrupted();
415   }
416 
testTryAcquireWithNoWaitMultiPermit()417   public void testTryAcquireWithNoWaitMultiPermit() {
418     Stopwatch stopwatch = Stopwatch.createStarted();
419     Semaphore semaphore = new Semaphore(99);
420     assertTrue(tryAcquireUninterruptibly(semaphore, 10, 0, MILLISECONDS));
421     assertTrue(tryAcquireUninterruptibly(semaphore, 10, -42, MILLISECONDS));
422     assertTrue(tryAcquireUninterruptibly(semaphore, 10, LONG_DELAY_MS, MILLISECONDS));
423     assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
424   }
425 
testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit()426   public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() {
427     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(20);
428     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
429     assertNotInterrupted();
430   }
431 
testTryAcquireTimeoutNoInterruptExpiredMultiPermit()432   public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() {
433     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
434     semaphore.tryAcquireUnsuccessfully(10, 30);
435     assertNotInterrupted();
436   }
437 
testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit()438   public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() {
439     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(50);
440     requestInterruptIn(10);
441     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
442     assertInterrupted();
443   }
444 
testTryAcquireTimeoutSingleInterruptExpiredMultiPermit()445   public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() {
446     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
447     requestInterruptIn(10);
448     semaphore.tryAcquireUnsuccessfully(10, 50);
449     assertInterrupted();
450   }
451 
testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit()452   public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() {
453     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(100);
454     repeatedlyInterruptTestThread(20, tearDownStack);
455     semaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
456     assertInterrupted();
457   }
458 
testTryAcquireTimeoutMultiInterruptExpiredMultiPermit()459   public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
460     /*
461      * We don't "need" to schedule a release() call at all here, but by doing
462      * so, we come the closest we can to testing that the wait time is
463      * appropriately decreased on each progressive tryAcquire() call.
464      */
465     TimedSemaphore semaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
466     repeatedlyInterruptTestThread(20, tearDownStack);
467     semaphore.tryAcquireUnsuccessfully(10, 70);
468     assertInterrupted();
469   }
470 
471   // executor.awaitTermination Testcases
testTryAwaitTerminationUninterruptiblyLongTimeUnit_success()472   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() {
473     ExecutorService executor = newFixedThreadPool(1);
474     requestInterruptIn(500);
475     executor.execute(new SleepTask(1000));
476     executor.shutdown();
477     assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS));
478     assertTrue(executor.isTerminated());
479     assertInterrupted();
480   }
481 
testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure()482   public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() {
483     ExecutorService executor = newFixedThreadPool(1);
484     requestInterruptIn(500);
485     executor.execute(new SleepTask(10000));
486     executor.shutdown();
487     assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS));
488     assertFalse(executor.isTerminated());
489     assertInterrupted();
490   }
491 
testTryAwaitTerminationInfiniteTimeout()492   public void testTryAwaitTerminationInfiniteTimeout() {
493     ExecutorService executor = newFixedThreadPool(1);
494     requestInterruptIn(500);
495     executor.execute(new SleepTask(1000));
496     executor.shutdown();
497     awaitTerminationUninterruptibly(executor);
498     assertTrue(executor.isTerminated());
499     assertInterrupted();
500   }
501 
502   /**
503    * Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
504    * {@code Completion} starts the underlying stopwatch.
505    */
506   private static final class Completion {
507     final Stopwatch stopwatch;
508     final long expectedCompletionWaitMillis;
509 
Completion(long expectedCompletionWaitMillis)510     Completion(long expectedCompletionWaitMillis) {
511       this.expectedCompletionWaitMillis = expectedCompletionWaitMillis;
512       stopwatch = Stopwatch.createStarted();
513     }
514 
515     /**
516      * Asserts that the expected completion time has passed (and not "too much" time beyond that).
517      */
assertCompletionExpected()518     void assertCompletionExpected() {
519       assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis);
520       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis + LONG_DELAY_MS);
521     }
522 
523     /**
524      * Asserts that at least {@code timeout} has passed but the expected completion time has not.
525      */
assertCompletionNotExpected(long timeout)526     void assertCompletionNotExpected(long timeout) {
527       Preconditions.checkArgument(timeout < expectedCompletionWaitMillis);
528       assertAtLeastTimePassed(stopwatch, timeout);
529       assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis);
530     }
531   }
532 
533   private static void assertAtLeastTimePassed(Stopwatch stopwatch, long expectedMillis) {
534     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
535     /*
536      * The "+ 5" below is to permit, say, sleep(10) to sleep only 9 milliseconds. We see such
537      * behavior sometimes when running these tests publicly as part of Guava. "+ 5" is probably more
538      * generous than it needs to be.
539      */
540     assertTrue(
541         "Expected elapsed millis to be >= " + expectedMillis + " but was " + elapsedMillis,
542         elapsedMillis + 5 >= expectedMillis);
543   }
544 
545   // TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes.
546 
547   /** Manages a {@link BlockingQueue} and associated timings for a {@code put} call. */
548   private static final class TimedPutQueue {
549     final BlockingQueue<String> queue;
550     final Completion completed;
551 
552     /**
553      * Creates a {@link EnableWrites} which open up a spot for a {@code put} to succeed in {@code
554      * countdownInMillis}.
555      */
createWithDelay(long countdownInMillis)556     static TimedPutQueue createWithDelay(long countdownInMillis) {
557       return new TimedPutQueue(countdownInMillis);
558     }
559 
TimedPutQueue(long countdownInMillis)560     private TimedPutQueue(long countdownInMillis) {
561       this.queue = new ArrayBlockingQueue<>(1);
562       assertTrue(queue.offer("blocksPutCallsUntilRemoved"));
563       this.completed = new Completion(countdownInMillis);
564       scheduleEnableWrites(this.queue, countdownInMillis);
565     }
566 
567     /** Perform a {@code put} and assert that operation completed in the expected timeframe. */
putSuccessfully()568     void putSuccessfully() {
569       putUninterruptibly(queue, "");
570       completed.assertCompletionExpected();
571       assertEquals("", queue.peek());
572     }
573 
scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis)574     private static void scheduleEnableWrites(BlockingQueue<String> queue, long countdownInMillis) {
575       Runnable toRun = new EnableWrites(queue, countdownInMillis);
576       // TODO(cpovirk): automatically fail the test if this thread throws
577       Thread enablerThread = new Thread(toRun);
578       enablerThread.start();
579     }
580   }
581 
582   /** Manages a {@link BlockingQueue} and associated timings for a {@code take} call. */
583   private static final class TimedTakeQueue {
584     final BlockingQueue<String> queue;
585     final Completion completed;
586 
587     /**
588      * Creates a {@link EnableReads} which insert an element for a {@code take} to receive in {@code
589      * countdownInMillis}.
590      */
createWithDelay(long countdownInMillis)591     static TimedTakeQueue createWithDelay(long countdownInMillis) {
592       return new TimedTakeQueue(countdownInMillis);
593     }
594 
TimedTakeQueue(long countdownInMillis)595     private TimedTakeQueue(long countdownInMillis) {
596       this.queue = new ArrayBlockingQueue<>(1);
597       this.completed = new Completion(countdownInMillis);
598       scheduleEnableReads(this.queue, countdownInMillis);
599     }
600 
601     /** Perform a {@code take} and assert that operation completed in the expected timeframe. */
takeSuccessfully()602     void takeSuccessfully() {
603       assertEquals(EXPECTED_TAKE, takeUninterruptibly(queue));
604       completed.assertCompletionExpected();
605       assertTrue(queue.isEmpty());
606     }
607 
scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis)608     private static void scheduleEnableReads(BlockingQueue<String> queue, long countdownInMillis) {
609       Runnable toRun = new EnableReads(queue, countdownInMillis);
610       // TODO(cpovirk): automatically fail the test if this thread throws
611       Thread enablerThread = new Thread(toRun);
612       enablerThread.start();
613     }
614   }
615 
616   /** Manages a {@link Semaphore} and associated timings. */
617   private static final class TimedSemaphore {
618     final Semaphore semaphore;
619     final Completion completed;
620 
621     /**
622      * Create a {@link Release} which will release a semaphore permit in {@code countdownInMillis}.
623      */
createWithDelay(long countdownInMillis)624     static TimedSemaphore createWithDelay(long countdownInMillis) {
625       return new TimedSemaphore(countdownInMillis);
626     }
627 
TimedSemaphore(long countdownInMillis)628     private TimedSemaphore(long countdownInMillis) {
629       this.semaphore = new Semaphore(0);
630       this.completed = new Completion(countdownInMillis);
631       scheduleRelease(countdownInMillis);
632     }
633 
634     /**
635      * Requests a permit from the semaphore with a timeout and asserts that operation completed in
636      * the expected timeframe.
637      */
tryAcquireSuccessfully(long timeoutMillis)638     void tryAcquireSuccessfully(long timeoutMillis) {
639       assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
640       completed.assertCompletionExpected();
641     }
642 
tryAcquireSuccessfully(int permits, long timeoutMillis)643     void tryAcquireSuccessfully(int permits, long timeoutMillis) {
644       assertTrue(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
645       completed.assertCompletionExpected();
646     }
647 
648     /**
649      * Requests a permit from the semaphore with a timeout and asserts that the wait returned within
650      * the expected timeout.
651      */
tryAcquireUnsuccessfully(long timeoutMillis)652     private void tryAcquireUnsuccessfully(long timeoutMillis) {
653       assertFalse(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
654       completed.assertCompletionNotExpected(timeoutMillis);
655     }
656 
tryAcquireUnsuccessfully(int permits, long timeoutMillis)657     private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) {
658       assertFalse(tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS));
659       completed.assertCompletionNotExpected(timeoutMillis);
660     }
661 
scheduleRelease(long countdownInMillis)662     private void scheduleRelease(long countdownInMillis) {
663       DelayedActionRunnable toRun = new Release(semaphore, countdownInMillis);
664       // TODO(cpovirk): automatically fail the test if this thread throws
665       Thread releaserThread = new Thread(toRun);
666       releaserThread.start();
667     }
668   }
669 
670   private abstract static class DelayedActionRunnable implements Runnable {
671     private final long tMinus;
672 
DelayedActionRunnable(long tMinus)673     protected DelayedActionRunnable(long tMinus) {
674       this.tMinus = tMinus;
675     }
676 
677     @Override
run()678     public final void run() {
679       try {
680         Thread.sleep(tMinus);
681       } catch (InterruptedException e) {
682         throw new AssertionError(e);
683       }
684       doAction();
685     }
686 
doAction()687     protected abstract void doAction();
688   }
689 
690   private static class CountDown extends DelayedActionRunnable {
691     private final CountDownLatch latch;
692 
CountDown(CountDownLatch latch, long tMinus)693     public CountDown(CountDownLatch latch, long tMinus) {
694       super(tMinus);
695       this.latch = latch;
696     }
697 
698     @Override
doAction()699     protected void doAction() {
700       latch.countDown();
701     }
702   }
703 
704   private static class EnableWrites extends DelayedActionRunnable {
705     private final BlockingQueue<String> queue;
706 
EnableWrites(BlockingQueue<String> queue, long tMinus)707     public EnableWrites(BlockingQueue<String> queue, long tMinus) {
708       super(tMinus);
709       assertFalse(queue.isEmpty());
710       assertFalse(queue.offer("shouldBeRejected"));
711       this.queue = queue;
712     }
713 
714     @Override
doAction()715     protected void doAction() {
716       assertNotNull(queue.remove());
717     }
718   }
719 
720   private static class EnableReads extends DelayedActionRunnable {
721     private final BlockingQueue<String> queue;
722 
EnableReads(BlockingQueue<String> queue, long tMinus)723     public EnableReads(BlockingQueue<String> queue, long tMinus) {
724       super(tMinus);
725       assertTrue(queue.isEmpty());
726       this.queue = queue;
727     }
728 
729     @Override
doAction()730     protected void doAction() {
731       assertTrue(queue.offer(EXPECTED_TAKE));
732     }
733   }
734 
735   private static final class TimedThread {
736     private final Thread thread;
737     private final Completion completed;
738 
createWithDelay(long countdownInMillis)739     static TimedThread createWithDelay(long countdownInMillis) {
740       return new TimedThread(countdownInMillis);
741     }
742 
TimedThread(long expectedCompletionWaitMillis)743     private TimedThread(long expectedCompletionWaitMillis) {
744       completed = new Completion(expectedCompletionWaitMillis);
745       thread = new Thread(new JoinTarget(expectedCompletionWaitMillis));
746       thread.start();
747     }
748 
joinSuccessfully()749     void joinSuccessfully() {
750       Uninterruptibles.joinUninterruptibly(thread);
751       completed.assertCompletionExpected();
752       assertEquals(Thread.State.TERMINATED, thread.getState());
753     }
754 
joinSuccessfully(long timeoutMillis)755     void joinSuccessfully(long timeoutMillis) {
756       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
757       completed.assertCompletionExpected();
758       assertEquals(Thread.State.TERMINATED, thread.getState());
759     }
760 
joinUnsuccessfully(long timeoutMillis)761     void joinUnsuccessfully(long timeoutMillis) {
762       Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
763       completed.assertCompletionNotExpected(timeoutMillis);
764       assertFalse(Thread.State.TERMINATED.equals(thread.getState()));
765     }
766   }
767 
768   private static class JoinTarget extends DelayedActionRunnable {
JoinTarget(long tMinus)769     public JoinTarget(long tMinus) {
770       super(tMinus);
771     }
772 
773     @Override
doAction()774     protected void doAction() {}
775   }
776 
777   private static class Release extends DelayedActionRunnable {
778     private final Semaphore semaphore;
779 
Release(Semaphore semaphore, long tMinus)780     public Release(Semaphore semaphore, long tMinus) {
781       super(tMinus);
782       this.semaphore = semaphore;
783     }
784 
785     @Override
doAction()786     protected void doAction() {
787       semaphore.release(10);
788     }
789   }
790 
791   private static final class SleepTask extends DelayedActionRunnable {
SleepTask(long tMinus)792     SleepTask(long tMinus) {
793       super(tMinus);
794     }
795 
796     @Override
doAction()797     protected void doAction() {}
798   }
799 
sleepSuccessfully(long sleepMillis)800   private static void sleepSuccessfully(long sleepMillis) {
801     Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
802     Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
803     completed.assertCompletionExpected();
804   }
805 
assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis)806   private static void assertTimeNotPassed(Stopwatch stopwatch, long timelimitMillis) {
807     long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
808     assertTrue(elapsedMillis < timelimitMillis);
809   }
810 
811   /**
812    * Await an interrupt, then clear the interrupt status. Similar to {@code
813    * assertTrue(Thread.interrupted())} except that this version tolerates late interrupts.
814    */
815   private static void assertInterrupted() {
816     try {
817       /*
818        * The sleep() will end immediately if we've already been interrupted or
819        * wait patiently for the interrupt if not.
820        */
821       Thread.sleep(LONG_DELAY_MS);
822       fail("Dude, where's my interrupt?");
823     } catch (InterruptedException expected) {
824     }
825   }
826 
827   private static void assertNotInterrupted() {
828     assertFalse(Thread.interrupted());
829   }
830 
831   private static void requestInterruptIn(long millis) {
832     InterruptionUtil.requestInterruptIn(millis, MILLISECONDS);
833   }
834 
835   @CanIgnoreReturnValue
836   private static Thread acquireFor(final Lock lock, final long duration, final TimeUnit unit) {
837     final CountDownLatch latch = new CountDownLatch(1);
838     Thread thread =
839         new Thread() {
840           @Override
841           public void run() {
842             lock.lock();
843             latch.countDown();
844             try {
845               Thread.sleep(unit.toMillis(duration));
846             } catch (InterruptedException e) {
847               // simply finish execution
848             } finally {
849               lock.unlock();
850             }
851           }
852         };
853     thread.setDaemon(true);
854     thread.start();
855     awaitUninterruptibly(latch);
856     return thread;
857   }
858 
859   private static class TestCondition implements Condition {
860     private final Lock lock;
861     private final Condition condition;
862 
863     private TestCondition(Lock lock, Condition condition) {
864       this.lock = lock;
865       this.condition = condition;
866     }
867 
868     static TestCondition createAndSignalAfter(long delay, TimeUnit unit) {
869       final TestCondition testCondition = create();
870 
871       ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
872       // If signal() fails somehow, we should see a failed test, even without looking at the Future.
873       Future<?> unused =
874           scheduledPool.schedule(
875               new Runnable() {
876                 @Override
877                 public void run() {
878                   testCondition.signal();
879                 }
880               },
881               delay,
882               unit);
883 
884       return testCondition;
885     }
886 
887     static TestCondition create() {
888       Lock lock = new ReentrantLock();
889       Condition condition = lock.newCondition();
890       return new TestCondition(lock, condition);
891     }
892 
893     @Override
894     public void await() throws InterruptedException {
895       lock.lock();
896       try {
897         condition.await();
898       } finally {
899         lock.unlock();
900       }
901     }
902 
903     @Override
904     public boolean await(long time, TimeUnit unit) throws InterruptedException {
905       lock.lock();
906       try {
907         return condition.await(time, unit);
908       } finally {
909         lock.unlock();
910       }
911     }
912 
913     @Override
914     public void awaitUninterruptibly() {
915       lock.lock();
916       try {
917         condition.awaitUninterruptibly();
918       } finally {
919         lock.unlock();
920       }
921     }
922 
923     @Override
924     public long awaitNanos(long nanosTimeout) throws InterruptedException {
925       lock.lock();
926       try {
927         return condition.awaitNanos(nanosTimeout);
928       } finally {
929         lock.unlock();
930       }
931     }
932 
933     @Override
934     public boolean awaitUntil(Date deadline) throws InterruptedException {
935       lock.lock();
936       try {
937         return condition.awaitUntil(deadline);
938       } finally {
939         lock.unlock();
940       }
941     }
942 
943     @Override
944     public void signal() {
945       lock.lock();
946       try {
947         condition.signal();
948       } finally {
949         lock.unlock();
950       }
951     }
952 
953     @Override
954     public void signalAll() {
955       lock.lock();
956       try {
957         condition.signalAll();
958       } finally {
959         lock.unlock();
960       }
961     }
962   }
963 }
964