/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * Provides default implementations of {@link ExecutorService}
 * execution methods. This class implements the {@code submit},
 * {@code invokeAny} and {@code invokeAll} methods using a
 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
 * to the {@link FutureTask} class provided in this package.  For example,
 * the implementation of {@code submit(Runnable)} creates an
 * associated {@code RunnableFuture} that is executed and
 * returned. Subclasses may override the {@code newTaskFor} methods
 * to return {@code RunnableFuture} implementations other than
 * {@code FutureTask}.
 *
 * <p><b>Extension example.</b> Here is a sketch of a class
 * that customizes {@link ThreadPoolExecutor} to use
 * a {@code CustomTask} class instead of the default {@code FutureTask}:
 * <pre> {@code
 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
 *
 *   static class CustomTask<V> implements RunnableFuture<V> { ... }
 *
 *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
 *       return new CustomTask<V>(c);
 *   }
 *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
 *       return new CustomTask<V>(r, v);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Constructor for subclasses to call.
     */
    public AbstractExecutorService() {}

    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a {@code RunnableFuture} for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @param <T> the type of the callable's result
     * @return a {@code RunnableFuture} which, when run, will call the
     * underlying callable and which, as a {@code Future}, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of {@code invokeAny}, shared by the timed and
     * untimed variants.
     *
     * <p>Tasks are submitted one at a time through an
     * {@link ExecutorCompletionService}; before each further submission
     * the completion queue is polled, so a result that is already
     * available is returned without starting the remaining tasks.
     * Every future created here is cancelled on exit, whether the
     * method returns or throws.
     *
     * @param tasks the candidate tasks
     * @param timed {@code true} if {@code nanos} bounds the wait
     * @param nanos the maximum time to wait in nanoseconds; only
     *        consulted when {@code timed} is {@code true}
     * @param <T> the type of the values returned from the tasks
     * @return the result of the first completed future whose
     *         {@code get()} returns normally
     * @throws NullPointerException if {@code tasks} is null
     * @throws IllegalArgumentException if {@code tasks} is empty
     * @throws ExecutionException if every submitted task completed
     *         without yielding a result; this is the last exception
     *         recorded from a failed task
     * @throws TimeoutException if {@code timed} and the timed poll for
     *         a completed task returns nothing
     * @throws InterruptedException if interrupted while waiting
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start one more task.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        // Everything was submitted and has completed
                        // without a usable result.
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        // Recompute the time left for later waits.
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        // Wrap so the failure can be reported through
                        // the declared ExecutionException.
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }

    /**
     * @throws InterruptedException       {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     * @throws ExecutionException         {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny only times out when called with timed == true.
            assert false;
            return null;
        }
    }

    /**
     * @throws InterruptedException       {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws TimeoutException           {@inheritDoc}
     * @throws ExecutionException         {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation wraps each task with {@code newTaskFor},
     * passes it to {@code execute}, and then waits for each future in
     * iteration order. If anything is thrown while submitting or
     * waiting, all futures created so far are cancelled before the
     * throwable is rethrown.
     *
     * @throws InterruptedException       {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    // Only completion matters here; the caller reads
                    // each outcome from the returned futures.
                    try { f.get(); }
                    catch (CancellationException | ExecutionException ignore) {}
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>This implementation creates all futures first, then passes
     * them to {@code execute} one by one, checking the deadline before
     * each call. If the deadline passes during submission or while
     * waiting, the futures from the first one not yet waited for
     * onwards are cancelled and the full list is returned.
     *
     * @throws InterruptedException       {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws RejectedExecutionException {@inheritDoc}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        final long nanos = unit.toNanos(timeout);
        final long deadline = System.nanoTime() + nanos;
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        // Index of the first future that has not been waited for yet;
        // on timeout, cancellation starts from here.
        int j = 0;
        timedOut: try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
                    break timedOut;
                execute((Runnable)futures.get(i));
            }

            for (; j < size; j++) {
                Future<T> f = futures.get(j);
                if (!f.isDone()) {
                    try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
                    catch (CancellationException | ExecutionException ignore) {}
                    catch (TimeoutException timedOut) {
                        break timedOut;
                    }
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
        // Timed out before all the tasks could be completed; cancel remaining
        cancelAll(futures, j);
        return futures;
    }

    /** Cancels every future in the list, interrupting if running. */
    private static <T> void cancelAll(ArrayList<Future<T>> futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
}