/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.*;

/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 * to the {@link FutureTask} class provided in this package.  For example,
 * the implementation of {@code submit(Runnable)} creates an
 * associated {@code RunnableFuture} that is executed and
 * returned. Subclasses may override the {@code newTaskFor} methods
 * to return {@code RunnableFuture} implementations other than
 * {@code FutureTask}.
 *
 * <p><b>Extension example</b>. Here is a sketch of a class
 * that customizes {@link ThreadPoolExecutor} to use
 * a {@code CustomTask} class instead of the default {@code FutureTask}:
 *  <pre> {@code
 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 *
 *   static class CustomTask<V> implements RunnableFuture<V> {...}
 *
 *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
 *       return new CustomTask<V>(c);
 *   }
 *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
 *       return new CustomTask<V>(r, v);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of {@code invokeAny}, shared by the timed and
     * untimed variants.
     *
     * <p>Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService} wrapped around this executor.
     * The result of the first future whose {@code get()} returns
     * normally is returned. Every future submitted by this call is
     * cancelled on the way out, whether the method returns or throws.
     *
     * @param tasks the candidate tasks
     * @param timed whether {@code nanos} should be honoured
     * @param nanos the time budget in nanoseconds; read only when
     *        {@code timed} is true
     * @return the result of the first task found to have completed
     *         without throwing
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException if no task yielded a result; this is
     *         the most recently recorded failure, or a bare
     *         {@code ExecutionException} if none was recorded
     * @throws TimeoutException if {@code timed} is true and a timed
     *         poll returned nothing
     * @throws InterruptedException if thrown by a blocking call made here
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                // Non-blocking check for a finished task first.
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start another task.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        // Everything was submitted and every result
                        // consumed without success.
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // Cancel every future submitted by this call.
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

    /**
     * Untimed {@code invokeAny}; delegates to {@code doInvokeAny} with
     * timing disabled.
     *
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny only throws TimeoutException when timed is true.
            assert false;
            return null;
        }
    }

    /**
     * Timed {@code invokeAny}; delegates to {@code doInvokeAny} with the
     * timeout converted to nanoseconds.
     *
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * Wraps each task with {@code newTaskFor}, hands it to
     * {@code execute}, then waits on each resulting future in turn.
     * Cancellation and execution failures of individual tasks are
     * ignored while waiting. If this method exits before reaching the
     * end of the wait loop, every future created so far is cancelled.
     *
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} is null
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    /**
     * Timed variant of {@code invokeAll}. All tasks are wrapped with
     * {@code newTaskFor} first, then handed to {@code execute} one by
     * one, then waited on in turn.
     *
     * <p>The deadline is fixed before any task is wrapped, and the
     * remaining time is checked <em>before</em> each call to
     * {@code execute}. Consequently a non-positive timeout starts no
     * task at all. Whenever this method returns before reaching the
     * end of the wait loop, or exits with an exception, every future
     * in the list is cancelled.
     *
     * @return the futures, in the iteration order of {@code tasks}
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        // Start the clock before wrapping the tasks so that time spent
        // iterating the collection and in newTaskFor counts against
        // the caller's timeout.
        final long deadline = System.nanoTime() + nanos;
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism. The check
            // comes first so that nothing is started once the timeout
            // has elapsed (including a timeout that was never positive).
            for (int i = 0; i < size; i++) {
                if (i > 0)
                    nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
                execute((Runnable)futures.get(i));
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            // Reached with done == false on every early return above
            // and on any exception.
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}