1 /*
2  * Copyright (C) 2018 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5  * in compliance with the License. You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software distributed under the License
10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11  * or implied. See the License for the specific language governing permissions and limitations under
12  * the License.
13  */
14 
15 package com.google.common.util.concurrent;
16 
17 import static com.google.common.truth.Truth.assertThat;
18 import static com.google.common.util.concurrent.Futures.allAsList;
19 import static com.google.common.util.concurrent.Futures.getDone;
20 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21 import static java.util.concurrent.TimeUnit.SECONDS;
22 
23 import com.google.common.annotations.GwtIncompatible;
24 import com.google.common.base.Function;
25 import com.google.common.testing.GcFinalization;
26 import com.google.common.testing.TestLogHandler;
27 import com.google.j2objc.annotations.J2ObjCIncompatible;
28 import java.lang.ref.WeakReference;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38 import java.util.logging.Logger;
39 import junit.framework.TestCase;
40 
41 /** Tests for {@link ExecutionSequencer} */
42 public class ExecutionSequencerTest extends TestCase {
43 
44   ExecutorService executor;
45 
46   private ExecutionSequencer serializer;
47   private SettableFuture<Void> firstFuture;
48   private TestCallable firstCallable;
49 
50   @Override
setUp()51   public void setUp() throws Exception {
52     executor = Executors.newCachedThreadPool();
53     serializer = ExecutionSequencer.create();
54     firstFuture = SettableFuture.create();
55     firstCallable = new TestCallable(firstFuture);
56   }
57 
58   @Override
tearDown()59   public void tearDown() throws Exception {
60     executor.shutdown();
61   }
62 
testCallableStartsAfterFirstFutureCompletes()63   public void testCallableStartsAfterFirstFutureCompletes() {
64     @SuppressWarnings({"unused", "nullness"})
65     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
66     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
67     @SuppressWarnings({"unused", "nullness"})
68     Future<?> possiblyIgnoredError1 = serializer.submitAsync(secondCallable, directExecutor());
69     assertThat(firstCallable.called).isTrue();
70     assertThat(secondCallable.called).isFalse();
71     firstFuture.set(null);
72     assertThat(secondCallable.called).isTrue();
73   }
74 
testCancellationDoesNotViolateSerialization()75   public void testCancellationDoesNotViolateSerialization() {
76     @SuppressWarnings({"unused", "nullness"})
77     Future<?> possiblyIgnoredError = serializer.submitAsync(firstCallable, directExecutor());
78     TestCallable secondCallable = new TestCallable(Futures.<Void>immediateFuture(null));
79     ListenableFuture<Void> secondFuture = serializer.submitAsync(secondCallable, directExecutor());
80     TestCallable thirdCallable = new TestCallable(Futures.<Void>immediateFuture(null));
81     @SuppressWarnings({"unused", "nullness"})
82     Future<?> possiblyIgnoredError1 = serializer.submitAsync(thirdCallable, directExecutor());
83     secondFuture.cancel(true);
84     assertThat(secondCallable.called).isFalse();
85     assertThat(thirdCallable.called).isFalse();
86     firstFuture.set(null);
87     assertThat(secondCallable.called).isFalse();
88     assertThat(thirdCallable.called).isTrue();
89   }
90 
testCancellationMultipleThreads()91   public void testCancellationMultipleThreads() throws Exception {
92     final BlockingCallable blockingCallable = new BlockingCallable();
93     ListenableFuture<Void> unused = serializer.submit(blockingCallable, executor);
94     ListenableFuture<Boolean> future2 =
95         serializer.submit(
96             new Callable<Boolean>() {
97               @Override
98               public Boolean call() {
99                 return blockingCallable.isRunning();
100               }
101             },
102             directExecutor());
103 
104     // Wait for the first task to be started in the background. It will block until we explicitly
105     // stop it.
106     blockingCallable.waitForStart();
107 
108     // Give the second task a chance to (incorrectly) start up while the first task is running.
109     assertThat(future2.isDone()).isFalse();
110 
111     // Stop the first task. The second task should then run.
112     blockingCallable.stop();
113     executor.shutdown();
114     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
115     assertThat(getDone(future2)).isFalse();
116   }
117 
testSecondTaskWaitsForFirstEvenIfCancelled()118   public void testSecondTaskWaitsForFirstEvenIfCancelled() throws Exception {
119     final BlockingCallable blockingCallable = new BlockingCallable();
120     ListenableFuture<Void> future1 = serializer.submit(blockingCallable, executor);
121     ListenableFuture<Boolean> future2 =
122         serializer.submit(
123             new Callable<Boolean>() {
124               @Override
125               public Boolean call() {
126                 return blockingCallable.isRunning();
127               }
128             },
129             directExecutor());
130 
131     // Wait for the first task to be started in the background. It will block until we explicitly
132     // stop it.
133     blockingCallable.waitForStart();
134 
135     // This time, cancel the future for the first task. The task remains running, only the future
136     // is cancelled.
137     future1.cancel(false);
138 
139     // Give the second task a chance to (incorrectly) start up while the first task is running.
140     // (This is the assertion that fails.)
141     assertThat(future2.isDone()).isFalse();
142 
143     // Stop the first task. The second task should then run.
144     blockingCallable.stop();
145     executor.shutdown();
146     assertThat(executor.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
147     assertThat(getDone(future2)).isFalse();
148   }
149 
150   @GwtIncompatible
151   @J2ObjCIncompatible // gc
152   @AndroidIncompatible
testCancellationWithReferencedObject()153   public void testCancellationWithReferencedObject() throws Exception {
154     Object toBeGCed = new Object();
155     WeakReference<Object> ref = new WeakReference<>(toBeGCed);
156     final SettableFuture<Void> settableFuture = SettableFuture.create();
157     ListenableFuture<?> ignored =
158         serializer.submitAsync(
159             new AsyncCallable<Void>() {
160               @Override
161               public ListenableFuture<Void> call() {
162                 return settableFuture;
163               }
164             },
165             directExecutor());
166     serializer.submit(toStringCallable(toBeGCed), directExecutor()).cancel(true);
167     toBeGCed = null;
168     GcFinalization.awaitClear(ref);
169   }
170 
toStringCallable(final Object object)171   private static Callable<String> toStringCallable(final Object object) {
172     return new Callable<String>() {
173       @Override
174       public String call() {
175         return object.toString();
176       }
177     };
178   }
179 
180   public void testCancellationDuringReentrancy() throws Exception {
181     TestLogHandler logHandler = new TestLogHandler();
182     Logger.getLogger(AbstractFuture.class.getName()).addHandler(logHandler);
183 
184     List<Future<?>> results = new ArrayList<>();
185     final Runnable[] manualExecutorTask = new Runnable[1];
186     Executor manualExecutor =
187         new Executor() {
188           @Override
189           public void execute(Runnable task) {
190             manualExecutorTask[0] = task;
191           }
192         };
193 
194     results.add(serializer.submit(Callables.returning(null), manualExecutor));
195     final Future<?>[] thingToCancel = new Future<?>[1];
196     results.add(
197         serializer.submit(
198             new Callable<Void>() {
199               @Override
200               public Void call() {
201                 thingToCancel[0].cancel(false);
202                 return null;
203               }
204             },
205             directExecutor()));
206     thingToCancel[0] = serializer.submit(Callables.returning(null), directExecutor());
207     results.add(thingToCancel[0]);
208     // Enqueue more than enough tasks to force reentrancy.
209     for (int i = 0; i < 5; i++) {
210       results.add(serializer.submit(Callables.returning(null), directExecutor()));
211     }
212 
213     manualExecutorTask[0].run();
214 
215     for (Future<?> result : results) {
216       if (!result.isCancelled()) {
217         result.get(10, SECONDS);
218       }
219       // TODO(cpovirk): Verify that the cancelled futures are exactly ones that we expect.
220     }
221 
222     assertThat(logHandler.getStoredLogRecords()).isEmpty();
223   }
224 
225   public void testAvoidsStackOverflow_manySubmitted() throws Exception {
226     final SettableFuture<Void> settableFuture = SettableFuture.create();
227     ArrayList<ListenableFuture<Void>> results = new ArrayList<>(50_001);
228     results.add(
229         serializer.submitAsync(
230             new AsyncCallable<Void>() {
231               @Override
232               public ListenableFuture<Void> call() {
233                 return settableFuture;
234               }
235             },
236             directExecutor()));
237     for (int i = 0; i < 50_000; i++) {
238       results.add(serializer.submit(Callables.<Void>returning(null), directExecutor()));
239     }
240     settableFuture.set(null);
241     getDone(allAsList(results));
242   }
243 
244   public void testAvoidsStackOverflow_manyCancelled() throws Exception {
245     final SettableFuture<Void> settableFuture = SettableFuture.create();
246     ListenableFuture<Void> unused =
247         serializer.submitAsync(
248             new AsyncCallable<Void>() {
249               @Override
250               public ListenableFuture<Void> call() {
251                 return settableFuture;
252               }
253             },
254             directExecutor());
255     for (int i = 0; i < 50_000; i++) {
256       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
257     }
258     ListenableFuture<Integer> stackDepthCheck =
259         serializer.submit(
260             new Callable<Integer>() {
261               @Override
262               public Integer call() {
263                 return Thread.currentThread().getStackTrace().length;
264               }
265             },
266             directExecutor());
267     settableFuture.set(null);
268     assertThat(getDone(stackDepthCheck))
269         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
270   }
271 
272   public void testAvoidsStackOverflow_alternatingCancelledAndSubmitted() throws Exception {
273     final SettableFuture<Void> settableFuture = SettableFuture.create();
274     ListenableFuture<Void> unused =
275         serializer.submitAsync(
276             new AsyncCallable<Void>() {
277               @Override
278               public ListenableFuture<Void> call() {
279                 return settableFuture;
280               }
281             },
282             directExecutor());
283     for (int i = 0; i < 25_000; i++) {
284       serializer.submit(Callables.<Void>returning(null), directExecutor()).cancel(true);
285       unused = serializer.submit(Callables.<Void>returning(null), directExecutor());
286     }
287     ListenableFuture<Integer> stackDepthCheck =
288         serializer.submit(
289             new Callable<Integer>() {
290               @Override
291               public Integer call() {
292                 return Thread.currentThread().getStackTrace().length;
293               }
294             },
295             directExecutor());
296     settableFuture.set(null);
297     assertThat(getDone(stackDepthCheck))
298         .isLessThan(Thread.currentThread().getStackTrace().length + 100);
299   }
300 
301   private static Function<Integer, Integer> add(final int delta) {
302     return new Function<Integer, Integer>() {
303       @Override
304       public Integer apply(Integer input) {
305         return input + delta;
306       }
307     };
308   }
309 
310   private static AsyncCallable<Integer> asyncAdd(
311       final ListenableFuture<Integer> future, final int delta, final Executor executor) {
312     return new AsyncCallable<Integer>() {
313       @Override
314       public ListenableFuture<Integer> call() throws Exception {
315         return Futures.transform(future, add(delta), executor);
316       }
317     };
318   }
319 
320   private static final class LongHolder {
321     long count;
322   }
323 
324   private static final int ITERATION_COUNT = 50_000;
325   private static final int DIRECT_EXECUTIONS_PER_THREAD = 100;
326 
327   @GwtIncompatible // threads
328 
329   public void testAvoidsStackOverflow_multipleThreads() throws Exception {
330     final LongHolder holder = new LongHolder();
331     final ArrayList<ListenableFuture<Integer>> lengthChecks = new ArrayList<>();
332     final List<Integer> completeLengthChecks;
333     final int baseStackDepth;
334     ExecutorService service = Executors.newFixedThreadPool(5);
335     try {
336       // Avoid counting frames from the executor itself, or the ExecutionSequencer
337       baseStackDepth =
338           serializer
339               .submit(
340                   new Callable<Integer>() {
341                     @Override
342                     public Integer call() {
343                       return Thread.currentThread().getStackTrace().length;
344                     }
345                   },
346                   service)
347               .get();
348       final SettableFuture<Void> settableFuture = SettableFuture.create();
349       ListenableFuture<?> unused =
350           serializer.submitAsync(
351               new AsyncCallable<Void>() {
352                 @Override
353                 public ListenableFuture<Void> call() {
354                   return settableFuture;
355                 }
356               },
357               directExecutor());
358       for (int i = 0; i < 50_000; i++) {
359         if (i % DIRECT_EXECUTIONS_PER_THREAD == 0) {
360           // after some number of iterations, switch threads
361           unused =
362               serializer.submit(
363                   new Callable<Void>() {
364                     @Override
365                     public Void call() {
366                       holder.count++;
367                       return null;
368                     }
369                   },
370                   service);
371         } else if (i % DIRECT_EXECUTIONS_PER_THREAD == DIRECT_EXECUTIONS_PER_THREAD - 1) {
372           // When at max depth, record stack trace depth
373           lengthChecks.add(
374               serializer.submit(
375                   new Callable<Integer>() {
376                     @Override
377                     public Integer call() {
378                       holder.count++;
379                       return Thread.currentThread().getStackTrace().length;
380                     }
381                   },
382                   directExecutor()));
383         } else {
384           // Otherwise, schedule a task on directExecutor
385           unused =
386               serializer.submit(
387                   new Callable<Void>() {
388                     @Override
389                     public Void call() {
390                       holder.count++;
391                       return null;
392                     }
393                   },
394                   directExecutor());
395         }
396       }
397       settableFuture.set(null);
398       completeLengthChecks = allAsList(lengthChecks).get();
399     } finally {
400       service.shutdown();
401     }
402     assertThat(holder.count).isEqualTo(ITERATION_COUNT);
403     for (int length : completeLengthChecks) {
404       // Verify that at max depth, less than one stack frame per submitted task was consumed
405       assertThat(length - baseStackDepth).isLessThan(DIRECT_EXECUTIONS_PER_THREAD / 2);
406     }
407   }
408 
409   @SuppressWarnings("ObjectToString") // Intended behavior
410   public void testToString() {
411     Future<?> unused = serializer.submitAsync(firstCallable, directExecutor());
412     TestCallable secondCallable = new TestCallable(SettableFuture.<Void>create());
413     Future<?> second = serializer.submitAsync(secondCallable, directExecutor());
414     assertThat(secondCallable.called).isFalse();
415     assertThat(second.toString()).contains(secondCallable.toString());
416     firstFuture.set(null);
417     assertThat(second.toString()).contains(secondCallable.future.toString());
418   }
419 
420   private static class BlockingCallable implements Callable<Void> {
421     private final CountDownLatch startLatch = new CountDownLatch(1);
422     private final CountDownLatch stopLatch = new CountDownLatch(1);
423 
424     private volatile boolean running = false;
425 
426     @Override
427     public Void call() throws InterruptedException {
428       running = true;
429       startLatch.countDown();
430       stopLatch.await();
431       running = false;
432       return null;
433     }
434 
435     public void waitForStart() throws InterruptedException {
436       startLatch.await();
437     }
438 
439     public void stop() {
440       stopLatch.countDown();
441     }
442 
443     public boolean isRunning() {
444       return running;
445     }
446   }
447 
448   private static final class TestCallable implements AsyncCallable<Void> {
449 
450     private final ListenableFuture<Void> future;
451     private boolean called = false;
452 
453     private TestCallable(ListenableFuture<Void> future) {
454       this.future = future;
455     }
456 
457     @Override
458     public ListenableFuture<Void> call() throws Exception {
459       called = true;
460       return future;
461     }
462   }
463 }
464