1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 /**
39  * A {@link CompletionService} that uses a supplied {@link Executor}
40  * to execute tasks.  This class arranges that submitted tasks are,
41  * upon completion, placed on a queue accessible using {@code take}.
42  * The class is lightweight enough to be suitable for transient use
43  * when processing groups of tasks.
44  *
45  * <p>
46  *
47  * <b>Usage Examples.</b>
48  *
49  * Suppose you have a set of solvers for a certain problem, each
50  * returning a value of some type {@code Result}, and would like to
51  * run them concurrently, processing the results of each of them that
52  * return a non-null value, in some method {@code use(Result r)}. You
53  * could write this as:
54  *
55  * <pre> {@code
56  * void solve(Executor e,
57  *            Collection<Callable<Result>> solvers)
58  *     throws InterruptedException, ExecutionException {
59  *   CompletionService<Result> ecs
60  *       = new ExecutorCompletionService<Result>(e);
61  *   for (Callable<Result> s : solvers)
62  *     ecs.submit(s);
63  *   int n = solvers.size();
64  *   for (int i = 0; i < n; ++i) {
65  *     Result r = ecs.take().get();
66  *     if (r != null)
67  *       use(r);
68  *   }
69  * }}</pre>
70  *
71  * Suppose instead that you would like to use the first non-null result
72  * of the set of tasks, ignoring any that encounter exceptions,
73  * and cancelling all other tasks when the first one is ready:
74  *
75  * <pre> {@code
76  * void solve(Executor e,
77  *            Collection<Callable<Result>> solvers)
78  *     throws InterruptedException {
79  *   CompletionService<Result> ecs
80  *       = new ExecutorCompletionService<Result>(e);
81  *   int n = solvers.size();
82  *   List<Future<Result>> futures = new ArrayList<>(n);
83  *   Result result = null;
84  *   try {
85  *     for (Callable<Result> s : solvers)
86  *       futures.add(ecs.submit(s));
87  *     for (int i = 0; i < n; ++i) {
88  *       try {
89  *         Result r = ecs.take().get();
90  *         if (r != null) {
91  *           result = r;
92  *           break;
93  *         }
94  *       } catch (ExecutionException ignore) {}
95  *     }
96  *   }
97  *   finally {
98  *     for (Future<Result> f : futures)
99  *       f.cancel(true);
100  *   }
101  *
102  *   if (result != null)
103  *     use(result);
104  * }}</pre>
105  */
106 public class ExecutorCompletionService<V> implements CompletionService<V> {
107     private final Executor executor;
108     private final AbstractExecutorService aes;
109     private final BlockingQueue<Future<V>> completionQueue;
110 
111     /**
112      * FutureTask extension to enqueue upon completion.
113      */
114     private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue)115         QueueingFuture(RunnableFuture<V> task,
116                        BlockingQueue<Future<V>> completionQueue) {
117             super(task, null);
118             this.task = task;
119             this.completionQueue = completionQueue;
120         }
121         private final Future<V> task;
122         private final BlockingQueue<Future<V>> completionQueue;
done()123         protected void done() { completionQueue.add(task); }
124     }
125 
newTaskFor(Callable<V> task)126     private RunnableFuture<V> newTaskFor(Callable<V> task) {
127         if (aes == null)
128             return new FutureTask<V>(task);
129         else
130             return aes.newTaskFor(task);
131     }
132 
newTaskFor(Runnable task, V result)133     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
134         if (aes == null)
135             return new FutureTask<V>(task, result);
136         else
137             return aes.newTaskFor(task, result);
138     }
139 
140     /**
141      * Creates an ExecutorCompletionService using the supplied
142      * executor for base task execution and a
143      * {@link LinkedBlockingQueue} as a completion queue.
144      *
145      * @param executor the executor to use
146      * @throws NullPointerException if executor is {@code null}
147      */
ExecutorCompletionService(Executor executor)148     public ExecutorCompletionService(Executor executor) {
149         if (executor == null)
150             throw new NullPointerException();
151         this.executor = executor;
152         this.aes = (executor instanceof AbstractExecutorService) ?
153             (AbstractExecutorService) executor : null;
154         this.completionQueue = new LinkedBlockingQueue<Future<V>>();
155     }
156 
157     /**
158      * Creates an ExecutorCompletionService using the supplied
159      * executor for base task execution and the supplied queue as its
160      * completion queue.
161      *
162      * @param executor the executor to use
163      * @param completionQueue the queue to use as the completion queue
164      *        normally one dedicated for use by this service. This
165      *        queue is treated as unbounded -- failed attempted
166      *        {@code Queue.add} operations for completed tasks cause
167      *        them not to be retrievable.
168      * @throws NullPointerException if executor or completionQueue are {@code null}
169      */
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)170     public ExecutorCompletionService(Executor executor,
171                                      BlockingQueue<Future<V>> completionQueue) {
172         if (executor == null || completionQueue == null)
173             throw new NullPointerException();
174         this.executor = executor;
175         this.aes = (executor instanceof AbstractExecutorService) ?
176             (AbstractExecutorService) executor : null;
177         this.completionQueue = completionQueue;
178     }
179 
submit(Callable<V> task)180     public Future<V> submit(Callable<V> task) {
181         if (task == null) throw new NullPointerException();
182         RunnableFuture<V> f = newTaskFor(task);
183         executor.execute(new QueueingFuture<V>(f, completionQueue));
184         return f;
185     }
186 
submit(Runnable task, V result)187     public Future<V> submit(Runnable task, V result) {
188         if (task == null) throw new NullPointerException();
189         RunnableFuture<V> f = newTaskFor(task, result);
190         executor.execute(new QueueingFuture<V>(f, completionQueue));
191         return f;
192     }
193 
take()194     public Future<V> take() throws InterruptedException {
195         return completionQueue.take();
196     }
197 
poll()198     public Future<V> poll() {
199         return completionQueue.poll();
200     }
201 
poll(long timeout, TimeUnit unit)202     public Future<V> poll(long timeout, TimeUnit unit)
203             throws InterruptedException {
204         return completionQueue.poll(timeout, unit);
205     }
206 
207 }
208