1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
39 
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Iterator;
43 import java.util.List;
44 
45 /**
46  * Provides default implementations of {@link ExecutorService}
47  * execution methods. This class implements the {@code submit},
48  * {@code invokeAny} and {@code invokeAll} methods using a
49  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
50  * to the {@link FutureTask} class provided in this package.  For example,
51  * the implementation of {@code submit(Runnable)} creates an
52  * associated {@code RunnableFuture} that is executed and
53  * returned. Subclasses may override the {@code newTaskFor} methods
54  * to return {@code RunnableFuture} implementations other than
55  * {@code FutureTask}.
56  *
57  * <p><b>Extension example.</b> Here is a sketch of a class
58  * that customizes {@link ThreadPoolExecutor} to use
59  * a {@code CustomTask} class instead of the default {@code FutureTask}:
60  * <pre> {@code
61  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
62  *
63  *   static class CustomTask<V> implements RunnableFuture<V> { ... }
64  *
65  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
66  *       return new CustomTask<V>(c);
67  *   }
68  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
69  *       return new CustomTask<V>(r, v);
70  *   }
71  *   // ... add constructors, etc.
72  * }}</pre>
73  *
74  * @since 1.5
75  * @author Doug Lea
76  */
77 public abstract class AbstractExecutorService implements ExecutorService {
78 
79     /**
80      * Constructor for subclasses to call.
81      */
AbstractExecutorService()82     public AbstractExecutorService() {}
83 
84     /**
85      * Returns a {@code RunnableFuture} for the given runnable and default
86      * value.
87      *
88      * @param runnable the runnable task being wrapped
89      * @param value the default value for the returned future
90      * @param <T> the type of the given value
91      * @return a {@code RunnableFuture} which, when run, will run the
92      * underlying runnable and which, as a {@code Future}, will yield
93      * the given value as its result and provide for cancellation of
94      * the underlying task
95      * @since 1.6
96      */
newTaskFor(Runnable runnable, T value)97     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
98         return new FutureTask<T>(runnable, value);
99     }
100 
101     /**
102      * Returns a {@code RunnableFuture} for the given callable task.
103      *
104      * @param callable the callable task being wrapped
105      * @param <T> the type of the callable's result
106      * @return a {@code RunnableFuture} which, when run, will call the
107      * underlying callable and which, as a {@code Future}, will yield
108      * the callable's result as its result and provide for
109      * cancellation of the underlying task
110      * @since 1.6
111      */
newTaskFor(Callable<T> callable)112     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
113         return new FutureTask<T>(callable);
114     }
115 
116     /**
117      * @throws RejectedExecutionException {@inheritDoc}
118      * @throws NullPointerException       {@inheritDoc}
119      */
submit(Runnable task)120     public Future<?> submit(Runnable task) {
121         if (task == null) throw new NullPointerException();
122         RunnableFuture<Void> ftask = newTaskFor(task, null);
123         execute(ftask);
124         return ftask;
125     }
126 
127     /**
128      * @throws RejectedExecutionException {@inheritDoc}
129      * @throws NullPointerException       {@inheritDoc}
130      */
submit(Runnable task, T result)131     public <T> Future<T> submit(Runnable task, T result) {
132         if (task == null) throw new NullPointerException();
133         RunnableFuture<T> ftask = newTaskFor(task, result);
134         execute(ftask);
135         return ftask;
136     }
137 
138     /**
139      * @throws RejectedExecutionException {@inheritDoc}
140      * @throws NullPointerException       {@inheritDoc}
141      */
submit(Callable<T> task)142     public <T> Future<T> submit(Callable<T> task) {
143         if (task == null) throw new NullPointerException();
144         RunnableFuture<T> ftask = newTaskFor(task);
145         execute(ftask);
146         return ftask;
147     }
148 
149     /**
150      * the main mechanics of invokeAny.
151      */
doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)152     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
153                               boolean timed, long nanos)
154         throws InterruptedException, ExecutionException, TimeoutException {
155         if (tasks == null)
156             throw new NullPointerException();
157         int ntasks = tasks.size();
158         if (ntasks == 0)
159             throw new IllegalArgumentException();
160         ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
161         ExecutorCompletionService<T> ecs =
162             new ExecutorCompletionService<T>(this);
163 
164         // For efficiency, especially in executors with limited
165         // parallelism, check to see if previously submitted tasks are
166         // done before submitting more of them. This interleaving
167         // plus the exception mechanics account for messiness of main
168         // loop.
169 
170         try {
171             // Record exceptions so that if we fail to obtain any
172             // result, we can throw the last exception we got.
173             ExecutionException ee = null;
174             final long deadline = timed ? System.nanoTime() + nanos : 0L;
175             Iterator<? extends Callable<T>> it = tasks.iterator();
176 
177             // Start one task for sure; the rest incrementally
178             futures.add(ecs.submit(it.next()));
179             --ntasks;
180             int active = 1;
181 
182             for (;;) {
183                 Future<T> f = ecs.poll();
184                 if (f == null) {
185                     if (ntasks > 0) {
186                         --ntasks;
187                         futures.add(ecs.submit(it.next()));
188                         ++active;
189                     }
190                     else if (active == 0)
191                         break;
192                     else if (timed) {
193                         f = ecs.poll(nanos, NANOSECONDS);
194                         if (f == null)
195                             throw new TimeoutException();
196                         nanos = deadline - System.nanoTime();
197                     }
198                     else
199                         f = ecs.take();
200                 }
201                 if (f != null) {
202                     --active;
203                     try {
204                         return f.get();
205                     } catch (ExecutionException eex) {
206                         ee = eex;
207                     } catch (RuntimeException rex) {
208                         ee = new ExecutionException(rex);
209                     }
210                 }
211             }
212 
213             if (ee == null)
214                 ee = new ExecutionException();
215             throw ee;
216 
217         } finally {
218             cancelAll(futures);
219         }
220     }
221 
invokeAny(Collection<? extends Callable<T>> tasks)222     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
223         throws InterruptedException, ExecutionException {
224         try {
225             return doInvokeAny(tasks, false, 0);
226         } catch (TimeoutException cannotHappen) {
227             assert false;
228             return null;
229         }
230     }
231 
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)232     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
233                            long timeout, TimeUnit unit)
234         throws InterruptedException, ExecutionException, TimeoutException {
235         return doInvokeAny(tasks, true, unit.toNanos(timeout));
236     }
237 
invokeAll(Collection<? extends Callable<T>> tasks)238     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
239         throws InterruptedException {
240         if (tasks == null)
241             throw new NullPointerException();
242         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
243         try {
244             for (Callable<T> t : tasks) {
245                 RunnableFuture<T> f = newTaskFor(t);
246                 futures.add(f);
247                 execute(f);
248             }
249             for (int i = 0, size = futures.size(); i < size; i++) {
250                 Future<T> f = futures.get(i);
251                 if (!f.isDone()) {
252                     try { f.get(); }
253                     catch (CancellationException | ExecutionException ignore) {}
254                 }
255             }
256             return futures;
257         } catch (Throwable t) {
258             cancelAll(futures);
259             throw t;
260         }
261     }
262 
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)263     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
264                                          long timeout, TimeUnit unit)
265         throws InterruptedException {
266         if (tasks == null)
267             throw new NullPointerException();
268         final long nanos = unit.toNanos(timeout);
269         final long deadline = System.nanoTime() + nanos;
270         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
271         int j = 0;
272         timedOut: try {
273             for (Callable<T> t : tasks)
274                 futures.add(newTaskFor(t));
275 
276             final int size = futures.size();
277 
278             // Interleave time checks and calls to execute in case
279             // executor doesn't have any/much parallelism.
280             for (int i = 0; i < size; i++) {
281                 if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
282                     break timedOut;
283                 execute((Runnable)futures.get(i));
284             }
285 
286             for (; j < size; j++) {
287                 Future<T> f = futures.get(j);
288                 if (!f.isDone()) {
289                     try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
290                     catch (CancellationException | ExecutionException ignore) {}
291                     catch (TimeoutException timedOut) {
292                         break timedOut;
293                     }
294                 }
295             }
296             return futures;
297         } catch (Throwable t) {
298             cancelAll(futures);
299             throw t;
300         }
301         // Timed out before all the tasks could be completed; cancel remaining
302         cancelAll(futures, j);
303         return futures;
304     }
305 
cancelAll(ArrayList<Future<T>> futures)306     private static <T> void cancelAll(ArrayList<Future<T>> futures) {
307         cancelAll(futures, 0);
308     }
309 
310     /** Cancels all futures with index at least j. */
cancelAll(ArrayList<Future<T>> futures, int j)311     private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
312         for (int size = futures.size(); j < size; j++)
313             futures.get(j).cancel(true);
314     }
315 }
316