1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import static java.util.concurrent.TimeUnit.NANOSECONDS; 10 11 import java.util.ArrayList; 12 import java.util.Collection; 13 import java.util.Iterator; 14 import java.util.List; 15 16 /** 17 * Provides default implementations of {@link ExecutorService} 18 * execution methods. This class implements the {@code submit}, 19 * {@code invokeAny} and {@code invokeAll} methods using a 20 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults 21 * to the {@link FutureTask} class provided in this package. For example, 22 * the implementation of {@code submit(Runnable)} creates an 23 * associated {@code RunnableFuture} that is executed and 24 * returned. Subclasses may override the {@code newTaskFor} methods 25 * to return {@code RunnableFuture} implementations other than 26 * {@code FutureTask}. 27 * 28 * <p><b>Extension example</b>. Here is a sketch of a class 29 * that customizes {@link ThreadPoolExecutor} to use 30 * a {@code CustomTask} class instead of the default {@code FutureTask}: 31 * <pre> {@code 32 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor { 33 * 34 * static class CustomTask<V> implements RunnableFuture<V> {...} 35 * 36 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 37 * return new CustomTask<V>(c); 38 * } 39 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) { 40 * return new CustomTask<V>(r, v); 41 * } 42 * // ... add constructors, etc. 43 * }}</pre> 44 * 45 * @since 1.5 46 * @author Doug Lea 47 */ 48 public abstract class AbstractExecutorService implements ExecutorService { 49 50 /** 51 * Returns a {@code RunnableFuture} for the given runnable and default 52 * value. 53 * 54 * @param runnable the runnable task being wrapped 55 * @param value the default value for the returned future 56 * @param <T> the type of the given value 57 * @return a {@code RunnableFuture} which, when run, will run the 58 * underlying runnable and which, as a {@code Future}, will yield 59 * the given value as its result and provide for cancellation of 60 * the underlying task 61 * @since 1.6 62 */ newTaskFor(Runnable runnable, T value)63 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 64 return new FutureTask<T>(runnable, value); 65 } 66 67 /** 68 * Returns a {@code RunnableFuture} for the given callable task. 69 * 70 * @param callable the callable task being wrapped 71 * @param <T> the type of the callable's result 72 * @return a {@code RunnableFuture} which, when run, will call the 73 * underlying callable and which, as a {@code Future}, will yield 74 * the callable's result as its result and provide for 75 * cancellation of the underlying task 76 * @since 1.6 77 */ newTaskFor(Callable<T> callable)78 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 79 return new FutureTask<T>(callable); 80 } 81 82 /** 83 * @throws RejectedExecutionException {@inheritDoc} 84 * @throws NullPointerException {@inheritDoc} 85 */ submit(Runnable task)86 public Future<?> submit(Runnable task) { 87 if (task == null) throw new NullPointerException(); 88 RunnableFuture<Void> ftask = newTaskFor(task, null); 89 execute(ftask); 90 return ftask; 91 } 92 93 /** 94 * @throws RejectedExecutionException {@inheritDoc} 95 * @throws NullPointerException {@inheritDoc} 96 */ submit(Runnable task, T result)97 public <T> Future<T> submit(Runnable task, T result) { 98 if (task == null) throw new NullPointerException(); 99 RunnableFuture<T> ftask = newTaskFor(task, result); 100 execute(ftask); 101 return ftask; 102 } 103 104 /** 105 * @throws RejectedExecutionException {@inheritDoc} 106 * @throws NullPointerException {@inheritDoc} 107 */ submit(Callable<T> task)108 public <T> Future<T> submit(Callable<T> task) { 109 if (task == null) throw new NullPointerException(); 110 RunnableFuture<T> ftask = newTaskFor(task); 111 execute(ftask); 112 return ftask; 113 } 114 115 /** 116 * the main mechanics of invokeAny. 117 */ doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)118 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 119 boolean timed, long nanos) 120 throws InterruptedException, ExecutionException, TimeoutException { 121 if (tasks == null) 122 throw new NullPointerException(); 123 int ntasks = tasks.size(); 124 if (ntasks == 0) 125 throw new IllegalArgumentException(); 126 ArrayList<Future<T>> futures = new ArrayList<>(ntasks); 127 ExecutorCompletionService<T> ecs = 128 new ExecutorCompletionService<T>(this); 129 130 // For efficiency, especially in executors with limited 131 // parallelism, check to see if previously submitted tasks are 132 // done before submitting more of them. This interleaving 133 // plus the exception mechanics account for messiness of main 134 // loop. 135 136 try { 137 // Record exceptions so that if we fail to obtain any 138 // result, we can throw the last exception we got. 139 ExecutionException ee = null; 140 final long deadline = timed ? System.nanoTime() + nanos : 0L; 141 Iterator<? extends Callable<T>> it = tasks.iterator(); 142 143 // Start one task for sure; the rest incrementally 144 futures.add(ecs.submit(it.next())); 145 --ntasks; 146 int active = 1; 147 148 for (;;) { 149 Future<T> f = ecs.poll(); 150 if (f == null) { 151 if (ntasks > 0) { 152 --ntasks; 153 futures.add(ecs.submit(it.next())); 154 ++active; 155 } 156 else if (active == 0) 157 break; 158 else if (timed) { 159 f = ecs.poll(nanos, NANOSECONDS); 160 if (f == null) 161 throw new TimeoutException(); 162 nanos = deadline - System.nanoTime(); 163 } 164 else 165 f = ecs.take(); 166 } 167 if (f != null) { 168 --active; 169 try { 170 return f.get(); 171 } catch (ExecutionException eex) { 172 ee = eex; 173 } catch (RuntimeException rex) { 174 ee = new ExecutionException(rex); 175 } 176 } 177 } 178 179 if (ee == null) 180 ee = new ExecutionException(); 181 throw ee; 182 183 } finally { 184 cancelAll(futures); 185 } 186 } 187 invokeAny(Collection<? extends Callable<T>> tasks)188 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 189 throws InterruptedException, ExecutionException { 190 try { 191 return doInvokeAny(tasks, false, 0); 192 } catch (TimeoutException cannotHappen) { 193 assert false; 194 return null; 195 } 196 } 197 invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)198 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 199 long timeout, TimeUnit unit) 200 throws InterruptedException, ExecutionException, TimeoutException { 201 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 202 } 203 invokeAll(Collection<? extends Callable<T>> tasks)204 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 205 throws InterruptedException { 206 if (tasks == null) 207 throw new NullPointerException(); 208 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 209 try { 210 for (Callable<T> t : tasks) { 211 RunnableFuture<T> f = newTaskFor(t); 212 futures.add(f); 213 execute(f); 214 } 215 for (int i = 0, size = futures.size(); i < size; i++) { 216 Future<T> f = futures.get(i); 217 if (!f.isDone()) { 218 try { f.get(); } 219 catch (CancellationException ignore) {} 220 catch (ExecutionException ignore) {} 221 } 222 } 223 return futures; 224 } catch (Throwable t) { 225 cancelAll(futures); 226 throw t; 227 } 228 } 229 invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)230 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 231 long timeout, TimeUnit unit) 232 throws InterruptedException { 233 if (tasks == null) 234 throw new NullPointerException(); 235 final long nanos = unit.toNanos(timeout); 236 final long deadline = System.nanoTime() + nanos; 237 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); 238 int j = 0; 239 timedOut: try { 240 for (Callable<T> t : tasks) 241 futures.add(newTaskFor(t)); 242 243 final int size = futures.size(); 244 245 // Interleave time checks and calls to execute in case 246 // executor doesn't have any/much parallelism. 247 for (int i = 0; i < size; i++) { 248 if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L) 249 break timedOut; 250 execute((Runnable)futures.get(i)); 251 } 252 253 for (; j < size; j++) { 254 Future<T> f = futures.get(j); 255 if (!f.isDone()) { 256 try { f.get(deadline - System.nanoTime(), NANOSECONDS); } 257 catch (CancellationException ignore) {} 258 catch (ExecutionException ignore) {} 259 catch (TimeoutException timedOut) { 260 break timedOut; 261 } 262 } 263 } 264 return futures; 265 } catch (Throwable t) { 266 cancelAll(futures); 267 throw t; 268 } 269 // Timed out before all the tasks could be completed; cancel remaining 270 cancelAll(futures, j); 271 return futures; 272 } 273 cancelAll(ArrayList<Future<T>> futures)274 private static <T> void cancelAll(ArrayList<Future<T>> futures) { 275 cancelAll(futures, 0); 276 } 277 278 /** Cancels all futures with index at least j. */ cancelAll(ArrayList<Future<T>> futures, int j)279 private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) { 280 for (int size = futures.size(); j < size; j++) 281 futures.get(j).cancel(true); 282 } 283 } 284