1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 /**
39  * A {@link CompletionService} that uses a supplied {@link Executor}
40  * to execute tasks.  This class arranges that submitted tasks are,
41  * upon completion, placed on a queue accessible using {@code take}.
42  * The class is lightweight enough to be suitable for transient use
43  * when processing groups of tasks.
44  *
45  * <p>
46  *
47  * <b>Usage Examples.</b>
48  *
49  * Suppose you have a set of solvers for a certain problem, each
50  * returning a value of some type {@code Result}, and would like to
51  * run them concurrently, processing the results of each of them that
52  * return a non-null value, in some method {@code use(Result r)}. You
53  * could write this as:
54  *
55  * <pre> {@code
56  * void solve(Executor e,
57  *            Collection<Callable<Result>> solvers)
58  *     throws InterruptedException, ExecutionException {
59  *   CompletionService<Result> cs
60  *       = new ExecutorCompletionService<>(e);
61  *   solvers.forEach(cs::submit);
62  *   for (int i = solvers.size(); i > 0; i--) {
63  *     Result r = cs.take().get();
64  *     if (r != null)
65  *       use(r);
66  *   }
67  * }}</pre>
68  *
69  * Suppose instead that you would like to use the first non-null result
70  * of the set of tasks, ignoring any that encounter exceptions,
71  * and cancelling all other tasks when the first one is ready:
72  *
73  * <pre> {@code
74  * void solve(Executor e,
75  *            Collection<Callable<Result>> solvers)
76  *     throws InterruptedException {
77  *   CompletionService<Result> cs
78  *       = new ExecutorCompletionService<>(e);
79  *   int n = solvers.size();
80  *   List<Future<Result>> futures = new ArrayList<>(n);
81  *   Result result = null;
82  *   try {
83  *     solvers.forEach(solver -> futures.add(cs.submit(solver)));
84  *     for (int i = n; i > 0; i--) {
85  *       try {
86  *         Result r = cs.take().get();
87  *         if (r != null) {
88  *           result = r;
89  *           break;
90  *         }
91  *       } catch (ExecutionException ignore) {}
92  *     }
93  *   } finally {
94  *     futures.forEach(future -> future.cancel(true));
95  *   }
96  *
97  *   if (result != null)
98  *     use(result);
99  * }}</pre>
100  *
101  * @since 1.5
102  */
103 public class ExecutorCompletionService<V> implements CompletionService<V> {
104     private final Executor executor;
105     private final AbstractExecutorService aes;
106     private final BlockingQueue<Future<V>> completionQueue;
107 
108     /**
109      * FutureTask extension to enqueue upon completion.
110      */
111     private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue)112         QueueingFuture(RunnableFuture<V> task,
113                        BlockingQueue<Future<V>> completionQueue) {
114             super(task, null);
115             this.task = task;
116             this.completionQueue = completionQueue;
117         }
118         private final Future<V> task;
119         private final BlockingQueue<Future<V>> completionQueue;
done()120         protected void done() { completionQueue.add(task); }
121     }
122 
newTaskFor(Callable<V> task)123     private RunnableFuture<V> newTaskFor(Callable<V> task) {
124         if (aes == null)
125             return new FutureTask<V>(task);
126         else
127             return aes.newTaskFor(task);
128     }
129 
newTaskFor(Runnable task, V result)130     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
131         if (aes == null)
132             return new FutureTask<V>(task, result);
133         else
134             return aes.newTaskFor(task, result);
135     }
136 
137     /**
138      * Creates an ExecutorCompletionService using the supplied
139      * executor for base task execution and a
140      * {@link LinkedBlockingQueue} as a completion queue.
141      *
142      * @param executor the executor to use
143      * @throws NullPointerException if executor is {@code null}
144      */
ExecutorCompletionService(Executor executor)145     public ExecutorCompletionService(Executor executor) {
146         if (executor == null)
147             throw new NullPointerException();
148         this.executor = executor;
149         this.aes = (executor instanceof AbstractExecutorService) ?
150             (AbstractExecutorService) executor : null;
151         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
152     }
153 
154     /**
155      * Creates an ExecutorCompletionService using the supplied
156      * executor for base task execution and the supplied queue as its
157      * completion queue.
158      *
159      * @param executor the executor to use
160      * @param completionQueue the queue to use as the completion queue
161      *        normally one dedicated for use by this service. This
162      *        queue is treated as unbounded -- failed attempted
163      *        {@code Queue.add} operations for completed tasks cause
164      *        them not to be retrievable.
165      * @throws NullPointerException if executor or completionQueue are {@code null}
166      */
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)167     public ExecutorCompletionService(Executor executor,
168                                      BlockingQueue<Future<V>> completionQueue) {
169         if (executor == null || completionQueue == null)
170             throw new NullPointerException();
171         this.executor = executor;
172         this.aes = (executor instanceof AbstractExecutorService) ?
173             (AbstractExecutorService) executor : null;
174         this.completionQueue = completionQueue;
175     }
176 
177     /**
178      * @throws RejectedExecutionException {@inheritDoc}
179      * @throws NullPointerException       {@inheritDoc}
180      */
submit(Callable<V> task)181     public Future<V> submit(Callable<V> task) {
182         if (task == null) throw new NullPointerException();
183         RunnableFuture<V> f = newTaskFor(task);
184         executor.execute(new QueueingFuture<V>(f, completionQueue));
185         return f;
186     }
187 
188     /**
189      * @throws RejectedExecutionException {@inheritDoc}
190      * @throws NullPointerException       {@inheritDoc}
191      */
submit(Runnable task, V result)192     public Future<V> submit(Runnable task, V result) {
193         if (task == null) throw new NullPointerException();
194         RunnableFuture<V> f = newTaskFor(task, result);
195         executor.execute(new QueueingFuture<V>(f, completionQueue));
196         return f;
197     }
198 
take()199     public Future<V> take() throws InterruptedException {
200         return completionQueue.take();
201     }
202 
poll()203     public Future<V> poll() {
204         return completionQueue.poll();
205     }
206 
poll(long timeout, TimeUnit unit)207     public Future<V> poll(long timeout, TimeUnit unit)
208             throws InterruptedException {
209         return completionQueue.poll(timeout, unit);
210     }
211 
212 }
213