1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License
15  */
16 
17 package com.android.dialer.common.concurrent;
18 
19 import com.google.common.base.Predicate;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.util.concurrent.AbstractFuture;
22 import com.google.common.util.concurrent.Atomics;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicReference;
29 
30 /** Static utility methods related to futures. */
31 public class DialerFutures {
32 
33   /**
34    * Returns a future that will complete with the same value as the first matching the supplied
35    * predicate, cancelling all inputs upon completion. If none match, {@code defaultValue} is
36    * returned.
37    *
38    * <p>If an input fails before a match is found, the returned future also fails.
39    *
40    * <p>Cancellation of the output future will cause cancellation of all input futures.
41    *
42    * @throws IllegalArgumentException if {@code futures} is empty.
43    */
firstMatching( Iterable<? extends ListenableFuture<? extends T>> futures, Predicate<T> predicate, T defaultValue)44   public static <T> ListenableFuture<T> firstMatching(
45       Iterable<? extends ListenableFuture<? extends T>> futures,
46       Predicate<T> predicate,
47       T defaultValue) {
48     return firstMatchingImpl(futures, predicate, defaultValue);
49   }
50 
firstMatchingImpl( Iterable<? extends ListenableFuture<? extends T>> futures, Predicate<T> predicate, T defaultValue)51   private static <T> ListenableFuture<T> firstMatchingImpl(
52       Iterable<? extends ListenableFuture<? extends T>> futures,
53       Predicate<T> predicate,
54       T defaultValue) {
55     AggregateFuture<T> output = new AnyOfFuture<>(futures);
56     final AtomicReference<AggregateFuture<T>> ref = Atomics.newReference(output);
57     final AtomicInteger pending = new AtomicInteger(output.futures.size());
58     for (final ListenableFuture<? extends T> future : output.futures) {
59       future.addListener(
60           new Runnable() {
61             @Override
62             public void run() {
63               // Call get() and then set() instead of getAndSet() because a volatile read/write is
64               // cheaper than a CAS and atomicity is guaranteed by setFuture.
65               AggregateFuture<T> output = ref.get();
66               if (output != null) {
67                 T value = null;
68                 try {
69                   value = Futures.getDone(future);
70                 } catch (ExecutionException e) {
71                   ref.set(null); // unpin
72                   output.setException(e);
73                   return;
74                 }
75                 if (!predicate.apply(value)) {
76                   if (pending.decrementAndGet() == 0) {
77                     // we are the last future (and every other future hasn't matched or failed).
78                     output.set(defaultValue);
79                     // no point in clearing the ref, every other listener has already run
80                   }
81                 } else {
82                   ref.set(null); // unpin
83                   output.set(value);
84                 }
85               }
86             }
87           },
88           MoreExecutors.directExecutor());
89     }
90     return output;
91   }
92 
93   private static class AggregateFuture<T> extends AbstractFuture<T> {
94     ImmutableList<ListenableFuture<? extends T>> futures;
95 
AggregateFuture(Iterable<? extends ListenableFuture<? extends T>> futures)96     AggregateFuture(Iterable<? extends ListenableFuture<? extends T>> futures) {
97       ImmutableList<ListenableFuture<? extends T>> futuresCopy = ImmutableList.copyOf(futures);
98       if (futuresCopy.isEmpty()) {
99         throw new IllegalArgumentException("Expected at least one future, got 0.");
100       }
101       this.futures = futuresCopy;
102     }
103 
104     // increase visibility
105     @Override
set(T t)106     protected boolean set(T t) {
107       return super.set(t);
108     }
109 
110     @Override
setException(Throwable throwable)111     protected boolean setException(Throwable throwable) {
112       return super.setException(throwable);
113     }
114 
115     @Override
setFuture(ListenableFuture<? extends T> t)116     protected boolean setFuture(ListenableFuture<? extends T> t) {
117       return super.setFuture(t);
118     }
119   }
120 
121   // Propagates cancellation to all inputs cancels all inputs upon completion
122   private static final class AnyOfFuture<T> extends AggregateFuture<T> {
AnyOfFuture(Iterable<? extends ListenableFuture<? extends T>> futures)123     AnyOfFuture(Iterable<? extends ListenableFuture<? extends T>> futures) {
124       super(futures);
125     }
126 
127     @SuppressWarnings("ShortCircuitBoolean")
128     @Override
afterDone()129     protected void afterDone() {
130       ImmutableList<ListenableFuture<? extends T>> localFutures = futures;
131       futures = null; // unpin
132       // even though afterDone is only called once, it is possible that the 'futures' field is null
133       // because it isn't final and thus the write might not be visible if the future instance was
134       // unsafely published.  See the comment at the top of Futures.java on memory visibility.
135       if (localFutures != null) {
136         boolean interrupt = !isCancelled() | wasInterrupted();
137         for (ListenableFuture<? extends T> future : localFutures) {
138           future.cancel(interrupt);
139         }
140       }
141     }
142   }
143 }
144