1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 import java.util.*;
9 
10 /**
11  * Provides default implementations of {@link ExecutorService}
12  * execution methods. This class implements the {@code submit},
13  * {@code invokeAny} and {@code invokeAll} methods using a
14  * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
15  * to the {@link FutureTask} class provided in this package.  For example,
16  * the implementation of {@code submit(Runnable)} creates an
17  * associated {@code RunnableFuture} that is executed and
18  * returned. Subclasses may override the {@code newTaskFor} methods
19  * to return {@code RunnableFuture} implementations other than
20  * {@code FutureTask}.
21  *
22  * <p><b>Extension example</b>. Here is a sketch of a class
23  * that customizes {@link ThreadPoolExecutor} to use
24  * a {@code CustomTask} class instead of the default {@code FutureTask}:
25  *  <pre> {@code
26  * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27  *
28  *   static class CustomTask<V> implements RunnableFuture<V> {...}
29  *
30  *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
31  *       return new CustomTask<V>(c);
32  *   }
33  *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
34  *       return new CustomTask<V>(r, v);
35  *   }
36  *   // ... add constructors, etc.
37  * }}</pre>
38  *
39  * @since 1.5
40  * @author Doug Lea
41  */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation wraps the task with
     * {@link #newTaskFor(Runnable, Object) newTaskFor(task, null)}, passes
     * the wrapper to {@code execute}, and returns the wrapper.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation wraps the task with
     * {@link #newTaskFor(Runnable, Object) newTaskFor(task, result)},
     * passes the wrapper to {@code execute}, and returns the wrapper.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation wraps the task with
     * {@link #newTaskFor(Callable) newTaskFor(task)}, passes the wrapper
     * to {@code execute}, and returns the wrapper.
     *
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * Invokes {@code cancel(true)} on every future in the given list,
     * in list order.
     *
     * @param futures the futures to cancel
     */
    private static <T> void cancelAll(List<Future<T>> futures) {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }

    /**
     * The main mechanics of both {@code invokeAny} variants.
     *
     * <p>Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService} wrapped around this executor.
     * Before each further submission the completion queue is polled, so
     * that a task which has already finished can end the call without
     * the remaining tasks ever being started. The first future whose
     * {@code get} returns normally supplies the result. Before this
     * method returns or throws, {@code cancel(true)} is invoked on every
     * future that was submitted.
     *
     * @param tasks the candidate tasks
     * @param timed whether {@code nanos} is to be honored as a timeout
     * @param nanos the maximum time to wait, in nanoseconds; only
     *        consulted when {@code timed} is true
     * @return the result of the first task found to have completed
     *         without throwing
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws InterruptedException if interrupted while waiting for a
     *         task to complete
     * @throws ExecutionException if every task completed and none of
     *         them completed normally; this is the exception recorded
     *         for the most recently examined task
     * @throws TimeoutException if {@code timed} is true and no further
     *         task completed within the remaining time
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;   // submitted tasks not yet taken from ecs

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start one more task.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        // Everything was started and examined; all failed.
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            // Defensive fallback: the loop only exits after every task
            // has been examined, and each examination either returns or
            // records an exception. The public constructor is used with
            // a null cause, so getMessage() and getCause() are both null.
            if (ee == null)
                ee = new ExecutionException((Throwable) null);
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation delegates to the shared untimed/timed
     * routine with timing disabled.
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // The untimed path never waits with a timeout.
            assert false;
            return null;
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation converts the timeout to nanoseconds with
     * {@code unit.toNanos(timeout)} and delegates to the shared
     * untimed/timed routine with timing enabled.
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation wraps each task with
     * {@link #newTaskFor(Callable)} and passes it to {@code execute} in
     * iteration order, then waits on each future that is not yet done.
     * A {@code CancellationException} or {@code ExecutionException}
     * thrown while waiting is ignored. If this method exits any other
     * way than by returning after all waits have finished,
     * {@code cancel(true)} is invoked on every future created so far.
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                        // Only completion matters here, not the outcome.
                    } catch (ExecutionException ignore) {
                        // Only completion matters here, not the outcome.
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                cancelAll(futures);
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation first wraps every task with
     * {@link #newTaskFor(Callable)}, then passes the wrappers to
     * {@code execute} one by one, re-checking the remaining time after
     * each call, and finally waits on each future that is not yet done
     * for at most the remaining time. A {@code CancellationException} or
     * {@code ExecutionException} thrown while waiting is ignored. If the
     * time runs out, or this method exits by throwing,
     * {@code cancel(true)} is invoked on every future in the returned
     * list.
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                // Every element was produced by newTaskFor, hence the cast.
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                        // Only completion matters here, not the outcome.
                    } catch (ExecutionException ignore) {
                        // Only completion matters here, not the outcome.
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            // Reached with done == false on timeout or on any exception.
            if (!done)
                cancelAll(futures);
        }
    }

}
277