/* * Copyright (C) 2008 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static com.google.common.base.Functions.constant; import static com.google.common.base.Functions.identity; import static com.google.common.base.Throwables.propagateIfInstanceOf; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Sets.intersection; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.util.concurrent.Futures.allAsList; import static com.google.common.util.concurrent.Futures.catching; import static com.google.common.util.concurrent.Futures.catchingAsync; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.Futures.inCompletionOrder; import static com.google.common.util.concurrent.Futures.lazyTransform; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static com.google.common.util.concurrent.Futures.scheduleAsync; 
import static com.google.common.util.concurrent.Futures.submit; import static com.google.common.util.concurrent.Futures.submitAsync; import static com.google.common.util.concurrent.Futures.successfulAsList; import static com.google.common.util.concurrent.Futures.transform; import static com.google.common.util.concurrent.Futures.transformAsync; import static com.google.common.util.concurrent.Futures.whenAllComplete; import static com.google.common.util.concurrent.Futures.whenAllSucceed; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.common.util.concurrent.TestPlatform.clearInterrupt; import static com.google.common.util.concurrent.TestPlatform.getDoneFromTimeoutOverload; import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; import static com.google.common.util.concurrent.testing.TestingExecutors.noOpScheduledExecutor; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import com.google.common.annotations.GwtCompatible; import com.google.common.annotations.GwtIncompatible; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.testing.ClassSanityTester; import com.google.common.testing.GcFinalization; import com.google.common.testing.TestLogHandler; import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.ref.WeakReference; import 
java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.LogRecord; import java.util.logging.Logger; import junit.framework.AssertionFailedError; import junit.framework.TestCase; import org.checkerframework.checker.nullness.qual.Nullable; /** * Unit tests for {@link Futures}. * * @author Nishant Thakkar */ @GwtCompatible(emulated = true) public class FuturesTest extends TestCase { private static final Logger aggregateFutureLogger = Logger.getLogger(AggregateFuture.class.getName()); private final TestLogHandler aggregateFutureLogHandler = new TestLogHandler(); private static final String DATA1 = "data"; private static final String DATA2 = "more data"; private static final String DATA3 = "most data"; @Override public void setUp() throws Exception { super.setUp(); aggregateFutureLogger.addHandler(aggregateFutureLogHandler); } @Override public void tearDown() throws Exception { /* * Clear interrupt for future tests. * * (Ideally we would perform interrupts only in threads that we create, but * it's hard to imagine that anything will break in practice.) */ clearInterrupt(); aggregateFutureLogger.removeHandler(aggregateFutureLogHandler); super.tearDown(); } /* * TODO(cpovirk): Use FutureSubject once it's part of core Truth. But be wary of using it when I'm * really testing a Future implementation (e.g., in the case of immediate*Future()). 
But it's OK * to use in the case of the majority of Futures that are AbstractFutures. */ public void testImmediateFuture() throws Exception { ListenableFuture future = immediateFuture(DATA1); assertSame(DATA1, getDone(future)); assertSame(DATA1, getDoneFromTimeoutOverload(future)); assertThat(future.toString()).contains("[status=SUCCESS, result=[" + DATA1 + "]]"); } public void testImmediateVoidFuture() throws Exception { ListenableFuture voidFuture = immediateVoidFuture(); assertThat(getDone(voidFuture)).isNull(); assertThat(getDoneFromTimeoutOverload(voidFuture)).isNull(); assertThat(voidFuture.toString()).contains("[status=SUCCESS, result=[null]]"); } public void testImmediateFailedFuture() throws Exception { Exception exception = new Exception(); ListenableFuture future = immediateFailedFuture(exception); assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); try { getDone(future); fail(); } catch (ExecutionException expected) { assertSame(exception, expected.getCause()); } try { getDoneFromTimeoutOverload(future); fail(); } catch (ExecutionException expected) { assertSame(exception, expected.getCause()); } } public void testImmediateFailedFuture_cancellationException() throws Exception { CancellationException exception = new CancellationException(); ListenableFuture future = immediateFailedFuture(exception); assertFalse(future.isCancelled()); assertThat(future.toString()).endsWith("[status=FAILURE, cause=[" + exception + "]]"); try { getDone(future); fail(); } catch (ExecutionException expected) { assertSame(exception, expected.getCause()); } try { getDoneFromTimeoutOverload(future); fail(); } catch (ExecutionException expected) { assertSame(exception, expected.getCause()); } } public void testImmediateCancelledFutureBasic() throws Exception { ListenableFuture future = CallerClass1.makeImmediateCancelledFuture(); assertTrue(future.isCancelled()); } @GwtIncompatible public void testImmediateCancelledFutureStack() throws Exception 
{ ListenableFuture future = CallerClass1.makeImmediateCancelledFuture(); assertTrue(future.isCancelled()); try { CallerClass2.get(future); fail(); } catch (CancellationException expected) { // There should be two CancellationException chained together. The outer one should have the // stack trace of where the get() call was made, and the inner should have the stack trace of // where the immediateCancelledFuture() call was made. List stackTrace = ImmutableList.copyOf(expected.getStackTrace()); assertFalse(Iterables.any(stackTrace, hasClassName(CallerClass1.class))); assertTrue(Iterables.any(stackTrace, hasClassName(CallerClass2.class))); // See AbstractFutureCancellationCauseTest for how to set causes. assertThat(expected.getCause()).isNull(); } } @GwtIncompatible // used only in GwtIncompatible tests private static Predicate hasClassName(final Class clazz) { return new Predicate() { @Override public boolean apply(StackTraceElement element) { return element.getClassName().equals(clazz.getName()); } }; } private static final class CallerClass1 { static ListenableFuture makeImmediateCancelledFuture() { return immediateCancelledFuture(); } } private static final class CallerClass2 { @CanIgnoreReturnValue static V get(ListenableFuture future) throws ExecutionException, InterruptedException { return getDone(future); } } private static class MyException extends Exception {} // Class hierarchy for generics sanity checks private static class Foo {} private static class FooChild extends Foo {} private static class Bar {} private static class BarChild extends Bar {} public void testTransform_genericsNull() throws Exception { ListenableFuture nullFuture = immediateFuture(null); ListenableFuture transformedFuture = transform(nullFuture, constant(null), directExecutor()); assertNull(getDone(transformedFuture)); } public void testTransform_genericsHierarchy() throws Exception { ListenableFuture future = immediateFuture(null); final BarChild barChild = new BarChild(); Function 
function = new Function() { @Override public BarChild apply(Foo unused) { return barChild; } }; Bar bar = getDone(transform(future, function, directExecutor())); assertSame(barChild, bar); } /* * Android does not handle this stack overflow gracefully... though somehow some other * stack-overflow tests work. It must depend on the exact place the error occurs. */ @AndroidIncompatible @GwtIncompatible // StackOverflowError public void testTransform_StackOverflow() throws Exception { { /* * Initialize all relevant classes before running the test, which may otherwise poison any * classes it is trying to load during its stack overflow. */ SettableFuture root = SettableFuture.create(); ListenableFuture unused = transform(root, identity(), directExecutor()); root.set("foo"); } SettableFuture root = SettableFuture.create(); ListenableFuture output = root; for (int i = 0; i < 10000; i++) { output = transform(output, identity(), directExecutor()); } try { root.set("foo"); fail(); } catch (StackOverflowError expected) { } } public void testTransform_ErrorAfterCancellation() throws Exception { class Transformer implements Function { ListenableFuture output; @Override public Object apply(Object input) { output.cancel(false); throw new MyError(); } } Transformer transformer = new Transformer(); SettableFuture input = SettableFuture.create(); ListenableFuture output = transform(input, transformer, directExecutor()); transformer.output = output; input.set("foo"); assertTrue(output.isCancelled()); } public void testTransform_ExceptionAfterCancellation() throws Exception { class Transformer implements Function { ListenableFuture output; @Override public Object apply(Object input) { output.cancel(false); throw new MyRuntimeException(); } } Transformer transformer = new Transformer(); SettableFuture input = SettableFuture.create(); ListenableFuture output = transform(input, transformer, directExecutor()); transformer.output = output; input.set("foo"); assertTrue(output.isCancelled()); 
} public void testTransform_getThrowsRuntimeException() throws Exception { ListenableFuture input = UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); ListenableFuture output = transform(input, identity(), directExecutor()); try { getDone(output); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); } } public void testTransform_getThrowsError() throws Exception { ListenableFuture input = UncheckedThrowingFuture.throwingError(new MyError()); ListenableFuture output = transform(input, identity(), directExecutor()); try { getDone(output); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyError.class); } } public void testTransform_listenerThrowsError() throws Exception { SettableFuture input = SettableFuture.create(); ListenableFuture output = transform(input, identity(), directExecutor()); output.addListener( new Runnable() { @Override public void run() { throw new MyError(); } }, directExecutor()); try { input.set("foo"); fail(); } catch (MyError expected) { } } public void testTransformAsync_cancelPropagatesToInput() throws Exception { SettableFuture input = SettableFuture.create(); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(Foo unused) { throw new AssertionFailedError("Unexpeted call to apply."); } }; assertTrue(transformAsync(input, function, directExecutor()).cancel(false)); assertTrue(input.isCancelled()); assertFalse(input.wasInterrupted()); } public void testTransformAsync_interruptPropagatesToInput() throws Exception { SettableFuture input = SettableFuture.create(); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(Foo unused) { throw new AssertionFailedError("Unexpeted call to apply."); } }; assertTrue(transformAsync(input, function, directExecutor()).cancel(true)); assertTrue(input.isCancelled()); assertTrue(input.wasInterrupted()); } 
@GwtIncompatible // threads public void testTransformAsync_interruptPropagatesToTransformingThread() throws Exception { SettableFuture input = SettableFuture.create(); final CountDownLatch inFunction = new CountDownLatch(1); final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); final CountDownLatch gotException = new CountDownLatch(1); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(String s) throws Exception { inFunction.countDown(); try { shouldCompleteFunction.await(); } catch (InterruptedException expected) { gotException.countDown(); throw expected; } return immediateFuture("a"); } }; ListenableFuture futureResult = transformAsync(input, function, newSingleThreadExecutor()); input.set("value"); inFunction.await(); futureResult.cancel(true); shouldCompleteFunction.countDown(); try { futureResult.get(); fail(); } catch (CancellationException expected) { } // TODO(cpovirk): implement interruption, updating this test: // https://github.com/google/guava/issues/1989 assertEquals(1, gotException.getCount()); // gotException.await(); } public void testTransformAsync_cancelPropagatesToAsyncOutput() throws Exception { ListenableFuture immediate = immediateFuture(new Foo()); final SettableFuture secondary = SettableFuture.create(); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(Foo unused) { return secondary; } }; assertTrue(transformAsync(immediate, function, directExecutor()).cancel(false)); assertTrue(secondary.isCancelled()); assertFalse(secondary.wasInterrupted()); } public void testTransformAsync_interruptPropagatesToAsyncOutput() throws Exception { ListenableFuture immediate = immediateFuture(new Foo()); final SettableFuture secondary = SettableFuture.create(); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(Foo unused) { return secondary; } }; assertTrue(transformAsync(immediate, function, directExecutor()).cancel(true)); 
assertTrue(secondary.isCancelled()); assertTrue(secondary.wasInterrupted()); } public void testTransformAsync_inputCancelButNotInterruptPropagatesToOutput() throws Exception { SettableFuture f1 = SettableFuture.create(); final SettableFuture secondary = SettableFuture.create(); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(Foo unused) { return secondary; } }; ListenableFuture f2 = transformAsync(f1, function, directExecutor()); f1.cancel(true); assertTrue(f2.isCancelled()); /* * We might like to propagate interruption, too, but it's not clear that it matters. For now, we * test for the behavior that we have today. */ assertFalse(((AbstractFuture) f2).wasInterrupted()); } /* * Android does not handle this stack overflow gracefully... though somehow some other * stack-overflow tests work. It must depend on the exact place the error occurs. */ @AndroidIncompatible @GwtIncompatible // StackOverflowError public void testTransformAsync_StackOverflow() throws Exception { { /* * Initialize all relevant classes before running the test, which may otherwise poison any * classes it is trying to load during its stack overflow. 
*/ SettableFuture root = SettableFuture.create(); ListenableFuture unused = transformAsync(root, asyncIdentity(), directExecutor()); root.set("foo"); } SettableFuture root = SettableFuture.create(); ListenableFuture output = root; for (int i = 0; i < 10000; i++) { output = transformAsync(output, asyncIdentity(), directExecutor()); } try { root.set("foo"); fail(); } catch (StackOverflowError expected) { } } public void testTransformAsync_ErrorAfterCancellation() throws Exception { class Transformer implements AsyncFunction { ListenableFuture output; @Override public ListenableFuture apply(Object input) { output.cancel(false); throw new MyError(); } } Transformer transformer = new Transformer(); SettableFuture input = SettableFuture.create(); ListenableFuture output = transformAsync(input, transformer, directExecutor()); transformer.output = output; input.set("foo"); assertTrue(output.isCancelled()); } public void testTransformAsync_ExceptionAfterCancellation() throws Exception { class Transformer implements AsyncFunction { ListenableFuture output; @Override public ListenableFuture apply(Object input) { output.cancel(false); throw new MyRuntimeException(); } } Transformer transformer = new Transformer(); SettableFuture input = SettableFuture.create(); ListenableFuture output = transformAsync(input, transformer, directExecutor()); transformer.output = output; input.set("foo"); assertTrue(output.isCancelled()); } public void testTransformAsync_getThrowsRuntimeException() throws Exception { ListenableFuture input = UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException()); ListenableFuture output = transformAsync(input, asyncIdentity(), directExecutor()); try { getDone(output); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyRuntimeException.class); } } public void testTransformAsync_getThrowsError() throws Exception { ListenableFuture input = UncheckedThrowingFuture.throwingError(new MyError()); 
ListenableFuture output = transformAsync(input, asyncIdentity(), directExecutor()); try { getDone(output); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyError.class); } } public void testTransformAsync_listenerThrowsError() throws Exception { SettableFuture input = SettableFuture.create(); ListenableFuture output = transformAsync(input, asyncIdentity(), directExecutor()); output.addListener( new Runnable() { @Override public void run() { throw new MyError(); } }, directExecutor()); try { input.set("foo"); fail(); } catch (MyError expected) { } } public void testTransform_rejectionPropagatesToOutput() throws Exception { SettableFuture input = SettableFuture.create(); Function identity = identity(); ListenableFuture transformed = transform(input, identity, REJECTING_EXECUTOR); input.set(new Foo()); try { getDone(transformed); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); } } public void testTransformAsync_rejectionPropagatesToOutput() throws Exception { SettableFuture input = SettableFuture.create(); AsyncFunction asyncIdentity = asyncIdentity(); ListenableFuture transformed = transformAsync(input, asyncIdentity, REJECTING_EXECUTOR); input.set(new Foo()); try { getDone(transformed); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class); } } /** Tests that the function is invoked only once, even if it throws an exception. */ public void testTransformValueRemainsMemoized() throws Exception { class Holder { int value = 2; } final Holder holder = new Holder(); // This function adds the holder's value to the input value. Function adder = new Function() { @Override public Integer apply(Integer from) { return from + holder.value; } }; // Since holder.value is 2, applying 4 should yield 6. 
assertEquals(6, adder.apply(4).intValue()); ListenableFuture immediateFuture = immediateFuture(4); Future transformedFuture = transform(immediateFuture, adder, directExecutor()); // The composed future also yields 6. assertEquals(6, getDone(transformedFuture).intValue()); // Repeated calls yield the same value even though the function's behavior // changes holder.value = 3; assertEquals(6, getDone(transformedFuture).intValue()); assertEquals(7, adder.apply(4).intValue()); // Once more, with feeling. holder.value = 4; assertEquals(6, getDone(transformedFuture).intValue()); assertEquals(8, adder.apply(4).intValue()); // Memoized get also retains the value. assertEquals(6, getDoneFromTimeoutOverload(transformedFuture).intValue()); // Unsurprisingly, recomposing the future will return an updated value. assertEquals(8, getDone(transform(immediateFuture, adder, directExecutor())).intValue()); // Repeating, with the timeout version assertEquals( 8, getDoneFromTimeoutOverload(transform(immediateFuture, adder, directExecutor())).intValue()); } static class MyError extends Error {} static class MyRuntimeException extends RuntimeException {} /** * Test that the function is invoked only once, even if it throws an exception. Also, test that * that function's result is wrapped in an ExecutionException. */ @GwtIncompatible // reflection public void testTransformExceptionRemainsMemoized() throws Throwable { // We need to test with two input futures since ExecutionList.execute // doesn't catch Errors and we cannot depend on the order that our // transformations run. (So it is possible that the Error being thrown // could prevent our second transformations from running). 
SettableFuture exceptionInput = SettableFuture.create(); ListenableFuture exceptionComposedFuture = transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); exceptionInput.set(0); runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); SettableFuture errorInput = SettableFuture.create(); ListenableFuture errorComposedFuture = transform(errorInput, newOneTimeErrorThrower(), directExecutor()); errorInput.set(0); runGetIdempotencyTest(errorComposedFuture, MyError.class); /* * Try again when the input's value is already filled in, since the flow is * slightly different in that case. */ exceptionComposedFuture = transform(exceptionInput, newOneTimeExceptionThrower(), directExecutor()); runGetIdempotencyTest(exceptionComposedFuture, MyRuntimeException.class); runGetIdempotencyTest( transform(errorInput, newOneTimeErrorThrower(), directExecutor()), MyError.class); runGetIdempotencyTest(errorComposedFuture, MyError.class); } @GwtIncompatible // reflection private static void runGetIdempotencyTest( Future transformedFuture, Class expectedExceptionClass) throws Throwable { for (int i = 0; i < 5; i++) { try { getDone(transformedFuture); fail(); } catch (ExecutionException expected) { if (!expectedExceptionClass.isInstance(expected.getCause())) { throw expected.getCause(); } } } } @GwtIncompatible // used only in GwtIncompatible tests private static Function newOneTimeExceptionThrower() { return new Function() { int calls = 0; @Override public Integer apply(Integer from) { if (++calls > 1) { fail(); } throw new MyRuntimeException(); } }; } @GwtIncompatible // used only in GwtIncompatible tests private static Function newOneTimeErrorThrower() { return new Function() { int calls = 0; @Override public Integer apply(Integer from) { if (++calls > 1) { fail(); } throw new MyError(); } }; } // TODO(cpovirk): top-level class? 
static class ExecutorSpy implements Executor { Executor delegate; boolean wasExecuted; public ExecutorSpy(Executor delegate) { this.delegate = delegate; } @Override public void execute(Runnable command) { delegate.execute(command); wasExecuted = true; } } public void testTransform_Executor() throws Exception { Object value = new Object(); ExecutorSpy spy = new ExecutorSpy(directExecutor()); assertFalse(spy.wasExecuted); ListenableFuture future = transform(immediateFuture(value), identity(), spy); assertSame(value, getDone(future)); assertTrue(spy.wasExecuted); } @GwtIncompatible // Threads public void testTransformAsync_functionToString() throws Exception { final CountDownLatch functionCalled = new CountDownLatch(1); final CountDownLatch functionBlocking = new CountDownLatch(1); AsyncFunction function = tagged( "Called my toString", new AsyncFunction() { @Override public ListenableFuture apply(Object input) throws Exception { functionCalled.countDown(); functionBlocking.await(); return immediateFuture(null); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); try { ListenableFuture output = Futures.transformAsync(immediateFuture(null), function, executor); functionCalled.await(); assertThat(output.toString()).contains(function.toString()); } finally { functionBlocking.countDown(); executor.shutdown(); } } @GwtIncompatible // lazyTransform public void testLazyTransform() throws Exception { FunctionSpy spy = new FunctionSpy<>(constant("bar")); Future input = immediateFuture("foo"); Future transformed = lazyTransform(input, spy); spy.verifyCallCount(0); assertEquals("bar", getDone(transformed)); spy.verifyCallCount(1); assertEquals("bar", getDone(transformed)); spy.verifyCallCount(2); } @GwtIncompatible // lazyTransform public void testLazyTransform_exception() throws Exception { final RuntimeException exception = new RuntimeException("deliberate"); Function function = new Function() { @Override public String apply(Integer input) { throw exception; } 
}; Future transformed = lazyTransform(immediateFuture(1), function); try { getDone(transformed); fail(); } catch (ExecutionException expected) { assertSame(exception, expected.getCause()); } try { getDoneFromTimeoutOverload(transformed); fail(); } catch (ExecutionException expected) { assertSame(exception, expected.getCause()); } } private static class FunctionSpy implements Function { private int applyCount; private final Function delegate; public FunctionSpy(Function delegate) { this.delegate = delegate; } @Override public O apply(I input) { applyCount++; return delegate.apply(input); } void verifyCallCount(int expected) { assertThat(applyCount).isEqualTo(expected); } } private static Function unexpectedFunction() { return new Function() { @Override public V apply(X t) { throw newAssertionError("Unexpected fallback", t); } }; } private static class AsyncFunctionSpy implements AsyncFunction { private int count; private final AsyncFunction delegate; public AsyncFunctionSpy(AsyncFunction delegate) { this.delegate = delegate; } @Override public final ListenableFuture apply(X t) throws Exception { count++; return delegate.apply(t); } void verifyCallCount(int expected) { assertThat(count).isEqualTo(expected); } } private static FunctionSpy spy(Function delegate) { return new FunctionSpy<>(delegate); } private static AsyncFunctionSpy spy(AsyncFunction delegate) { return new AsyncFunctionSpy<>(delegate); } private static AsyncFunction unexpectedAsyncFunction() { return new AsyncFunction() { @Override public ListenableFuture apply(X t) { throw newAssertionError("Unexpected fallback", t); } }; } /** Alternative to AssertionError(String, Throwable), which doesn't exist in GWT 2.6.1. 
*/ private static AssertionError newAssertionError(String message, Throwable cause) { AssertionError e = new AssertionError(message); e.initCause(cause); return e; } // catchingAsync tests cloned from the old withFallback tests: public void testCatchingAsync_inputDoesNotRaiseException() throws Exception { AsyncFunction fallback = unexpectedAsyncFunction(); ListenableFuture originalFuture = immediateFuture(7); ListenableFuture faultTolerantFuture = catchingAsync(originalFuture, Throwable.class, fallback, directExecutor()); assertEquals(7, getDone(faultTolerantFuture).intValue()); } public void testCatchingAsync_inputRaisesException() throws Exception { final RuntimeException raisedException = new RuntimeException(); AsyncFunctionSpy fallback = spy( new AsyncFunction() { @Override public ListenableFuture apply(Throwable t) throws Exception { assertThat(t).isSameInstanceAs(raisedException); return immediateFuture(20); } }); ListenableFuture failingFuture = immediateFailedFuture(raisedException); ListenableFuture faultTolerantFuture = catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); assertEquals(20, getDone(faultTolerantFuture).intValue()); fallback.verifyCallCount(1); } @GwtIncompatible // non-Throwable exceptionType public void testCatchingAsync_inputCancelledWithoutFallback() throws Exception { AsyncFunction fallback = unexpectedAsyncFunction(); ListenableFuture originalFuture = immediateCancelledFuture(); ListenableFuture faultTolerantFuture = catchingAsync(originalFuture, IOException.class, fallback, directExecutor()); assertTrue(faultTolerantFuture.isCancelled()); } public void testCatchingAsync_fallbackGeneratesRuntimeException() throws Exception { RuntimeException expectedException = new RuntimeException(); runExpectedExceptionCatchingAsyncTest(expectedException, false); } public void testCatchingAsync_fallbackGeneratesCheckedException() throws Exception { Exception expectedException = new Exception() {}; 
runExpectedExceptionCatchingAsyncTest(expectedException, false); } public void testCatchingAsync_fallbackGeneratesError() throws Exception { final Error error = new Error("deliberate"); AsyncFunction fallback = new AsyncFunction() { @Override public ListenableFuture apply(Throwable t) throws Exception { throw error; } }; ListenableFuture failingFuture = immediateFailedFuture(new RuntimeException()); try { getDone(catchingAsync(failingFuture, Throwable.class, fallback, directExecutor())); fail(); } catch (ExecutionException expected) { assertSame(error, expected.getCause()); } } public void testCatchingAsync_fallbackReturnsRuntimeException() throws Exception { RuntimeException expectedException = new RuntimeException(); runExpectedExceptionCatchingAsyncTest(expectedException, true); } public void testCatchingAsync_fallbackReturnsCheckedException() throws Exception { Exception expectedException = new Exception() {}; runExpectedExceptionCatchingAsyncTest(expectedException, true); } private void runExpectedExceptionCatchingAsyncTest( final Exception expectedException, final boolean wrapInFuture) throws Exception { AsyncFunctionSpy fallback = spy( new AsyncFunction() { @Override public ListenableFuture apply(Throwable t) throws Exception { if (!wrapInFuture) { throw expectedException; } else { return immediateFailedFuture(expectedException); } } }); ListenableFuture failingFuture = immediateFailedFuture(new RuntimeException()); ListenableFuture faultTolerantFuture = catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); try { getDone(faultTolerantFuture); fail(); } catch (ExecutionException expected) { assertSame(expectedException, expected.getCause()); } fallback.verifyCallCount(1); } public void testCatchingAsync_fallbackNotReady() throws Exception { ListenableFuture primary = immediateFailedFuture(new Exception()); final SettableFuture secondary = SettableFuture.create(); AsyncFunction fallback = new AsyncFunction() { @Override public 
ListenableFuture apply(Throwable t) { return secondary; } }; ListenableFuture derived = catchingAsync(primary, Throwable.class, fallback, directExecutor()); secondary.set(1); assertEquals(1, (int) getDone(derived)); } public void testCatchingAsync_resultInterruptedBeforeFallback() throws Exception { SettableFuture primary = SettableFuture.create(); AsyncFunction fallback = unexpectedAsyncFunction(); ListenableFuture derived = catchingAsync(primary, Throwable.class, fallback, directExecutor()); derived.cancel(true); assertTrue(primary.isCancelled()); assertTrue(primary.wasInterrupted()); } public void testCatchingAsync_resultCancelledBeforeFallback() throws Exception { SettableFuture primary = SettableFuture.create(); AsyncFunction fallback = unexpectedAsyncFunction(); ListenableFuture derived = catchingAsync(primary, Throwable.class, fallback, directExecutor()); derived.cancel(false); assertTrue(primary.isCancelled()); assertFalse(primary.wasInterrupted()); } @GwtIncompatible // mocks // TODO(cpovirk): eliminate use of mocks @SuppressWarnings("unchecked") public void testCatchingAsync_resultCancelledAfterFallback() throws Exception { final SettableFuture secondary = SettableFuture.create(); final RuntimeException raisedException = new RuntimeException(); AsyncFunctionSpy fallback = spy( new AsyncFunction() { @Override public ListenableFuture apply(Throwable t) throws Exception { assertThat(t).isSameInstanceAs(raisedException); return secondary; } }); ListenableFuture failingFuture = immediateFailedFuture(raisedException); ListenableFuture derived = catchingAsync(failingFuture, Throwable.class, fallback, directExecutor()); derived.cancel(false); assertTrue(secondary.isCancelled()); assertFalse(secondary.wasInterrupted()); fallback.verifyCallCount(1); } public void testCatchingAsync_nullInsteadOfFuture() throws Exception { ListenableFuture inputFuture = immediateFailedFuture(new Exception()); ListenableFuture chainedFuture = catchingAsync( inputFuture, 
Throwable.class, new AsyncFunction() { @Override @SuppressWarnings("AsyncFunctionReturnsNull") public ListenableFuture apply(Throwable t) { return null; } }, directExecutor()); try { getDone(chainedFuture); fail(); } catch (ExecutionException expected) { NullPointerException cause = (NullPointerException) expected.getCause(); assertThat(cause) .hasMessageThat() .contains( "AsyncFunction.apply returned null instead of a Future. " + "Did you mean to return immediateFuture(null)?"); } } @GwtIncompatible // threads public void testCatchingAsync_interruptPropagatesToTransformingThread() throws Exception { SettableFuture input = SettableFuture.create(); final CountDownLatch inFunction = new CountDownLatch(1); final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); final CountDownLatch gotException = new CountDownLatch(1); AsyncFunction function = new AsyncFunction() { @Override public ListenableFuture apply(Throwable t) throws Exception { inFunction.countDown(); try { shouldCompleteFunction.await(); } catch (InterruptedException expected) { gotException.countDown(); throw expected; } return immediateFuture("a"); } }; ListenableFuture futureResult = catchingAsync(input, Exception.class, function, newSingleThreadExecutor()); input.setException(new Exception()); inFunction.await(); futureResult.cancel(true); shouldCompleteFunction.countDown(); try { futureResult.get(); fail(); } catch (CancellationException expected) { } // TODO(cpovirk): implement interruption, updating this test: // https://github.com/google/guava/issues/1989 assertEquals(1, gotException.getCount()); // gotException.await(); } @GwtIncompatible // Threads public void testCatchingAsync_functionToString() throws Exception { final CountDownLatch functionCalled = new CountDownLatch(1); final CountDownLatch functionBlocking = new CountDownLatch(1); AsyncFunction function = tagged( "Called my toString", new AsyncFunction() { @Override public ListenableFuture apply(Object input) throws Exception { 
functionCalled.countDown(); functionBlocking.await(); return immediateFuture(null); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); try { ListenableFuture output = Futures.catchingAsync( immediateFailedFuture(new RuntimeException()), Throwable.class, function, executor); functionCalled.await(); assertThat(output.toString()).contains(function.toString()); } finally { functionBlocking.countDown(); executor.shutdown(); } } public void testCatchingAsync_futureToString() throws Exception { final SettableFuture toReturn = SettableFuture.create(); AsyncFunction function = tagged( "Called my toString", new AsyncFunction() { @Override public ListenableFuture apply(Object input) throws Exception { return toReturn; } }); ListenableFuture output = Futures.catchingAsync( immediateFailedFuture(new RuntimeException()), Throwable.class, function, directExecutor()); assertThat(output.toString()).contains(toReturn.toString()); } // catching tests cloned from the old withFallback tests: public void testCatching_inputDoesNotRaiseException() throws Exception { Function fallback = unexpectedFunction(); ListenableFuture originalFuture = immediateFuture(7); ListenableFuture faultTolerantFuture = catching(originalFuture, Throwable.class, fallback, directExecutor()); assertEquals(7, getDone(faultTolerantFuture).intValue()); } public void testCatching_inputRaisesException() throws Exception { final RuntimeException raisedException = new RuntimeException(); FunctionSpy fallback = spy( new Function() { @Override public Integer apply(Throwable t) { assertThat(t).isSameInstanceAs(raisedException); return 20; } }); ListenableFuture failingFuture = immediateFailedFuture(raisedException); ListenableFuture faultTolerantFuture = catching(failingFuture, Throwable.class, fallback, directExecutor()); assertEquals(20, getDone(faultTolerantFuture).intValue()); fallback.verifyCallCount(1); } @GwtIncompatible // non-Throwable exceptionType public void 
testCatching_inputCancelledWithoutFallback() throws Exception { Function fallback = unexpectedFunction(); ListenableFuture originalFuture = immediateCancelledFuture(); ListenableFuture faultTolerantFuture = catching(originalFuture, IOException.class, fallback, directExecutor()); assertTrue(faultTolerantFuture.isCancelled()); } public void testCatching_fallbackGeneratesRuntimeException() throws Exception { RuntimeException expectedException = new RuntimeException(); runExpectedExceptionCatchingTest(expectedException); } /* * catching() uses a plain Function, so there's no * testCatching_fallbackGeneratesCheckedException(). */ public void testCatching_fallbackGeneratesError() throws Exception { final Error error = new Error("deliberate"); Function fallback = new Function() { @Override public Integer apply(Throwable t) { throw error; } }; ListenableFuture failingFuture = immediateFailedFuture(new RuntimeException()); try { getDone(catching(failingFuture, Throwable.class, fallback, directExecutor())); fail(); } catch (ExecutionException expected) { assertSame(error, expected.getCause()); } } /* * catching() uses a plain Function, so there's no testCatching_fallbackReturnsRuntimeException() * or testCatching_fallbackReturnsCheckedException(). */ private void runExpectedExceptionCatchingTest(final RuntimeException expectedException) throws Exception { FunctionSpy fallback = spy( new Function() { @Override public Integer apply(Throwable t) { throw expectedException; } }); ListenableFuture failingFuture = immediateFailedFuture(new RuntimeException()); ListenableFuture faultTolerantFuture = catching(failingFuture, Throwable.class, fallback, directExecutor()); try { getDone(faultTolerantFuture); fail(); } catch (ExecutionException expected) { assertSame(expectedException, expected.getCause()); } fallback.verifyCallCount(1); } // catching() uses a plain Function, so there's no testCatching_fallbackNotReady(). 
public void testCatching_resultInterruptedBeforeFallback() throws Exception {
    // cancel(true) on the derived future must cancel the pending input and mark it interrupted.
    SettableFuture<Integer> primary = SettableFuture.create();
    Function<Throwable, Integer> fallback = unexpectedFunction();
    ListenableFuture<Integer> derived =
        catching(primary, Throwable.class, fallback, directExecutor());
    derived.cancel(true);
    assertTrue(primary.isCancelled());
    assertTrue(primary.wasInterrupted());
  }

  // NOTE(review): the generic type arguments in this section were reconstructed from how each
  // value is used (casts, literals, asserted types); confirm against version control.

  /** cancel(false) on the derived future cancels the pending input without interruption. */
  public void testCatching_resultCancelledBeforeFallback() throws Exception {
    SettableFuture<Integer> primary = SettableFuture.create();
    Function<Throwable, Integer> fallback = unexpectedFunction();
    ListenableFuture<Integer> derived =
        catching(primary, Throwable.class, fallback, directExecutor());
    derived.cancel(false);
    assertTrue(primary.isCancelled());
    assertFalse(primary.wasInterrupted());
  }

  // catching() uses a plain Function, so there's no testCatching_resultCancelledAfterFallback().

  // catching() uses a plain Function, so there's no testCatching_nullInsteadOfFuture().

  // Some tests of the exceptionType parameter:

  /** Throwable.class as exceptionType catches an IOException. */
  public void testCatching_Throwable() throws Exception {
    Function<Throwable, Integer> fallback = functionReturningOne();
    ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException());
    ListenableFuture<Integer> faultTolerantFuture =
        catching(originalFuture, Throwable.class, fallback, directExecutor());
    assertEquals(1, (int) getDone(faultTolerantFuture));
  }

  /** IOException.class as exceptionType catches the subtype FileNotFoundException. */
  @GwtIncompatible // non-Throwable exceptionType
  public void testCatching_customTypeMatch() throws Exception {
    Function<IOException, Integer> fallback = functionReturningOne();
    ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException());
    ListenableFuture<Integer> faultTolerantFuture =
        catching(originalFuture, IOException.class, fallback, directExecutor());
    assertEquals(1, (int) getDone(faultTolerantFuture));
  }

  /** IOException.class as exceptionType does not catch a RuntimeException; it propagates. */
  @GwtIncompatible // non-Throwable exceptionType
  public void testCatching_customTypeNoMatch() throws Exception {
    Function<IOException, Integer> fallback = functionReturningOne();
    ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException());
    ListenableFuture<Integer> faultTolerantFuture =
        catching(originalFuture, IOException.class, fallback, directExecutor());
    try {
      getDone(faultTolerantFuture);
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected.getCause()).isInstanceOf(RuntimeException.class);
    }
  }

  /** Failing the root of a 10000-deep catching() chain is expected to overflow the stack. */
  @GwtIncompatible // StackOverflowError
  public void testCatching_StackOverflow() throws Exception {
    {
      /*
       * Initialize all relevant classes before running the test, which may otherwise poison any
       * classes it is trying to load during its stack overflow.
       */
      SettableFuture<Object> root = SettableFuture.create();
      ListenableFuture<Object> unused =
          catching(root, MyException.class, identity(), directExecutor());
      root.setException(new MyException());
    }

    SettableFuture<Object> root = SettableFuture.create();
    ListenableFuture<Object> output = root;
    for (int i = 0; i < 10000; i++) {
      output = catching(output, MyException.class, identity(), directExecutor());
    }
    try {
      root.setException(new MyException());
      fail();
    } catch (StackOverflowError expected) {
    }
  }

  /** A fallback that cancels the output and then throws an Error leaves the output cancelled. */
  public void testCatching_ErrorAfterCancellation() throws Exception {
    class Fallback implements Function<Throwable, Object> {
      ListenableFuture<Object> output;

      @Override
      public Object apply(Throwable input) {
        output.cancel(false);
        throw new MyError();
      }
    }
    Fallback fallback = new Fallback();
    SettableFuture<Object> input = SettableFuture.create();

    ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor());
    fallback.output = output;

    input.setException(new MyException());
    assertTrue(output.isCancelled());
  }

  /**
   * A fallback that cancels the output and then throws a RuntimeException leaves the output
   * cancelled.
   */
  public void testCatching_ExceptionAfterCancellation() throws Exception {
    class Fallback implements Function<Throwable, Object> {
      ListenableFuture<Object> output;

      @Override
      public Object apply(Throwable input) {
        output.cancel(false);
        throw new MyRuntimeException();
      }
    }
    Fallback fallback = new Fallback();
    SettableFuture<Object> input = SettableFuture.create();

    ListenableFuture<Object> output = catching(input, Throwable.class, fallback, directExecutor());
    fallback.output = output;

    input.setException(new MyException());
    assertTrue(output.isCancelled());
  }

  /** With an identity fallback, the output's value is the MyRuntimeException the input carries. */
  public void testCatching_getThrowsRuntimeException() throws Exception {
    ListenableFuture<Object> input =
        UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException());

    // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT.
    ListenableFuture<Object> output =
        catching(input, Throwable.class, identity(), directExecutor());
    assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class);
  }

  /** With an identity fallback, the output's value is the MyError the input carries. */
  public void testCatching_getThrowsError() throws Exception {
    ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError());

    // We'd catch only MyError.class here, but then the test won't compile under GWT.
    ListenableFuture<Object> output =
        catching(input, Throwable.class, identity(), directExecutor());
    assertThat(getDone(output)).isInstanceOf(MyError.class);
  }

  /** An Error thrown by a direct-executor listener on the output escapes setException(). */
  public void testCatching_listenerThrowsError() throws Exception {
    SettableFuture<Object> input = SettableFuture.create();
    ListenableFuture<Object> output =
        catching(input, Throwable.class, identity(), directExecutor());

    output.addListener(
        new Runnable() {
          @Override
          public void run() {
            throw new MyError();
          }
        },
        directExecutor());
    try {
      input.setException(new MyException());
      fail();
    } catch (MyError expected) {
    }
  }

  /** Throwable.class as exceptionType catches an IOException. */
  public void testCatchingAsync_Throwable() throws Exception {
    AsyncFunction<Throwable, Integer> fallback = asyncFunctionReturningOne();
    ListenableFuture<Integer> originalFuture = immediateFailedFuture(new IOException());
    ListenableFuture<Integer> faultTolerantFuture =
        catchingAsync(originalFuture, Throwable.class, fallback, directExecutor());
    assertEquals(1, (int) getDone(faultTolerantFuture));
  }

  /** IOException.class as exceptionType catches the subtype FileNotFoundException. */
  @GwtIncompatible // non-Throwable exceptionType
  public void testCatchingAsync_customTypeMatch() throws Exception {
    AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne();
    ListenableFuture<Integer> originalFuture = immediateFailedFuture(new FileNotFoundException());
    ListenableFuture<Integer> faultTolerantFuture =
        catchingAsync(originalFuture, IOException.class, fallback, directExecutor());
    assertEquals(1, (int) getDone(faultTolerantFuture));
  }

  /** IOException.class as exceptionType does not catch a RuntimeException; it propagates. */
  @GwtIncompatible // non-Throwable exceptionType
  public void testCatchingAsync_customTypeNoMatch() throws Exception {
    AsyncFunction<IOException, Integer> fallback = asyncFunctionReturningOne();
    ListenableFuture<Integer> originalFuture = immediateFailedFuture(new RuntimeException());
    ListenableFuture<Integer> faultTolerantFuture =
        catchingAsync(originalFuture, IOException.class, fallback, directExecutor());
    try {
      getDone(faultTolerantFuture);
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected.getCause()).isInstanceOf(RuntimeException.class);
    }
  }

  /** Failing the root of a 10000-deep catchingAsync() chain is expected to overflow the stack. */
  @GwtIncompatible // StackOverflowError
  public void testCatchingAsync_StackOverflow() throws Exception {
    {
      /*
       * Initialize all relevant classes before running the test, which may otherwise poison any
       * classes it is trying to load during its stack overflow.
       */
      SettableFuture<Object> root = SettableFuture.create();
      ListenableFuture<Object> unused =
          catchingAsync(root, MyException.class, asyncIdentity(), directExecutor());
      root.setException(new MyException());
    }

    SettableFuture<Object> root = SettableFuture.create();
    ListenableFuture<Object> output = root;
    for (int i = 0; i < 10000; i++) {
      output = catchingAsync(output, MyException.class, asyncIdentity(), directExecutor());
    }
    try {
      root.setException(new MyException());
      fail();
    } catch (StackOverflowError expected) {
    }
  }

  /** A fallback that cancels the output and then throws an Error leaves the output cancelled. */
  public void testCatchingAsync_ErrorAfterCancellation() throws Exception {
    class Fallback implements AsyncFunction<Throwable, Object> {
      ListenableFuture<Object> output;

      @Override
      public ListenableFuture<Object> apply(Throwable input) {
        output.cancel(false);
        throw new MyError();
      }
    }
    Fallback fallback = new Fallback();
    SettableFuture<Object> input = SettableFuture.create();

    ListenableFuture<Object> output =
        catchingAsync(input, Throwable.class, fallback, directExecutor());
    fallback.output = output;

    input.setException(new MyException());
    assertTrue(output.isCancelled());
  }

  /**
   * A fallback that cancels the output and then throws a RuntimeException leaves the output
   * cancelled.
   */
  public void testCatchingAsync_ExceptionAfterCancellation() throws Exception {
    class Fallback implements AsyncFunction<Throwable, Object> {
      ListenableFuture<Object> output;

      @Override
      public ListenableFuture<Object> apply(Throwable input) {
        output.cancel(false);
        throw new MyRuntimeException();
      }
    }
    Fallback fallback = new Fallback();
    SettableFuture<Object> input = SettableFuture.create();

    ListenableFuture<Object> output =
        catchingAsync(input, Throwable.class, fallback, directExecutor());
    fallback.output = output;

    input.setException(new MyException());
    assertTrue(output.isCancelled());
  }

  /** With asyncIdentity(), the output's value is the MyRuntimeException the input carries. */
  public void testCatchingAsync_getThrowsRuntimeException() throws Exception {
    ListenableFuture<Object> input =
        UncheckedThrowingFuture.throwingRuntimeException(new MyRuntimeException());

    // We'd catch only MyRuntimeException.class here, but then the test won't compile under GWT.
    ListenableFuture<Object> output =
        catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor());
    assertThat(getDone(output)).isInstanceOf(MyRuntimeException.class);
  }

  /** With asyncIdentity(), the output's value is the MyError the input carries. */
  public void testCatchingAsync_getThrowsError() throws Exception {
    ListenableFuture<Object> input = UncheckedThrowingFuture.throwingError(new MyError());

    // We'd catch only MyError.class here, but then the test won't compile under GWT.
    ListenableFuture<Object> output =
        catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor());
    assertThat(getDone(output)).isInstanceOf(MyError.class);
  }

  /** An Error thrown by a direct-executor listener on the output escapes setException(). */
  public void testCatchingAsync_listenerThrowsError() throws Exception {
    SettableFuture<Object> input = SettableFuture.create();
    ListenableFuture<Object> output =
        catchingAsync(input, Throwable.class, asyncIdentity(), directExecutor());

    output.addListener(
        new Runnable() {
          @Override
          public void run() {
            throw new MyError();
          }
        },
        directExecutor());
    try {
      input.setException(new MyException());
      fail();
    } catch (MyError expected) {
    }
  }

  /** With REJECTING_EXECUTOR, the output fails with a RejectedExecutionException cause. */
  public void testCatching_rejectionPropagatesToOutput() throws Exception {
    SettableFuture<String> input = SettableFuture.create();
    ListenableFuture<String> transformed =
        catching(input, Throwable.class, constant("foo"), REJECTING_EXECUTOR);
    input.setException(new Exception());
    try {
      getDone(transformed);
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class);
    }
  }

  /** With REJECTING_EXECUTOR, the output fails with a RejectedExecutionException cause. */
  public void testCatchingAsync_rejectionPropagatesToOutput() throws Exception {
    SettableFuture<String> input = SettableFuture.create();
    ListenableFuture<String> transformed =
        catchingAsync(
            input,
            Throwable.class,
            constantAsyncFunction(immediateFuture("foo")),
            REJECTING_EXECUTOR);
    input.setException(new Exception());
    try {
      getDone(transformed);
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected.getCause()).isInstanceOf(RejectedExecutionException.class);
    }
  }

  /** Returns a fallback that ignores its argument and returns 1. */
  private <X extends Throwable> Function<X, Integer> functionReturningOne() {
    return new Function<X, Integer>() {
      @Override
      public Integer apply(X t) {
        return 1;
      }
    };
  }

  /** Returns an async fallback that ignores its argument and returns immediateFuture(1). */
  private <X extends Throwable> AsyncFunction<X, Integer> asyncFunctionReturningOne() {
    return new AsyncFunction<X, Integer>() {
      @Override
      public ListenableFuture<Integer> apply(X t) {
        return immediateFuture(1);
      }
    };
  }

  /** Returns an AsyncFunction that ignores its input and always returns {@code output}. */
  private static <I, O> AsyncFunction<I, O> constantAsyncFunction(
      final ListenableFuture<O> output) {
    return new AsyncFunction<I, O>() {
      @Override
      public ListenableFuture<O> apply(I input) {
        return output;
      }
    };
  }

  /** transformAsync accepts wildcard-typed futures on both the input and function side. */
  public void testTransformAsync_genericsWildcard_AsyncFunction() throws Exception {
    ListenableFuture<?> nullFuture = immediateFuture(null);
    ListenableFuture<?> chainedFuture =
        transformAsync(nullFuture, constantAsyncFunction(nullFuture), directExecutor());
    assertNull(getDone(chainedFuture));
  }

  /** A function returning a future of a subtype (BarChild) can be read back as Bar. */
  public void testTransformAsync_genericsHierarchy_AsyncFunction() throws Exception {
    // NOTE(review): the input's type argument was lost in extraction; a subtype of Foo may have
    // been intended here to exercise the "? super I" side as well - confirm.
    ListenableFuture<Foo> future = immediateFuture(null);
    final BarChild barChild = new BarChild();
    AsyncFunction<Foo, BarChild> function =
        new AsyncFunction<Foo, BarChild>() {
          @Override
          public AbstractFuture<BarChild> apply(Foo unused) {
            AbstractFuture<BarChild> future = new AbstractFuture<BarChild>() {};
            future.set(barChild);
            return future;
          }
        };
    Bar bar = getDone(transformAsync(future, function, directExecutor()));
    assertSame(barChild, bar);
  }

  /** A timed get() on a transform of a never-completed input times out. */
  @GwtIncompatible // get() timeout
  public void testTransformAsync_asyncFunction_timeout()
      throws InterruptedException, ExecutionException {
    AsyncFunction<String, Integer> function = constantAsyncFunction(immediateFuture(1));
    ListenableFuture<Integer> future =
        transformAsync(SettableFuture.<String>create(), function, directExecutor());
    try {
      future.get(1, MILLISECONDS);
      fail();
    } catch (TimeoutException expected) {
    }
  }

  /** An Error thrown by the function becomes the cause of the output's ExecutionException. */
  public void testTransformAsync_asyncFunction_error() throws InterruptedException {
    final Error error = new Error("deliberate");
    AsyncFunction<String, Integer> function =
        new AsyncFunction<String, Integer>() {
          @Override
          public ListenableFuture<Integer> apply(String input) {
            throw error;
          }
        };
    SettableFuture<String> inputFuture = SettableFuture.create();
    ListenableFuture<Integer> outputFuture =
        transformAsync(inputFuture, function, directExecutor());
    inputFuture.set("value");
    try {
      getDone(outputFuture);
      fail();
    } catch (ExecutionException expected) {
      assertSame(error, expected.getCause());
    }
  }

  /** A function that returns null fails the output with a descriptive NullPointerException. */
  public void testTransformAsync_asyncFunction_nullInsteadOfFuture() throws Exception {
    ListenableFuture<String> inputFuture = immediateFuture("a");
    ListenableFuture<?> chainedFuture =
        transformAsync(inputFuture, constantAsyncFunction(null), directExecutor());
    try {
      getDone(chainedFuture);
      fail();
    } catch (ExecutionException expected) {
      NullPointerException cause = (NullPointerException) expected.getCause();
      assertThat(cause)
          .hasMessageThat()
          .contains(
              "AsyncFunction.apply returned null instead of a Future. "
                  + "Did you mean to return immediateFuture(null)?");
    }
  }

  /**
   * Cancelling the output while the function is still running cancels the output, and the future
   * the function subsequently returns ends up cancelled as well.
   */
  @GwtIncompatible // threads
  public void testTransformAsync_asyncFunction_cancelledWhileApplyingFunction()
      throws InterruptedException, ExecutionException {
    final CountDownLatch inFunction = new CountDownLatch(1);
    final CountDownLatch functionDone = new CountDownLatch(1);
    final SettableFuture<Integer> resultFuture = SettableFuture.create();
    AsyncFunction<String, Integer> function =
        new AsyncFunction<String, Integer>() {
          @Override
          public ListenableFuture<Integer> apply(String input) throws Exception {
            inFunction.countDown();
            functionDone.await();
            return resultFuture;
          }
        };
    SettableFuture<String> inputFuture = SettableFuture.create();
    ListenableFuture<Integer> future =
        transformAsync(inputFuture, function, newSingleThreadExecutor());
    inputFuture.set("value");
    inFunction.await();
    future.cancel(false);
    functionDone.countDown();
    try {
      future.get();
      fail();
    } catch (CancellationException expected) {
    }
    try {
      resultFuture.get();
      fail();
    } catch (CancellationException expected) {
    }
  }

  /** If the output is cancelled before the executor gets to the function, it never runs. */
  @GwtIncompatible // threads
  public void testTransformAsync_asyncFunction_cancelledBeforeApplyingFunction()
      throws InterruptedException {
    final AtomicBoolean functionCalled = new AtomicBoolean();
    AsyncFunction<String, Integer> function =
        new AsyncFunction<String, Integer>() {
          @Override
          public ListenableFuture<Integer> apply(String input) throws Exception {
            functionCalled.set(true);
            return immediateFuture(1);
          }
        };
    SettableFuture<String> inputFuture = SettableFuture.create();
    ExecutorService executor = newSingleThreadExecutor();
    ListenableFuture<Integer> future = transformAsync(inputFuture, function, executor);

    // Pause the executor.
    final CountDownLatch beforeFunction = new CountDownLatch(1);
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            awaitUninterruptibly(beforeFunction);
          }
        });

    // Cancel the future after making input available.
    inputFuture.set("value");
    future.cancel(false);

    // Unpause the executor.
    beforeFunction.countDown();
    executor.shutdown();
    assertTrue(executor.awaitTermination(5, SECONDS));

    assertFalse(functionCalled.get());
  }

  /** An Error thrown by the callable becomes the cause of the output's ExecutionException. */
  public void testSubmitAsync_asyncCallable_error() throws InterruptedException {
    final Error error = new Error("deliberate");
    AsyncCallable<Integer> callable =
        new AsyncCallable<Integer>() {
          @Override
          public ListenableFuture<Integer> call() {
            throw error;
          }
        };
    ListenableFuture<Integer> outputFuture = submitAsync(callable, directExecutor());
    try {
      getDone(outputFuture);
      fail();
    } catch (ExecutionException expected) {
      assertSame(error, expected.getCause());
    }
  }

  /** A callable that returns null fails the output with a descriptive NullPointerException. */
  public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception {
    ListenableFuture<?> chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor());
    try {
      getDone(chainedFuture);
      fail();
    } catch (ExecutionException expected) {
      NullPointerException cause = (NullPointerException) expected.getCause();
      assertThat(cause)
          .hasMessageThat()
          .contains(
              "AsyncCallable.call returned null instead of a Future. "
                  + "Did you mean to return immediateFuture(null)?");
    }
  }

  /**
   * Cancelling the output while the callable is still running cancels the output, and the future
   * the callable subsequently returns ends up cancelled as well.
   */
  @GwtIncompatible // threads
  public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction()
      throws InterruptedException, ExecutionException {
    final CountDownLatch inFunction = new CountDownLatch(1);
    final CountDownLatch callableDone = new CountDownLatch(1);
    final SettableFuture<Integer> resultFuture = SettableFuture.create();
    AsyncCallable<Integer> callable =
        new AsyncCallable<Integer>() {
          @Override
          public ListenableFuture<Integer> call() throws InterruptedException {
            inFunction.countDown();
            callableDone.await();
            return resultFuture;
          }
        };
    ListenableFuture<Integer> future = submitAsync(callable, newSingleThreadExecutor());
    inFunction.await();
    future.cancel(false);
    callableDone.countDown();
    try {
      future.get();
      fail();
    } catch (CancellationException expected) {
    }
    try {
      resultFuture.get();
      fail();
    } catch (CancellationException expected) {
    }
  }

  /** If the output is cancelled before the executor gets to the callable, it never runs. */
  @GwtIncompatible // threads
  public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction()
      throws InterruptedException {
    final AtomicBoolean callableCalled = new AtomicBoolean();
    AsyncCallable<Integer> callable =
        new AsyncCallable<Integer>() {
          @Override
          public ListenableFuture<Integer> call() {
            callableCalled.set(true);
            return immediateFuture(1);
          }
        };
    ExecutorService executor = newSingleThreadExecutor();
    // Pause the executor.
    final CountDownLatch beforeFunction = new CountDownLatch(1);
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            awaitUninterruptibly(beforeFunction);
          }
        });
    ListenableFuture<Integer> future = submitAsync(callable, executor);
    future.cancel(false);

    // Unpause the executor.
    beforeFunction.countDown();
    executor.shutdown();
    assertTrue(executor.awaitTermination(5, SECONDS));

    assertFalse(callableCalled.get());
  }

  /**
   * A callable returning a future that was cancelled with interruption completes the output and
   * does not leave the calling thread's interrupt flag set.
   */
  @GwtIncompatible // threads
  public void testSubmitAsync_asyncCallable_returnsInterruptedFuture() throws InterruptedException {
    assertThat(Thread.interrupted()).isFalse();
    SettableFuture<Integer> cancelledFuture = SettableFuture.create();
    cancelledFuture.cancel(true);
    assertThat(Thread.interrupted()).isFalse();
    ListenableFuture<Integer> future =
        submitAsync(constantAsyncCallable(cancelledFuture), directExecutor());
    assertThat(future.isDone()).isTrue();
    assertThat(Thread.interrupted()).isFalse();
  }

  /** submit(Callable) on a direct executor is done immediately and holds the returned value. */
  public void testSubmit_callable_returnsValue() throws Exception {
    Callable<Integer> callable =
        new Callable<Integer>() {
          @Override
          public Integer call() {
            return 42;
          }
        };
    ListenableFuture<Integer> future = submit(callable, directExecutor());
    assertThat(future.isDone()).isTrue();
    assertThat(getDone(future)).isEqualTo(42);
  }

  /** An exception thrown by the Callable becomes the cause of the future's failure. */
  public void testSubmit_callable_throwsException() {
    final Exception exception = new Exception("Exception for testing");
    Callable<Integer> callable =
        new Callable<Integer>() {
          @Override
          public Integer call() throws Exception {
            throw exception;
          }
        };
    ListenableFuture<Integer> future = submit(callable, directExecutor());
    try {
      getDone(future);
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected).hasCauseThat().isSameInstanceAs(exception);
    }
  }

  /**
   * submit(Runnable) hands exactly one task to the executor and completes only once that task has
   * been run.
   */
  public void testSubmit_runnable_completesAfterRun() throws Exception {
    final List<Runnable> pendingRunnables = newArrayList();
    final List<Runnable> executedRunnables = newArrayList();
    Runnable runnable =
        new Runnable() {
          @Override
          public void run() {
            executedRunnables.add(this);
          }
        };
    // Queues tasks instead of running them so the test controls when execution happens.
    Executor executor =
        new Executor() {
          @Override
          public void execute(Runnable runnable) {
            pendingRunnables.add(runnable);
          }
        };
    ListenableFuture<Void> future = submit(runnable, executor);
    assertThat(future.isDone()).isFalse();
    assertThat(executedRunnables).isEmpty();
    assertThat(pendingRunnables).hasSize(1);
    pendingRunnables.remove(0).run();
    assertThat(future.isDone()).isTrue();
    assertThat(executedRunnables).containsExactly(runnable);
    assertThat(pendingRunnables).isEmpty();
  }

  /** An exception thrown by the Runnable becomes the cause of the future's failure. */
  public void testSubmit_runnable_throwsException() throws Exception {
    final RuntimeException exception = new RuntimeException("Exception for testing");
    Runnable runnable =
        new Runnable() {
          @Override
          public void run() {
            throw exception;
          }
        };
    ListenableFuture<Void> future = submit(runnable, directExecutor());
    try {
      getDone(future);
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected).hasCauseThat().isSameInstanceAs(exception);
    }
  }

  /** An Error thrown by a scheduled callable becomes the cause of the output's failure. */
  @GwtIncompatible // threads
  public void testScheduleAsync_asyncCallable_error() throws InterruptedException {
    final Error error = new Error("deliberate");
    AsyncCallable<Integer> callable =
        new AsyncCallable<Integer>() {
          @Override
          public ListenableFuture<Integer> call() {
            throw error;
          }
        };
    // Goes through scheduleAsync (not submitAsync) so the scheduled path is what gets checked.
    ListenableFuture<Integer> outputFuture =
        scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor());
    try {
      // The callable runs on the scheduler's thread, so block with get() rather than getDone().
      outputFuture.get();
      fail();
    } catch (ExecutionException expected) {
      assertSame(error, expected.getCause());
    }
  }

  /** A scheduled callable that returns null fails the output with a descriptive NPE. */
  @GwtIncompatible // threads
  public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception {
    ListenableFuture<?> chainedFuture =
        scheduleAsync(
            constantAsyncCallable(null),
            1,
            TimeUnit.NANOSECONDS,
            newSingleThreadScheduledExecutor());
    try {
      chainedFuture.get();
      fail();
    } catch (ExecutionException expected) {
      NullPointerException cause = (NullPointerException) expected.getCause();
      assertThat(cause)
          .hasMessageThat()
          .contains(
              "AsyncCallable.call returned null instead of a Future. "
                  + "Did you mean to return immediateFuture(null)?");
    }
  }

  /**
   * Cancelling the output while the scheduled callable is still running cancels the output, and
   * the future the callable subsequently returns ends up cancelled as well.
   */
  @GwtIncompatible // threads
  public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction()
      throws InterruptedException, ExecutionException {
    final CountDownLatch inFunction = new CountDownLatch(1);
    final CountDownLatch callableDone = new CountDownLatch(1);
    final SettableFuture<Integer> resultFuture = SettableFuture.create();
    AsyncCallable<Integer> callable =
        new AsyncCallable<Integer>() {
          @Override
          public ListenableFuture<Integer> call() throws InterruptedException {
            inFunction.countDown();
            callableDone.await();
            return resultFuture;
          }
        };
    ListenableFuture<Integer> future =
        scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor());
    inFunction.await();
    future.cancel(false);
    callableDone.countDown();
    try {
      future.get();
      fail();
    } catch (CancellationException expected) {
    }
    try {
      resultFuture.get();
      fail();
    } catch (CancellationException expected) {
    }
  }

  /** If the output is cancelled before the scheduler gets to the callable, it never runs. */
  @GwtIncompatible // threads
  public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction()
      throws InterruptedException {
    final AtomicBoolean callableCalled = new AtomicBoolean();
    AsyncCallable<Integer> callable =
        new AsyncCallable<Integer>() {
          @Override
          public ListenableFuture<Integer> call() {
            callableCalled.set(true);
            return immediateFuture(1);
          }
        };
    ScheduledExecutorService executor = newSingleThreadScheduledExecutor();
    // Pause the executor.
    final CountDownLatch beforeFunction = new CountDownLatch(1);
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            awaitUninterruptibly(beforeFunction);
          }
        });
    ListenableFuture<Integer> future = scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, executor);
    future.cancel(false);

    // Unpause the executor.
    beforeFunction.countDown();
    executor.shutdown();
    assertTrue(executor.awaitTermination(5, SECONDS));

    assertFalse(callableCalled.get());
  }

  /** Returns an AsyncCallable that always returns {@code returnValue}. */
  private static <T> AsyncCallable<T> constantAsyncCallable(
      final ListenableFuture<T> returnValue) {
    return new AsyncCallable<T>() {
      @Override
      public ListenableFuture<T> call() {
        return returnValue;
      }
    };
  }

  /** Runnable which can be called a single time, and only after {@link #expectCall} is called. */
  // TODO(cpovirk): top-level class?
  private static class SingleCallListener implements Runnable {

    private boolean expectCall = false;
    private final AtomicBoolean called = new AtomicBoolean();

    @Override
    public void run() {
      assertTrue("Listener called before it was expected", expectCall);
      assertFalse("Listener called more than once", wasCalled());
      called.set(true);
    }

    /** Arms the listener; fails if it was already armed. */
    public void expectCall() {
      assertFalse("expectCall is already true", expectCall);
      expectCall = true;
    }

    /** Returns whether {@link #run} has been invoked. */
    public boolean wasCalled() {
      return called.get();
    }
  }

  /** allAsList completes only after the last input, with the values in input order. */
  public void testAllAsList() throws Exception {
    // Create input and output
    SettableFuture<String> future1 = SettableFuture.create();
    SettableFuture<String> future2 = SettableFuture.create();
    SettableFuture<String> future3 = SettableFuture.create();
    @SuppressWarnings("unchecked") // array is never modified
    ListenableFuture<List<String>> compound = allAsList(future1, future2, future3);

    // Attach a listener
    SingleCallListener listener = new SingleCallListener();
    compound.addListener(listener, directExecutor());

    // Satisfy each input and check the output
    assertFalse(compound.isDone());
    future1.set(DATA1);
    assertFalse(compound.isDone());
    future2.set(DATA2);
    assertFalse(compound.isDone());
    listener.expectCall();
    future3.set(DATA3);
    assertTrue(listener.wasCalled());

    List<String> results = getDone(compound);
    assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder();
  }

  /** allAsList over an empty list is already done with an empty result. */
  public void testAllAsList_emptyList() throws Exception {
    SingleCallListener listener = new SingleCallListener();
    listener.expectCall();
    List<ListenableFuture<String>> futures = ImmutableList.of();
    ListenableFuture<List<String>> compound = allAsList(futures);
    compound.addListener(listener, directExecutor());
    assertThat(getDone(compound)).isEmpty();
    assertTrue(listener.wasCalled());
  }

  /** allAsList with no varargs is already done with an empty result. */
  public void testAllAsList_emptyArray() throws Exception {
    SingleCallListener listener = new SingleCallListener();
    listener.expectCall();
    @SuppressWarnings("unchecked") // array is never modified
    ListenableFuture<List<String>> compound = allAsList();
    compound.addListener(listener, directExecutor());
    assertThat(getDone(compound)).isEmpty();
    assertTrue(listener.wasCalled());
  }

  /** The first input failure completes the compound future while other inputs stay pending. */
  public void testAllAsList_failure() throws Exception {
    SingleCallListener listener = new SingleCallListener();
    SettableFuture<String> future1 = SettableFuture.create();
    SettableFuture<String> future2 = SettableFuture.create();
    @SuppressWarnings("unchecked") // array is never modified
    ListenableFuture<List<String>> compound = allAsList(future1, future2);
    compound.addListener(listener, directExecutor());

    listener.expectCall();
    Throwable exception = new Throwable("failed1");
    future1.setException(exception);
    assertTrue(compound.isDone());
    assertTrue(listener.wasCalled());
    assertFalse(future2.isDone());

    try {
      getDone(compound);
      fail();
    } catch (ExecutionException expected) {
      assertSame(exception, expected.getCause());
    }
  }

  /** A single already-failed input fails the compound future with the same cause. */
  public void testAllAsList_singleFailure() throws Exception {
    Throwable exception = new Throwable("failed");
    ListenableFuture<String> future = immediateFailedFuture(exception);
    ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future));

    try {
      getDone(compound);
      fail();
    } catch (ExecutionException expected) {
      assertSame(exception, expected.getCause());
    }
  }

  /** An already-failed input fails the compound future even when another input succeeded. */
  public void testAllAsList_immediateFailure() throws Exception {
    Throwable exception = new Throwable("failed");
    ListenableFuture<String> future1 = immediateFailedFuture(exception);
    ListenableFuture<String> future2 = immediateFuture("results");
    ListenableFuture<List<String>> compound = allAsList(ImmutableList.of(future1, future2));

    try {
      getDone(compound);
      fail();
    } catch (ExecutionException expected) {
      assertSame(exception, expected.getCause());
    }
  }

  public void
testAllAsList_error() throws Exception { Error error = new Error("deliberate"); SettableFuture future1 = SettableFuture.create(); ListenableFuture future2 = immediateFuture("results"); ListenableFuture> compound = allAsList(ImmutableList.of(future1, future2)); future1.setException(error); try { getDone(compound); fail(); } catch (ExecutionException expected) { assertSame(error, expected.getCause()); } } public void testAllAsList_cancelled() throws Exception { SingleCallListener listener = new SingleCallListener(); SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = allAsList(future1, future2); compound.addListener(listener, directExecutor()); listener.expectCall(); future1.cancel(true); assertTrue(compound.isDone()); assertTrue(listener.wasCalled()); assertFalse(future2.isDone()); try { getDone(compound); fail(); } catch (CancellationException expected) { } } public void testAllAsList_resultCancelled() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = allAsList(future1, future2); future2.set(DATA2); assertFalse(compound.isDone()); assertTrue(compound.cancel(false)); assertTrue(compound.isCancelled()); assertTrue(future1.isCancelled()); assertFalse(future1.wasInterrupted()); } public void testAllAsList_resultCancelledInterrupted_withSecondaryListFuture() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); ListenableFuture> compound = allAsList(future1, future2); // There was a bug where the event listener for the combined future would // result in the sub-futures being cancelled without being interrupted. 
ListenableFuture> otherCompound = allAsList(future1, future2); assertTrue(compound.cancel(true)); assertTrue(future1.isCancelled()); assertTrue(future1.wasInterrupted()); assertTrue(future2.isCancelled()); assertTrue(future2.wasInterrupted()); assertTrue(otherCompound.isCancelled()); } public void testAllAsList_resultCancelled_withSecondaryListFuture() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); ListenableFuture> compound = allAsList(future1, future2); // This next call is "unused," but it is an important part of the test. Don't remove it! ListenableFuture> unused = allAsList(future1, future2); assertTrue(compound.cancel(false)); assertTrue(future1.isCancelled()); assertFalse(future1.wasInterrupted()); assertTrue(future2.isCancelled()); assertFalse(future2.wasInterrupted()); } public void testAllAsList_resultInterrupted() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = allAsList(future1, future2); future2.set(DATA2); assertFalse(compound.isDone()); assertTrue(compound.cancel(true)); assertTrue(compound.isCancelled()); assertTrue(future1.isCancelled()); assertTrue(future1.wasInterrupted()); } /** * Test the case where the futures are fulfilled prior to constructing the ListFuture. There was a * bug where the loop that connects a Listener to each of the futures would die on the last * loop-check as done() on ListFuture nulled out the variable being looped over (the list of * futures). 
*/ public void testAllAsList_doneFutures() throws Exception { // Create input and output SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); SettableFuture future3 = SettableFuture.create(); // Satisfy each input prior to creating compound and check the output future1.set(DATA1); future2.set(DATA2); future3.set(DATA3); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = allAsList(future1, future2, future3); // Attach a listener SingleCallListener listener = new SingleCallListener(); listener.expectCall(); compound.addListener(listener, directExecutor()); assertTrue(listener.wasCalled()); List results = getDone(compound); assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); } /** A single non-error failure is not logged because it is reported via the output future. */ @SuppressWarnings("unchecked") public void testAllAsList_logging_exception() throws Exception { try { getDone(allAsList(immediateFailedFuture(new MyException()))); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyException.class); assertEquals( "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size()); } } /** Ensure that errors are always logged. */ @SuppressWarnings("unchecked") public void testAllAsList_logging_error() throws Exception { try { getDone(allAsList(immediateFailedFuture(new MyError()))); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyError.class); List logged = aggregateFutureLogHandler.getStoredLogRecords(); assertThat(logged).hasSize(1); // errors are always logged assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class); } } /** All as list will log extra exceptions that have already occurred. 
*/ @SuppressWarnings("unchecked") public void testAllAsList_logging_multipleExceptions_alreadyDone() throws Exception { try { getDone( allAsList( immediateFailedFuture(new MyException()), immediateFailedFuture(new MyException()))); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyException.class); List logged = aggregateFutureLogHandler.getStoredLogRecords(); assertThat(logged).hasSize(1); // the second failure is logged assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); } } /** All as list will log extra exceptions that occur later. */ @SuppressWarnings("unchecked") public void testAllAsList_logging_multipleExceptions_doneLater() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); SettableFuture future3 = SettableFuture.create(); ListenableFuture> all = allAsList(future1, future2, future3); future1.setException(new MyException()); future2.setException(new MyException()); future3.setException(new MyException()); try { getDone(all); fail(); } catch (ExecutionException expected) { List logged = aggregateFutureLogHandler.getStoredLogRecords(); assertThat(logged).hasSize(2); // failures after the first are logged assertThat(logged.get(0).getThrown()).isInstanceOf(MyException.class); assertThat(logged.get(1).getThrown()).isInstanceOf(MyException.class); } } /** The same exception happening on multiple futures should not be logged. 
*/
  @SuppressWarnings("unchecked")
  public void testAllAsList_logging_same_exception() throws Exception {
    try {
      MyException sameInstance = new MyException();
      getDone(allAsList(immediateFailedFuture(sameInstance), immediateFailedFuture(sameInstance)));
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected.getCause()).isInstanceOf(MyException.class);
      assertEquals(
          "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size());
    }
  }

  /**
   * Fails the second input with the same exception instance from inside a listener on the output
   * future, and expects that nothing is logged.
   */
  public void testAllAsList_logging_seenExceptionUpdateRace() throws Exception {
    final MyException sameInstance = new MyException();
    SettableFuture<Object> firstFuture = SettableFuture.create();
    final SettableFuture<Object> secondFuture = SettableFuture.create();
    ListenableFuture<List<Object>> bulkFuture = allAsList(firstFuture, secondFuture);

    bulkFuture.addListener(
        new Runnable() {
          @Override
          public void run() {
            /*
             * firstFuture just completed, but AggregateFuture hasn't yet had time to record the
             * exception in seenExceptions. When we complete secondFuture with the same exception,
             * we want for AggregateFuture to still detect that it's been previously seen.
*/ secondFuture.setException(sameInstance); } }, directExecutor()); firstFuture.setException(sameInstance); try { getDone(bulkFuture); fail(); } catch (ExecutionException expected) { assertThat(expected.getCause()).isInstanceOf(MyException.class); assertThat(aggregateFutureLogHandler.getStoredLogRecords()).isEmpty(); } } public void testAllAsList_logging_seenExceptionUpdateCancelRace() throws Exception { final MyException subsequentFailure = new MyException(); SettableFuture firstFuture = SettableFuture.create(); final SettableFuture secondFuture = SettableFuture.create(); ListenableFuture> bulkFuture = allAsList(firstFuture, secondFuture); bulkFuture.addListener( new Runnable() { @Override public void run() { /* * This is similar to the above test, but this time we're making sure that we recognize * that the output Future is done early not because of an exception but because of a * cancellation. */ secondFuture.setException(subsequentFailure); } }, directExecutor()); firstFuture.cancel(false); try { getDone(bulkFuture); fail(); } catch (CancellationException expected) { assertThat(getOnlyElement(aggregateFutureLogHandler.getStoredLogRecords()).getThrown()) .isSameInstanceAs(subsequentFailure); } } /** * Different exceptions happening on multiple futures with the same cause should not be logged. 
*/
  @SuppressWarnings("unchecked")
  public void testAllAsList_logging_same_cause() throws Exception {
    try {
      MyException exception1 = new MyException();
      MyException exception2 = new MyException();
      MyException exception3 = new MyException();

      MyException sameInstance = new MyException();
      exception1.initCause(sameInstance);
      exception2.initCause(sameInstance);
      exception3.initCause(exception2);
      getDone(allAsList(immediateFailedFuture(exception1), immediateFailedFuture(exception3)));
      fail();
    } catch (ExecutionException expected) {
      assertThat(expected.getCause()).isInstanceOf(MyException.class);
      assertEquals(
          "Nothing should be logged", 0, aggregateFutureLogHandler.getStoredLogRecords().size());
    }
  }

  /** Joins the two partial results into a string of the form {@code -i-b}. */
  private static String createCombinedResult(Integer i, Boolean b) {
    return "-" + i + "-" + b;
  }

  /**
   * Checks that the current thread's interrupt flag is clear both before and after cancelling
   * (with interruption) a combined future whose combiner ran on the direct executor.
   */
  @GwtIncompatible // threads
  public void testWhenAllComplete_noLeakInterruption() throws Exception {
    final SettableFuture<String> stringFuture = SettableFuture.create();
    AsyncCallable<String> combiner =
        new AsyncCallable<String>() {
          @Override
          public ListenableFuture<String> call() throws Exception {
            return stringFuture;
          }
        };

    ListenableFuture<String> futureResult = whenAllComplete().callAsync(combiner, directExecutor());

    assertThat(Thread.interrupted()).isFalse();
    futureResult.cancel(true);
    assertThat(Thread.interrupted()).isFalse();
  }

  /** Compile-time check of which wildcard-typed inputs {@code whenAllComplete} accepts. */
  public void testWhenAllComplete_wildcard() throws Exception {
    ListenableFuture<?> futureA = immediateFuture("a");
    ListenableFuture<? extends String> futureB = immediateFuture("b");
    ListenableFuture<?>[] futures = new ListenableFuture<?>[0];
    Callable<String> combiner =
        new Callable<String>() {
          @Override
          public String call() throws Exception {
            return "hi";
          }
        };

    // We'd like for all the following to compile.
ListenableFuture unused; // Compiles: unused = whenAllComplete(futureA, futureB).call(combiner, directExecutor()); // Does not compile: // unused = whenAllComplete(futures).call(combiner); // Workaround for the above: unused = whenAllComplete(asList(futures)).call(combiner, directExecutor()); } @GwtIncompatible // threads public void testWhenAllComplete_asyncResult() throws Exception { SettableFuture futureInteger = SettableFuture.create(); SettableFuture futureBoolean = SettableFuture.create(); final ExecutorService executor = newSingleThreadExecutor(); final CountDownLatch callableBlocking = new CountDownLatch(1); final SettableFuture resultOfCombiner = SettableFuture.create(); AsyncCallable combiner = tagged( "Called my toString", new AsyncCallable() { @Override public ListenableFuture call() throws Exception { // Make this executor terminate after this task so that the test can tell when // futureResult has received resultOfCombiner. executor.shutdown(); callableBlocking.await(); return resultOfCombiner; } }); ListenableFuture futureResult = whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, executor); // Waiting on backing futures assertThat(futureResult.toString()) .matches( "CombinedFuture@\\w+\\[status=PENDING," + " info=\\[futures=\\[SettableFuture@\\w+\\[status=PENDING]," + " SettableFuture@\\w+\\[status=PENDING]]]]"); Integer integerPartial = 1; futureInteger.set(integerPartial); assertThat(futureResult.toString()) .matches( "CombinedFuture@\\w+\\[status=PENDING," + " info=\\[futures=\\[SettableFuture@\\w+\\[status=SUCCESS," + " result=\\[java.lang.Integer@\\w+]], SettableFuture@\\w+\\[status=PENDING]]]]"); // Backing futures complete Boolean booleanPartial = true; futureBoolean.set(booleanPartial); // Once the backing futures are done there's a (brief) moment where we know nothing assertThat(futureResult.toString()).matches("CombinedFuture@\\w+\\[status=PENDING]"); callableBlocking.countDown(); // Need to wait for resultFuture to be 
returned. assertTrue(executor.awaitTermination(10, SECONDS)); // But once the async function has returned a future we can include that in the toString assertThat(futureResult.toString()) .matches( "CombinedFuture@\\w+\\[status=PENDING," + " setFuture=\\[SettableFuture@\\w+\\[status=PENDING]]]"); // Future complete resultOfCombiner.set(createCombinedResult(getDone(futureInteger), getDone(futureBoolean))); String expectedResult = createCombinedResult(integerPartial, booleanPartial); assertEquals(expectedResult, futureResult.get()); assertThat(futureResult.toString()) .matches("CombinedFuture@\\w+\\[status=SUCCESS, result=\\[java.lang.String@\\w+]]"); } public void testWhenAllComplete_asyncError() throws Exception { final Exception thrown = new RuntimeException("test"); final SettableFuture futureInteger = SettableFuture.create(); final SettableFuture futureBoolean = SettableFuture.create(); AsyncCallable combiner = new AsyncCallable() { @Override public ListenableFuture call() throws Exception { assertTrue(futureInteger.isDone()); assertTrue(futureBoolean.isDone()); return immediateFailedFuture(thrown); } }; ListenableFuture futureResult = whenAllComplete(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); Integer integerPartial = 1; futureInteger.set(integerPartial); Boolean booleanPartial = true; futureBoolean.set(booleanPartial); try { getDone(futureResult); fail(); } catch (ExecutionException expected) { assertSame(thrown, expected.getCause()); } } @GwtIncompatible // threads public void testWhenAllComplete_cancelledNotInterrupted() throws Exception { SettableFuture stringFuture = SettableFuture.create(); SettableFuture booleanFuture = SettableFuture.create(); final CountDownLatch inFunction = new CountDownLatch(1); final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); final SettableFuture resultFuture = SettableFuture.create(); AsyncCallable combiner = new AsyncCallable() { @Override public ListenableFuture call() throws Exception 
{ inFunction.countDown(); shouldCompleteFunction.await(); return resultFuture; } }; ListenableFuture futureResult = whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); stringFuture.set("value"); booleanFuture.set(true); inFunction.await(); futureResult.cancel(false); shouldCompleteFunction.countDown(); try { futureResult.get(); fail(); } catch (CancellationException expected) { } try { resultFuture.get(); fail(); } catch (CancellationException expected) { } } @GwtIncompatible // threads public void testWhenAllComplete_interrupted() throws Exception { SettableFuture stringFuture = SettableFuture.create(); SettableFuture booleanFuture = SettableFuture.create(); final CountDownLatch inFunction = new CountDownLatch(1); final CountDownLatch gotException = new CountDownLatch(1); AsyncCallable combiner = new AsyncCallable() { @Override public ListenableFuture call() throws Exception { inFunction.countDown(); try { new CountDownLatch(1).await(); // wait for interrupt } catch (InterruptedException expected) { gotException.countDown(); throw expected; } return immediateFuture("a"); } }; ListenableFuture futureResult = whenAllComplete(stringFuture, booleanFuture).callAsync(combiner, newSingleThreadExecutor()); stringFuture.set("value"); booleanFuture.set(true); inFunction.await(); futureResult.cancel(true); try { futureResult.get(); fail(); } catch (CancellationException expected) { } gotException.await(); } public void testWhenAllComplete_runnableResult() throws Exception { final SettableFuture futureInteger = SettableFuture.create(); final SettableFuture futureBoolean = SettableFuture.create(); final String[] result = new String[1]; Runnable combiner = new Runnable() { @Override public void run() { assertTrue(futureInteger.isDone()); assertTrue(futureBoolean.isDone()); result[0] = createCombinedResult( Futures.getUnchecked(futureInteger), Futures.getUnchecked(futureBoolean)); } }; ListenableFuture futureResult = 
whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); Integer integerPartial = 1; futureInteger.set(integerPartial); Boolean booleanPartial = true; futureBoolean.set(booleanPartial); futureResult.get(); assertEquals(createCombinedResult(integerPartial, booleanPartial), result[0]); } public void testWhenAllComplete_runnableError() throws Exception { final RuntimeException thrown = new RuntimeException("test"); final SettableFuture futureInteger = SettableFuture.create(); final SettableFuture futureBoolean = SettableFuture.create(); Runnable combiner = new Runnable() { @Override public void run() { assertTrue(futureInteger.isDone()); assertTrue(futureBoolean.isDone()); throw thrown; } }; ListenableFuture futureResult = whenAllComplete(futureInteger, futureBoolean).run(combiner, directExecutor()); Integer integerPartial = 1; futureInteger.set(integerPartial); Boolean booleanPartial = true; futureBoolean.set(booleanPartial); try { getDone(futureResult); fail(); } catch (ExecutionException expected) { assertSame(thrown, expected.getCause()); } } @GwtIncompatible // threads public void testWhenAllCompleteRunnable_resultCanceledWithoutInterrupt_doesNotInterruptRunnable() throws Exception { SettableFuture stringFuture = SettableFuture.create(); SettableFuture booleanFuture = SettableFuture.create(); final CountDownLatch inFunction = new CountDownLatch(1); final CountDownLatch shouldCompleteFunction = new CountDownLatch(1); final CountDownLatch combinerCompletedWithoutInterrupt = new CountDownLatch(1); Runnable combiner = new Runnable() { @Override public void run() { inFunction.countDown(); try { shouldCompleteFunction.await(); combinerCompletedWithoutInterrupt.countDown(); } catch (InterruptedException e) { // Ensure the thread's interrupt status is preserved. 
Thread.currentThread().interrupt(); throw new RuntimeException(e); } } }; ListenableFuture futureResult = whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); stringFuture.set("value"); booleanFuture.set(true); inFunction.await(); futureResult.cancel(false); shouldCompleteFunction.countDown(); try { futureResult.get(); fail(); } catch (CancellationException expected) { } combinerCompletedWithoutInterrupt.await(); } @GwtIncompatible // threads public void testWhenAllCompleteRunnable_resultCanceledWithInterrupt_InterruptsRunnable() throws Exception { SettableFuture stringFuture = SettableFuture.create(); SettableFuture booleanFuture = SettableFuture.create(); final CountDownLatch inFunction = new CountDownLatch(1); final CountDownLatch gotException = new CountDownLatch(1); Runnable combiner = new Runnable() { @Override public void run() { inFunction.countDown(); try { new CountDownLatch(1).await(); // wait for interrupt } catch (InterruptedException expected) { // Ensure the thread's interrupt status is preserved. 
Thread.currentThread().interrupt(); gotException.countDown(); } } }; ListenableFuture futureResult = whenAllComplete(stringFuture, booleanFuture).run(combiner, newSingleThreadExecutor()); stringFuture.set("value"); booleanFuture.set(true); inFunction.await(); futureResult.cancel(true); try { futureResult.get(); fail(); } catch (CancellationException expected) { } gotException.await(); } public void testWhenAllSucceed() throws Exception { class PartialResultException extends Exception {} final SettableFuture futureInteger = SettableFuture.create(); final SettableFuture futureBoolean = SettableFuture.create(); AsyncCallable combiner = new AsyncCallable() { @Override public ListenableFuture call() throws Exception { throw new AssertionFailedError("AsyncCallable should not have been called."); } }; ListenableFuture futureResult = whenAllSucceed(futureInteger, futureBoolean).callAsync(combiner, directExecutor()); PartialResultException partialResultException = new PartialResultException(); futureInteger.setException(partialResultException); Boolean booleanPartial = true; futureBoolean.set(booleanPartial); try { getDone(futureResult); fail(); } catch (ExecutionException expected) { assertSame(partialResultException, expected.getCause()); } } @AndroidIncompatible @GwtIncompatible public void testWhenAllSucceed_releasesInputFuturesUponSubmission() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); WeakReference> future1Ref = new WeakReference<>(future1); WeakReference> future2Ref = new WeakReference<>(future2); Callable combiner = new Callable() { @Override public Long call() { throw new AssertionError(); } }; ListenableFuture unused = whenAllSucceed(future1, future2).call(combiner, noOpScheduledExecutor()); future1.set(1L); future1 = null; future2.set(2L); future2 = null; /* * Futures should be collected even if combiner never runs. 
This is kind of a silly test, since * the combiner is almost certain to hold its own reference to the futures, and a real app would * hold a reference to the executor and thus to the combiner. What we really care about is that * the futures are released once the combiner is done running. But we happen to provide this * earlier cleanup at the moment, so we're testing it. */ GcFinalization.awaitClear(future1Ref); GcFinalization.awaitClear(future2Ref); } @AndroidIncompatible @GwtIncompatible public void testWhenAllComplete_releasesInputFuturesUponCancellation() throws Exception { SettableFuture future = SettableFuture.create(); WeakReference> futureRef = new WeakReference<>(future); Callable combiner = new Callable() { @Override public Long call() { throw new AssertionError(); } }; ListenableFuture unused = whenAllComplete(future).call(combiner, noOpScheduledExecutor()); unused.cancel(false); future = null; // Future should be collected because whenAll*Complete* doesn't need to look at its result. GcFinalization.awaitClear(futureRef); } @AndroidIncompatible @GwtIncompatible public void testWhenAllSucceed_releasesCallable() throws Exception { AsyncCallable combiner = new AsyncCallable() { @Override public ListenableFuture call() { return SettableFuture.create(); } }; WeakReference> combinerRef = new WeakReference<>(combiner); ListenableFuture unused = whenAllSucceed(immediateFuture(1L)).callAsync(combiner, directExecutor()); combiner = null; // combiner should be collected even if the future it returns never completes. GcFinalization.awaitClear(combinerRef); } /* * TODO(cpovirk): maybe pass around TestFuture instances instead of * ListenableFuture instances */ /** * A future in {@link TestFutureBatch} that also has a name for debugging purposes and a {@code * finisher}, a task that will complete the future in some fashion when it is called, allowing for * testing both before and after the completion of the future. 
*/ @GwtIncompatible // used only in GwtIncompatible tests private static final class TestFuture { final ListenableFuture future; final String name; final Runnable finisher; TestFuture(ListenableFuture future, String name, Runnable finisher) { this.future = future; this.name = name; this.finisher = finisher; } } /** * A collection of several futures, covering cancellation, success, and failure (both {@link * ExecutionException} and {@link RuntimeException}), both immediate and delayed. We use each * possible pair of these futures in {@link FuturesTest#runExtensiveMergerTest}. * *

Each test requires a new {@link TestFutureBatch} because we need new delayed futures each * time, as the old delayed futures were completed as part of the old test. */ @GwtIncompatible // used only in GwtIncompatible tests private static final class TestFutureBatch { final ListenableFuture doneSuccess = immediateFuture("a"); final ListenableFuture doneFailed = immediateFailedFuture(new Exception()); final SettableFuture doneCancelled = SettableFuture.create(); { doneCancelled.cancel(true); } final ListenableFuture doneRuntimeException = new ForwardingListenableFuture() { final ListenableFuture delegate = immediateFuture("Should never be seen"); @Override protected ListenableFuture delegate() { return delegate; } @Override public String get() { throw new RuntimeException(); } @Override public String get(long timeout, TimeUnit unit) { throw new RuntimeException(); } }; final SettableFuture delayedSuccess = SettableFuture.create(); final SettableFuture delayedFailed = SettableFuture.create(); final SettableFuture delayedCancelled = SettableFuture.create(); final SettableFuture delegateForDelayedRuntimeException = SettableFuture.create(); final ListenableFuture delayedRuntimeException = new ForwardingListenableFuture() { @Override protected ListenableFuture delegate() { return delegateForDelayedRuntimeException; } @Override public String get() throws ExecutionException, InterruptedException { delegateForDelayedRuntimeException.get(); throw new RuntimeException(); } @Override public String get(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException { delegateForDelayedRuntimeException.get(timeout, unit); throw new RuntimeException(); } }; final Runnable doNothing = new Runnable() { @Override public void run() {} }; final Runnable finishSuccess = new Runnable() { @Override public void run() { delayedSuccess.set("b"); } }; final Runnable finishFailure = new Runnable() { @Override public void run() { delayedFailed.setException(new 
Exception()); } }; final Runnable finishCancelled = new Runnable() { @Override public void run() { delayedCancelled.cancel(true); } }; final Runnable finishRuntimeException = new Runnable() { @Override public void run() { delegateForDelayedRuntimeException.set("Should never be seen"); } }; /** All the futures, together with human-readable names for use by {@link #smartToString}. */ final ImmutableList allFutures = ImmutableList.of( new TestFuture(doneSuccess, "doneSuccess", doNothing), new TestFuture(doneFailed, "doneFailed", doNothing), new TestFuture(doneCancelled, "doneCancelled", doNothing), new TestFuture(doneRuntimeException, "doneRuntimeException", doNothing), new TestFuture(delayedSuccess, "delayedSuccess", finishSuccess), new TestFuture(delayedFailed, "delayedFailed", finishFailure), new TestFuture(delayedCancelled, "delayedCancelled", finishCancelled), new TestFuture( delayedRuntimeException, "delayedRuntimeException", finishRuntimeException)); final Function, String> nameGetter = new Function, String>() { @Override public String apply(ListenableFuture input) { for (TestFuture future : allFutures) { if (future.future == input) { return future.name; } } throw new IllegalArgumentException(input.toString()); } }; static boolean intersect(Set a, Set b) { return !intersection(a, b).isEmpty(); } /** * Like {@code inputs.toString()}, but with the nonsense {@code toString} representations * replaced with the name of each future from {@link #allFutures}. 
*/ String smartToString(ImmutableSet> inputs) { Iterable inputNames = Iterables.transform(inputs, nameGetter); return Joiner.on(", ").join(inputNames); } void smartAssertTrue( ImmutableSet> inputs, Exception cause, boolean expression) { if (!expression) { throw failureWithCause(cause, smartToString(inputs)); } } boolean hasDelayed(ListenableFuture a, ListenableFuture b) { ImmutableSet> inputs = ImmutableSet.of(a, b); return intersect( inputs, ImmutableSet.of( delayedSuccess, delayedFailed, delayedCancelled, delayedRuntimeException)); } void assertHasDelayed(ListenableFuture a, ListenableFuture b, Exception e) { ImmutableSet> inputs = ImmutableSet.of(a, b); smartAssertTrue(inputs, e, hasDelayed(a, b)); } void assertHasFailure(ListenableFuture a, ListenableFuture b, Exception e) { ImmutableSet> inputs = ImmutableSet.of(a, b); smartAssertTrue( inputs, e, intersect( inputs, ImmutableSet.of( doneFailed, doneRuntimeException, delayedFailed, delayedRuntimeException))); } void assertHasCancel(ListenableFuture a, ListenableFuture b, Exception e) { ImmutableSet> inputs = ImmutableSet.of(a, b); smartAssertTrue( inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled, delayedCancelled))); } void assertHasImmediateFailure( ListenableFuture a, ListenableFuture b, Exception e) { ImmutableSet> inputs = ImmutableSet.of(a, b); smartAssertTrue( inputs, e, intersect(inputs, ImmutableSet.of(doneFailed, doneRuntimeException))); } void assertHasImmediateCancel( ListenableFuture a, ListenableFuture b, Exception e) { ImmutableSet> inputs = ImmutableSet.of(a, b); smartAssertTrue(inputs, e, intersect(inputs, ImmutableSet.of(doneCancelled))); } } /** * {@link Futures#allAsList(Iterable)} or {@link Futures#successfulAsList(Iterable)}, hidden * behind a common interface for testing. 
*/ @GwtIncompatible // used only in GwtIncompatible tests private interface Merger { ListenableFuture> merged(ListenableFuture a, ListenableFuture b); Merger allMerger = new Merger() { @Override public ListenableFuture> merged( ListenableFuture a, ListenableFuture b) { return allAsList(ImmutableSet.of(a, b)); } }; Merger successMerger = new Merger() { @Override public ListenableFuture> merged( ListenableFuture a, ListenableFuture b) { return successfulAsList(ImmutableSet.of(a, b)); } }; } /** * Very rough equivalent of a timed get, produced by calling the no-arg get method in another * thread and waiting a short time for it. * *

We need this to test the behavior of no-arg get methods without hanging the main test thread * forever in the case of failure. */ @CanIgnoreReturnValue @GwtIncompatible // threads static V pseudoTimedGetUninterruptibly(final Future input, long timeout, TimeUnit unit) throws ExecutionException, TimeoutException { ExecutorService executor = newSingleThreadExecutor(); Future waiter = executor.submit( new Callable() { @Override public V call() throws Exception { return input.get(); } }); try { return getUninterruptibly(waiter, timeout, unit); } catch (ExecutionException e) { propagateIfInstanceOf(e.getCause(), ExecutionException.class); propagateIfInstanceOf(e.getCause(), CancellationException.class); throw failureWithCause(e, "Unexpected exception"); } finally { executor.shutdownNow(); // TODO(cpovirk): assertTrue(awaitTerminationUninterruptibly(executor, 10, SECONDS)); } } /** * For each possible pair of futures from {@link TestFutureBatch}, for each possible completion * order of those futures, test that various get calls (timed before future completion, untimed * before future completion, and untimed after future completion) return or throw the proper * values. */ @GwtIncompatible // used only in GwtIncompatible tests private static void runExtensiveMergerTest(Merger merger) throws InterruptedException { int inputCount = new TestFutureBatch().allFutures.size(); for (int i = 0; i < inputCount; i++) { for (int j = 0; j < inputCount; j++) { for (boolean iBeforeJ : new boolean[] {true, false}) { TestFutureBatch inputs = new TestFutureBatch(); ListenableFuture iFuture = inputs.allFutures.get(i).future; ListenableFuture jFuture = inputs.allFutures.get(j).future; ListenableFuture> future = merger.merged(iFuture, jFuture); // Test timed get before we've completed any delayed futures. 
try { List result = future.get(0, MILLISECONDS); assertTrue("Got " + result, asList("a", null).containsAll(result)); } catch (CancellationException e) { assertTrue(merger == Merger.allMerger); inputs.assertHasImmediateCancel(iFuture, jFuture, e); } catch (ExecutionException e) { assertTrue(merger == Merger.allMerger); inputs.assertHasImmediateFailure(iFuture, jFuture, e); } catch (TimeoutException e) { inputs.assertHasDelayed(iFuture, jFuture, e); } // Same tests with pseudoTimedGet. try { List result = conditionalPseudoTimedGetUninterruptibly( inputs, iFuture, jFuture, future, 20, MILLISECONDS); assertTrue("Got " + result, asList("a", null).containsAll(result)); } catch (CancellationException e) { assertTrue(merger == Merger.allMerger); inputs.assertHasImmediateCancel(iFuture, jFuture, e); } catch (ExecutionException e) { assertTrue(merger == Merger.allMerger); inputs.assertHasImmediateFailure(iFuture, jFuture, e); } catch (TimeoutException e) { inputs.assertHasDelayed(iFuture, jFuture, e); } // Finish the two futures in the currently specified order: inputs.allFutures.get(iBeforeJ ? i : j).finisher.run(); inputs.allFutures.get(iBeforeJ ? j : i).finisher.run(); // Test untimed get now that we've completed any delayed futures. try { List result = getDone(future); assertTrue("Got " + result, asList("a", "b", null).containsAll(result)); } catch (CancellationException e) { assertTrue(merger == Merger.allMerger); inputs.assertHasCancel(iFuture, jFuture, e); } catch (ExecutionException e) { assertTrue(merger == Merger.allMerger); inputs.assertHasFailure(iFuture, jFuture, e); } } } } } /** * Call the non-timed {@link Future#get()} in a way that allows us to abort if it's expected to * hang forever. More precisely, if it's expected to return, we simply call it[*], but if it's * expected to hang (because one of the input futures that we know makes it up isn't done yet), * then we call it in a separate thread (using pseudoTimedGet). 
   * The result is that we wait as long
   * as necessary when the method is expected to return (at the cost of hanging forever if there is
   * a bug in the class under test) but that we time out fairly promptly when the method is expected
   * to hang (possibly too quickly, but too-quick failures should be very unlikely, given that we
   * used to bail after 20ms during the expected-successful tests, and there we saw a failure rate
   * of ~1/5000, meaning that the other thread's get() call nearly always completes within 20ms if
   * it's going to complete at all).
   *
   * <p>
[*] To avoid hangs, I've disabled the in-thread calls. This makes the test take (very * roughly) 2.5s longer. (2.5s is also the maximum length of time we will wait for a timed get * that is expected to succeed; the fact that the numbers match is only a coincidence.) See the * comment below for how to restore the fast but hang-y version. */ @GwtIncompatible // used only in GwtIncompatible tests private static List conditionalPseudoTimedGetUninterruptibly( TestFutureBatch inputs, ListenableFuture iFuture, ListenableFuture jFuture, ListenableFuture> future, int timeout, TimeUnit unit) throws ExecutionException, TimeoutException { /* * For faster tests (that may hang indefinitely if the class under test has * a bug!), switch the second branch to call untimed future.get() instead of * pseudoTimedGet. */ return (inputs.hasDelayed(iFuture, jFuture)) ? pseudoTimedGetUninterruptibly(future, timeout, unit) : pseudoTimedGetUninterruptibly(future, 2500, MILLISECONDS); } @GwtIncompatible // threads public void testAllAsList_extensive() throws InterruptedException { runExtensiveMergerTest(Merger.allMerger); } @GwtIncompatible // threads public void testSuccessfulAsList_extensive() throws InterruptedException { runExtensiveMergerTest(Merger.successMerger); } public void testSuccessfulAsList() throws Exception { // Create input and output SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); SettableFuture future3 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2, future3); // Attach a listener SingleCallListener listener = new SingleCallListener(); compound.addListener(listener, directExecutor()); // Satisfy each input and check the output assertFalse(compound.isDone()); future1.set(DATA1); assertFalse(compound.isDone()); future2.set(DATA2); assertFalse(compound.isDone()); listener.expectCall(); future3.set(DATA3); 
assertTrue(listener.wasCalled()); List results = getDone(compound); assertThat(results).containsExactly(DATA1, DATA2, DATA3).inOrder(); } public void testSuccessfulAsList_emptyList() throws Exception { SingleCallListener listener = new SingleCallListener(); listener.expectCall(); List> futures = ImmutableList.of(); ListenableFuture> compound = successfulAsList(futures); compound.addListener(listener, directExecutor()); assertThat(getDone(compound)).isEmpty(); assertTrue(listener.wasCalled()); } public void testSuccessfulAsList_emptyArray() throws Exception { SingleCallListener listener = new SingleCallListener(); listener.expectCall(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(); compound.addListener(listener, directExecutor()); assertThat(getDone(compound)).isEmpty(); assertTrue(listener.wasCalled()); } public void testSuccessfulAsList_partialFailure() throws Exception { SingleCallListener listener = new SingleCallListener(); SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2); compound.addListener(listener, directExecutor()); assertFalse(compound.isDone()); future1.setException(new Throwable("failed1")); assertFalse(compound.isDone()); listener.expectCall(); future2.set(DATA2); assertTrue(listener.wasCalled()); List results = getDone(compound); assertThat(results).containsExactly(null, DATA2).inOrder(); } public void testSuccessfulAsList_totalFailure() throws Exception { SingleCallListener listener = new SingleCallListener(); SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2); compound.addListener(listener, directExecutor()); assertFalse(compound.isDone()); 
future1.setException(new Throwable("failed1")); assertFalse(compound.isDone()); listener.expectCall(); future2.setException(new Throwable("failed2")); assertTrue(listener.wasCalled()); List results = getDone(compound); assertThat(results).containsExactly(null, null).inOrder(); } public void testSuccessfulAsList_cancelled() throws Exception { SingleCallListener listener = new SingleCallListener(); SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2); compound.addListener(listener, directExecutor()); assertFalse(compound.isDone()); future1.cancel(true); assertFalse(compound.isDone()); listener.expectCall(); future2.set(DATA2); assertTrue(listener.wasCalled()); List results = getDone(compound); assertThat(results).containsExactly(null, DATA2).inOrder(); } public void testSuccessfulAsList_resultCancelled() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2); future2.set(DATA2); assertFalse(compound.isDone()); assertTrue(compound.cancel(false)); assertTrue(compound.isCancelled()); assertTrue(future1.isCancelled()); assertFalse(future1.wasInterrupted()); } public void testSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { TestLogHandler listenerLoggerHandler = new TestLogHandler(); Logger exceptionLogger = Logger.getLogger(AbstractFuture.class.getName()); exceptionLogger.addHandler(listenerLoggerHandler); try { doTestSuccessfulAsList_resultCancelledRacingInputDone(); assertWithMessage("Nothing should be logged") .that(listenerLoggerHandler.getStoredLogRecords()) .isEmpty(); } finally { exceptionLogger.removeHandler(listenerLoggerHandler); } } private static void 
doTestSuccessfulAsList_resultCancelledRacingInputDone() throws Exception { // Simple (combined.cancel -> input.cancel -> setOneValue): successfulAsList(ImmutableList.of(SettableFuture.create())).cancel(true); /* * Complex (combined.cancel -> input.cancel -> other.set -> setOneValue), * to show that this isn't just about problems with the input future we just * cancelled: */ final SettableFuture future1 = SettableFuture.create(); final SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2); future1.addListener( new Runnable() { @Override public void run() { assertTrue(future1.isCancelled()); /* * This test relies on behavior that's unspecified but currently * guaranteed by the implementation: Cancellation of inputs is * performed in the order they were provided to the constructor. Verify * that as a sanity check: */ assertFalse(future2.isCancelled()); // Now attempt to trigger the exception: future2.set(DATA2); } }, directExecutor()); assertTrue(compound.cancel(false)); assertTrue(compound.isCancelled()); assertTrue(future1.isCancelled()); assertFalse(future2.isCancelled()); try { getDone(compound); fail(); } catch (CancellationException expected) { } } public void testSuccessfulAsList_resultInterrupted() throws Exception { SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2); future2.set(DATA2); assertFalse(compound.isDone()); assertTrue(compound.cancel(true)); assertTrue(compound.isCancelled()); assertTrue(future1.isCancelled()); assertTrue(future1.wasInterrupted()); } public void testSuccessfulAsList_mixed() throws Exception { SingleCallListener listener = new SingleCallListener(); SettableFuture future1 = SettableFuture.create(); SettableFuture future2 = SettableFuture.create(); 
SettableFuture future3 = SettableFuture.create(); @SuppressWarnings("unchecked") // array is never modified ListenableFuture> compound = successfulAsList(future1, future2, future3); compound.addListener(listener, directExecutor()); // First is cancelled, second fails, third succeeds assertFalse(compound.isDone()); future1.cancel(true); assertFalse(compound.isDone()); future2.setException(new Throwable("failed2")); assertFalse(compound.isDone()); listener.expectCall(); future3.set(DATA3); assertTrue(listener.wasCalled()); List results = getDone(compound); assertThat(results).containsExactly(null, null, DATA3).inOrder(); } /** Non-Error exceptions are never logged. */ @SuppressWarnings("unchecked") public void testSuccessfulAsList_logging_exception() throws Exception { assertEquals( newArrayList((Object) null), getDone(successfulAsList(immediateFailedFuture(new MyException())))); assertWithMessage("Nothing should be logged") .that(aggregateFutureLogHandler.getStoredLogRecords()) .isEmpty(); // Not even if there are a bunch of failures. assertEquals( newArrayList(null, null, null), getDone( successfulAsList( immediateFailedFuture(new MyException()), immediateFailedFuture(new MyException()), immediateFailedFuture(new MyException())))); assertWithMessage("Nothing should be logged") .that(aggregateFutureLogHandler.getStoredLogRecords()) .isEmpty(); } /** Ensure that errors are always logged. 
*/
  @SuppressWarnings("unchecked")
  public void testSuccessfulAsList_logging_error() throws Exception {
    assertEquals(
        newArrayList((Object) null),
        getDone(successfulAsList(immediateFailedFuture(new MyError()))));
    List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords();
    assertThat(logged).hasSize(1); // errors are always logged
    assertThat(logged.get(0).getThrown()).isInstanceOf(MyError.class);
  }

  /**
   * An input that fails with an {@code Error} while being cancelled by the output still has that
   * failure logged.
   */
  public void testSuccessfulAsList_failureLoggedEvenAfterOutputCancelled() throws Exception {
    ListenableFuture<String> input = new CancelPanickingFuture<>();
    ListenableFuture<List<String>> output = successfulAsList(input);
    output.cancel(false);

    List<LogRecord> logged = aggregateFutureLogHandler.getStoredLogRecords();
    assertThat(logged).hasSize(1);
    assertThat(logged.get(0).getThrown()).hasMessageThat().isEqualTo("You can't fire me, I quit.");
  }

  /** A future that, when asked to cancel, instead fails itself with an {@code Error}. */
  private static final class CancelPanickingFuture<V> extends AbstractFuture<V> {
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      setException(new Error("You can't fire me, I quit."));
      return false;
    }
  }

  /** The wrapper completes with the same value instance as its input. */
  public void testNonCancellationPropagating_successful() throws Exception {
    SettableFuture<Foo> input = SettableFuture.create();
    ListenableFuture<Foo> wrapper = nonCancellationPropagating(input);
    Foo foo = new Foo();

    assertFalse(wrapper.isDone());
    input.set(foo);
    assertTrue(wrapper.isDone());
    assertSame(foo, getDone(wrapper));
  }

  /** The wrapper fails with the same cause instance as its input. */
  public void testNonCancellationPropagating_failure() throws Exception {
    SettableFuture<Foo> input = SettableFuture.create();
    ListenableFuture<Foo> wrapper = nonCancellationPropagating(input);
    Throwable failure = new Throwable("thrown");

    assertFalse(wrapper.isDone());
    input.setException(failure);
    try {
      getDone(wrapper);
      fail();
    } catch (ExecutionException expected) {
      assertSame(failure, expected.getCause());
    }
  }

  /** Cancelling the input cancels the wrapper. */
  public void testNonCancellationPropagating_delegateCancelled() throws Exception {
    SettableFuture<Foo> input = SettableFuture.create();
    ListenableFuture<Foo> wrapper = nonCancellationPropagating(input);

    assertFalse(wrapper.isDone());
    assertTrue(input.cancel(false));
    assertTrue(wrapper.isCancelled());
  }

  /** Cancelling the wrapper leaves the input neither cancelled nor done. */
  public void testNonCancellationPropagating_doesNotPropagate() throws Exception {
    SettableFuture<Foo> input = SettableFuture.create();
    ListenableFuture<Foo> wrapper = nonCancellationPropagating(input);

    assertTrue(wrapper.cancel(true));
    assertTrue(wrapper.isCancelled());
    assertTrue(wrapper.isDone());
    assertFalse(input.isCancelled());
    assertFalse(input.isDone());
  }

  /** Checked exception that simply wraps a (possibly null) cause. */
  @GwtIncompatible // used only in GwtIncompatible tests
  private static class TestException extends Exception {

    TestException(@Nullable Throwable cause) {
      super(cause);
    }
  }

  @GwtIncompatible // used only in GwtIncompatible tests
  private interface MapperFunction extends Function<Throwable, Exception> {}

  /** Output futures complete in the order in which the inputs were set. */
  public void testCompletionOrder() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<Long> future2 = SettableFuture.create();
    SettableFuture<Long> future3 = SettableFuture.create();
    SettableFuture<Long> future4 = SettableFuture.create();
    SettableFuture<Long> future5 = SettableFuture.create();

    ImmutableList<ListenableFuture<Long>> futures =
        inCompletionOrder(
            ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5));
    future2.set(1L);
    future5.set(2L);
    future1.set(3L);
    future3.set(4L);
    future4.set(5L);

    long expectedResult = 1L;
    for (ListenableFuture<Long> future : futures) {
      assertEquals((Long) expectedResult, getDone(future));
      expectedResult++;
    }
  }

  /** A failed input occupies its completion-order slot and surfaces its exception there. */
  public void testCompletionOrderExceptionThrown() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<Long> future2 = SettableFuture.create();
    SettableFuture<Long> future3 = SettableFuture.create();
    SettableFuture<Long> future4 = SettableFuture.create();
    SettableFuture<Long> future5 = SettableFuture.create();

    ImmutableList<ListenableFuture<Long>> futures =
        inCompletionOrder(
            ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5));
    future2.set(1L);
    future5.setException(new IllegalStateException("2L"));
    future1.set(3L);
    future3.set(4L);
    future4.set(5L);

    long expectedResult = 1L;
    for (ListenableFuture<Long> future : futures) {
      if (expectedResult != 2) {
        assertEquals((Long) expectedResult, getDone(future));
      } else {
        // The second completion was the failure.
        try {
          getDone(future);
          fail();
        } catch (ExecutionException expected) {
          assertThat(expected).hasCauseThat().hasMessageThat().isEqualTo("2L");
        }
      }
      expectedResult++;
    }
  }

  /** A cancelled input occupies its completion-order slot as a cancelled output. */
  public void testCompletionOrderFutureCancelled() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<Long> future2 = SettableFuture.create();
    SettableFuture<Long> future3 = SettableFuture.create();
    SettableFuture<Long> future4 = SettableFuture.create();
    SettableFuture<Long> future5 = SettableFuture.create();

    ImmutableList<ListenableFuture<Long>> futures =
        inCompletionOrder(
            ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3, future4, future5));
    future2.set(1L);
    future5.set(2L);
    future1.set(3L);
    future3.cancel(true);
    future4.set(5L);

    long expectedResult = 1L;
    for (ListenableFuture<Long> future : futures) {
      if (expectedResult != 4) {
        assertEquals((Long) expectedResult, getDone(future));
      } else {
        // The fourth completion was the cancellation.
        try {
          getDone(future);
          fail();
        } catch (CancellationException expected) {
        }
      }
      expectedResult++;
    }
  }

  /**
   * After both remaining outputs are cancelled (one with interruption, the last without), the
   * pending inputs are cancelled without being interrupted.
   */
  public void testCompletionOrderFutureInterruption() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<Long> future2 = SettableFuture.create();
    SettableFuture<Long> future3 = SettableFuture.create();

    ImmutableList<ListenableFuture<Long>> futures =
        inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3));
    future2.set(1L);

    futures.get(1).cancel(true);
    futures.get(2).cancel(false);

    assertTrue(future1.isCancelled());
    assertFalse(future1.wasInterrupted());
    assertTrue(future3.isCancelled());
    assertFalse(future3.wasInterrupted());
  }

  /** Exercises how cancelling individual outputs affects the inputs; see inline comments. */
  public void testCancellingADelegatePropagates() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<Long> future2 = SettableFuture.create();
    SettableFuture<Long> future3 = SettableFuture.create();

    ImmutableList<ListenableFuture<Long>> delegates =
        inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2, future3));

    future1.set(1L);
    // Cannot cancel a complete delegate
    assertFalse(delegates.get(0).cancel(true));
    // Cancel the delegate before the input future is done
    assertTrue(delegates.get(1).cancel(true));
    // Setting the future still works since cancellation didn't propagate
    assertTrue(future2.set(2L));
    // Second check to ensure the input future was not cancelled
    assertEquals((Long) 2L, getDone(future2));

    // All futures are now complete; outstanding inputs are cancelled
    assertTrue(future3.isCancelled());
    assertTrue(future3.wasInterrupted());
  }

  /** Cancels 500,000 outputs one by one and checks that every input ends up done. */
  @AndroidIncompatible // runs out of memory under some versions of the emulator
  public void testCancellingAllDelegatesIsNotQuadratic() throws Exception {
    ImmutableList.Builder<SettableFuture<Long>> builder = ImmutableList.builder();
    for (int i = 0; i < 500_000; i++) {
      builder.add(SettableFuture.<Long>create());
    }
    ImmutableList<SettableFuture<Long>> inputs = builder.build();
    ImmutableList<ListenableFuture<Long>> delegates = inCompletionOrder(inputs);

    for (ListenableFuture<?> delegate : delegates) {
      delegate.cancel(true);
    }

    for (ListenableFuture<?> input : inputs) {
      assertTrue(input.isDone());
    }
  }

  /** Inputs become unreachable once completed, or once no output can still observe them. */
  @AndroidIncompatible // reference is never cleared under some versions of the emulator
  @GwtIncompatible
  public void testInputGCedIfUnreferenced() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<Long> future2 = SettableFuture.create();
    WeakReference<SettableFuture<Long>> future1Ref = new WeakReference<>(future1);
    WeakReference<SettableFuture<Long>> future2Ref = new WeakReference<>(future2);

    ImmutableList<ListenableFuture<Long>> delegates =
        inCompletionOrder(ImmutableList.<ListenableFuture<Long>>of(future1, future2));

    future1.set(1L);

    future1 = null;
    // First future is complete, should be unreferenced
    GcFinalization.awaitClear(future1Ref);
    ListenableFuture<Long> outputFuture1 = delegates.get(0);
    delegates = null;
    future2 = null;
    // No references to list or other output future, second future should be unreferenced
    GcFinalization.awaitClear(future2Ref);
    outputFuture1.get();
  }

  // Mostly an example of how it would look like to use a list of mixed types
  public void testCompletionOrderMixedBagOTypes() throws Exception {
    SettableFuture<Long> future1 = SettableFuture.create();
    SettableFuture<String> future2 = SettableFuture.create();
    SettableFuture<Integer> future3 = SettableFuture.create();

    ImmutableList<? extends ListenableFuture<?>> inputs =
        ImmutableList.<ListenableFuture<?>>of(future1, future2, future3);
    ImmutableList<ListenableFuture<Object>> futures = inCompletionOrder(inputs);
    future2.set("1L");
    future1.set(2L);
    future3.set(3);

    ImmutableList<?> expected = ImmutableList.of("1L", 2L, 3);
    for (int i = 0; i < expected.size(); i++) {
      assertEquals(expected.get(i), getDone(futures.get(i)));
    }
  }

  /** Null-checks every public static method of {@code Futures} that returns a {@code Future}. */
  @GwtIncompatible // ClassSanityTester
  public void testFutures_nullChecks() throws Exception {
    new ClassSanityTester()
        .forAllPublicStaticMethods(Futures.class)
        .thatReturn(Future.class)
        .testNulls();
  }

  /** Builds an {@code AssertionFailedError} with the given message and {@code cause} attached. */
  static AssertionFailedError failureWithCause(Throwable cause, String message) {
    AssertionFailedError failure = new AssertionFailedError(message);
    failure.initCause(cause);
    return failure;
  }

  // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to
  // never complete when timing out. Notably, nothing would get logged since the Error would get
  // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it.

  // Simulate a timeout that fires before the call the SES.schedule returns but the future is
  // already completed.

  // This test covers a bug where an Error thrown from a callback could cause the TimeoutFuture to
  // never complete when timing out. Notably, nothing would get logged since the Error would get
  // stuck in the ScheduledFuture inside of TimeoutFuture and nothing ever calls get on it.
/** An executor that rejects every task by throwing {@link RejectedExecutionException}. */
  private static final Executor REJECTING_EXECUTOR =
      new Executor() {
        @Override
        public void execute(Runnable runnable) {
          throw new RejectedExecutionException();
        }
      };

  /** Returns an async function that wraps its input in an immediate future. */
  private static <V> AsyncFunction<V, V> asyncIdentity() {
    return new AsyncFunction<V, V>() {
      @Override
      public ListenableFuture<V> apply(V input) {
        return immediateFuture(input);
      }
    };
  }

  /**
   * Wraps {@code function} so that it behaves identically but reports {@code toString} from
   * {@link Object#toString()}.
   */
  private static <I, O> AsyncFunction<I, O> tagged(
      final String toString, final AsyncFunction<I, O> function) {
    return new AsyncFunction<I, O>() {
      @Override
      public ListenableFuture<O> apply(I input) throws Exception {
        return function.apply(input);
      }

      @Override
      public String toString() {
        return toString;
      }
    };
  }

  /**
   * Wraps {@code callable} so that it behaves identically but reports {@code toString} from
   * {@link Object#toString()}.
   */
  private static <V> AsyncCallable<V> tagged(
      final String toString, final AsyncCallable<V> callable) {
    return new AsyncCallable<V>() {
      @Override
      public ListenableFuture<V> call() throws Exception {
        return callable.call();
      }

      @Override
      public String toString() {
        return toString;
      }
    };
  }
}