1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 
/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 * to the {@link FutureTask} class provided in this package.  For example,
 * the implementation of {@code submit(Runnable)} creates an
 * associated {@code RunnableFuture} that is executed and
 * returned. Subclasses may override the {@code newTaskFor} methods
 * to return {@code RunnableFuture} implementations other than
 * {@code FutureTask}.
 *
 * <p><b>Extension example</b>. Here is a sketch of a class
 * that customizes {@link ThreadPoolExecutor} to use
 * a {@code CustomTask} class instead of the default {@code FutureTask}:
 * <pre> {@code
 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 *
 *   static class CustomTask<V> implements RunnableFuture<V> {...}
 *
 *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
 *       return new CustomTask<V>(c);
 *   }
 *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
 *       return new CustomTask<V>(r, v);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException();
        }
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of {@code invokeAny}, shared by the timed and
     * untimed variants.
     *
     * <p>Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService}; before each further submission
     * the completion queue is polled, and the result of the first task
     * whose {@code get()} returns normally is returned. Every future
     * created here is cancelled on the way out, whether the method
     * returns or throws.
     *
     * @param tasks the candidate tasks
     * @param timed whether {@code nanos} bounds the total wait
     * @param nanos the remaining wait budget in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @param <T> the type of the values returned from the tasks
     * @return the result of the first task that completed normally
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException if every task finished without
     *         yielding a result; this is the last failure observed
     * @throws TimeoutException if {@code timed} and a timed poll for a
     *         completed task returned nothing
     * @throws InterruptedException if interrupted while waiting
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        int ntasks = tasks.size();
        if (ntasks == 0) {
            throw new IllegalArgumentException();
        }
        List<Future<T>> futures = new ArrayList<>(ntasks);
        ExecutorCompletionService<T> ecs = new ExecutorCompletionService<>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: put one more task in flight.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    } else if (active == 0) {
                        // Everything was submitted and everything failed.
                        break;
                    } else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null) {
                            throw new TimeoutException();
                        }
                        nanos = deadline - System.nanoTime();
                    } else {
                        f = ecs.take();
                    }
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null) {
                // Defensive fallback: the loop only exits after at least one
                // future was examined, which normally records a failure.
                ee = new ExecutionException("no task completed successfully", null);
            }
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * Executes the given tasks and returns the result of one that
     * completed without throwing, if any does. All futures created for
     * the call are cancelled before this method returns or throws.
     *
     * @param tasks the collection of tasks
     * @param <T> the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException if no task successfully completes
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never raises TimeoutException; fail loudly
            // even when assertions are disabled instead of returning null.
            throw new AssertionError(cannotHappen);
        }
    }

    /**
     * Executes the given tasks and returns the result of one that
     * completed without throwing, if any does before the given timeout
     * elapses. All futures created for the call are cancelled before
     * this method returns or throws.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param <T> the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException if no task successfully completes
     * @throws TimeoutException if the timeout elapses before any task
     *         successfully completes
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * Executes the given tasks and waits for each of them to finish,
     * returning their futures in the iteration order of {@code tasks}.
     * A task that fails or is cancelled still counts as finished. If
     * anything is thrown while submitting or waiting, every future
     * created so far is cancelled before the throwable propagates.
     *
     * @param tasks the collection of tasks
     * @param <T> the type of the values returned from the tasks
     * @return a list of futures, one per task, in the order produced by
     *         the iterator of {@code tasks}
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code tasks} is null
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
            for (Callable<T> task : tasks) {
                RunnableFuture<T> f = newTaskFor(task);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException | ExecutionException ignored) {
                        // Only completion matters here; callers inspect the
                        // outcome through the returned futures.
                    }
                }
            }
            return futures;
        } catch (Throwable failure) {
            cancelAll(futures);
            throw failure;
        }
    }

    /**
     * Executes the given tasks and waits until all of them have
     * finished or the timeout elapses, whichever happens first,
     * returning their futures in the iteration order of {@code tasks}.
     * On timeout, the futures that had not yet been waited for are
     * cancelled. If anything is thrown while submitting or waiting,
     * every future is cancelled before the throwable propagates.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param <T> the type of the values returned from the tasks
     * @return a list of futures, one per task, in the order produced by
     *         the iterator of {@code tasks}
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        final long nanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + nanos;
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        // Index of the first future not yet known to be complete; read
        // after the labeled block to decide what to cancel on timeout.
        int j = 0;
        timedOut: try {
            for (Callable<T> task : tasks) {
                futures.add(newTaskFor(task));
            }

            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L) {
                    break timedOut;
                }
                // Every element was produced by newTaskFor, hence runnable.
                execute((Runnable) futures.get(i));
            }

            for (; j < size; j++) {
                Future<T> f = futures.get(j);
                if (!f.isDone()) {
                    try {
                        f.get(deadline - System.nanoTime(), NANOSECONDS);
                    } catch (CancellationException | ExecutionException ignored) {
                        // Only completion matters here; callers inspect the
                        // outcome through the returned futures.
                    } catch (TimeoutException expired) {
                        break timedOut;
                    }
                }
            }
            return futures;
        } catch (Throwable failure) {
            cancelAll(futures);
            throw failure;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list, interrupting running tasks. */
    private static <T> void cancelAll(List<Future<T>> futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static <T> void cancelAll(List<Future<T>> futures, int j) {
        for (int size = futures.size(); j < size; j++) {
            futures.get(j).cancel(true);
        }
    }
}
284