1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License 15 */ 16 17 package com.android.dialer.common.concurrent; 18 19 import com.google.common.base.Predicate; 20 import com.google.common.collect.ImmutableList; 21 import com.google.common.util.concurrent.AbstractFuture; 22 import com.google.common.util.concurrent.Atomics; 23 import com.google.common.util.concurrent.Futures; 24 import com.google.common.util.concurrent.ListenableFuture; 25 import com.google.common.util.concurrent.MoreExecutors; 26 import java.util.concurrent.ExecutionException; 27 import java.util.concurrent.atomic.AtomicInteger; 28 import java.util.concurrent.atomic.AtomicReference; 29 30 /** Static utility methods related to futures. */ 31 public class DialerFutures { 32 33 /** 34 * Returns a future that will complete with the same value as the first matching the supplied 35 * predicate, cancelling all inputs upon completion. If none match, {@code defaultValue} is 36 * returned. 37 * 38 * <p>If an input fails before a match is found, the returned future also fails. 39 * 40 * <p>Cancellation of the output future will cause cancellation of all input futures. 41 * 42 * @throws IllegalArgumentException if {@code futures} is empty. 43 */ firstMatching( Iterable<? extends ListenableFuture<? extends T>> futures, Predicate<T> predicate, T defaultValue)44 public static <T> ListenableFuture<T> firstMatching( 45 Iterable<? extends ListenableFuture<? extends T>> futures, 46 Predicate<T> predicate, 47 T defaultValue) { 48 return firstMatchingImpl(futures, predicate, defaultValue); 49 } 50 firstMatchingImpl( Iterable<? extends ListenableFuture<? extends T>> futures, Predicate<T> predicate, T defaultValue)51 private static <T> ListenableFuture<T> firstMatchingImpl( 52 Iterable<? extends ListenableFuture<? extends T>> futures, 53 Predicate<T> predicate, 54 T defaultValue) { 55 AggregateFuture<T> output = new AnyOfFuture<>(futures); 56 final AtomicReference<AggregateFuture<T>> ref = Atomics.newReference(output); 57 final AtomicInteger pending = new AtomicInteger(output.futures.size()); 58 for (final ListenableFuture<? extends T> future : output.futures) { 59 future.addListener( 60 new Runnable() { 61 @Override 62 public void run() { 63 // Call get() and then set() instead of getAndSet() because a volatile read/write is 64 // cheaper than a CAS and atomicity is guaranteed by setFuture. 65 AggregateFuture<T> output = ref.get(); 66 if (output != null) { 67 T value = null; 68 try { 69 value = Futures.getDone(future); 70 } catch (ExecutionException e) { 71 ref.set(null); // unpin 72 output.setException(e); 73 return; 74 } 75 if (!predicate.apply(value)) { 76 if (pending.decrementAndGet() == 0) { 77 // we are the last future (and every other future hasn't matched or failed). 78 output.set(defaultValue); 79 // no point in clearing the ref, every other listener has already run 80 } 81 } else { 82 ref.set(null); // unpin 83 output.set(value); 84 } 85 } 86 } 87 }, 88 MoreExecutors.directExecutor()); 89 } 90 return output; 91 } 92 93 private static class AggregateFuture<T> extends AbstractFuture<T> { 94 ImmutableList<ListenableFuture<? extends T>> futures; 95 AggregateFuture(Iterable<? extends ListenableFuture<? extends T>> futures)96 AggregateFuture(Iterable<? extends ListenableFuture<? extends T>> futures) { 97 ImmutableList<ListenableFuture<? extends T>> futuresCopy = ImmutableList.copyOf(futures); 98 if (futuresCopy.isEmpty()) { 99 throw new IllegalArgumentException("Expected at least one future, got 0."); 100 } 101 this.futures = futuresCopy; 102 } 103 104 // increase visibility 105 @Override set(T t)106 protected boolean set(T t) { 107 return super.set(t); 108 } 109 110 @Override setException(Throwable throwable)111 protected boolean setException(Throwable throwable) { 112 return super.setException(throwable); 113 } 114 115 @Override setFuture(ListenableFuture<? extends T> t)116 protected boolean setFuture(ListenableFuture<? extends T> t) { 117 return super.setFuture(t); 118 } 119 } 120 121 // Propagates cancellation to all inputs cancels all inputs upon completion 122 private static final class AnyOfFuture<T> extends AggregateFuture<T> { AnyOfFuture(Iterable<? extends ListenableFuture<? extends T>> futures)123 AnyOfFuture(Iterable<? extends ListenableFuture<? extends T>> futures) { 124 super(futures); 125 } 126 127 @SuppressWarnings("ShortCircuitBoolean") 128 @Override afterDone()129 protected void afterDone() { 130 ImmutableList<ListenableFuture<? extends T>> localFutures = futures; 131 futures = null; // unpin 132 // even though afterDone is only called once, it is possible that the 'futures' field is null 133 // because it isn't final and thus the write might not be visible if the future instance was 134 // unsafely published. See the comment at the top of Futures.java on memory visibility. 135 if (localFutures != null) { 136 boolean interrupt = !isCancelled() | wasInterrupted(); 137 for (ListenableFuture<? extends T> future : localFutures) { 138 future.cancel(interrupt); 139 } 140 } 141 } 142 } 143 } 144