1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include Andrew Wright, Jeffrey Hayes,
6  * Pat Fisher, Mike Judd.
7  */
8 
9 package jsr166;
10 
11 import junit.framework.*;
12 import java.util.*;
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ExecutorCompletionService;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.FutureTask;
20 import java.util.concurrent.RunnableFuture;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import static java.util.concurrent.TimeUnit.MILLISECONDS;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.security.*;
26 
27 public class ExecutorCompletionServiceTest extends JSR166TestCase {
28 
29     /**
30      * Creating a new ECS with null Executor throw NPE
31      */
testConstructorNPE()32     public void testConstructorNPE() {
33         try {
34             ExecutorCompletionService ecs = new ExecutorCompletionService(null);
35             shouldThrow();
36         } catch (NullPointerException success) {}
37     }
38 
39     /**
40      * Creating a new ECS with null queue throw NPE
41      */
testConstructorNPE2()42     public void testConstructorNPE2() {
43         try {
44             ExecutorService e = Executors.newCachedThreadPool();
45             ExecutorCompletionService ecs = new ExecutorCompletionService(e, null);
46             shouldThrow();
47         } catch (NullPointerException success) {}
48     }
49 
50     /**
51      * Submitting a null callable throws NPE
52      */
testSubmitNPE()53     public void testSubmitNPE() {
54         ExecutorService e = Executors.newCachedThreadPool();
55         ExecutorCompletionService ecs = new ExecutorCompletionService(e);
56         try {
57             Callable c = null;
58             ecs.submit(c);
59             shouldThrow();
60         } catch (NullPointerException success) {
61         } finally {
62             joinPool(e);
63         }
64     }
65 
66     /**
67      * Submitting a null runnable throws NPE
68      */
testSubmitNPE2()69     public void testSubmitNPE2() {
70         ExecutorService e = Executors.newCachedThreadPool();
71         ExecutorCompletionService ecs = new ExecutorCompletionService(e);
72         try {
73             Runnable r = null;
74             ecs.submit(r, Boolean.TRUE);
75             shouldThrow();
76         } catch (NullPointerException success) {
77         } finally {
78             joinPool(e);
79         }
80     }
81 
82     /**
83      * A taken submitted task is completed
84      */
testTake()85     public void testTake() throws InterruptedException {
86         ExecutorService e = Executors.newCachedThreadPool();
87         ExecutorCompletionService ecs = new ExecutorCompletionService(e);
88         try {
89             Callable c = new StringTask();
90             ecs.submit(c);
91             Future f = ecs.take();
92             assertTrue(f.isDone());
93         } finally {
94             joinPool(e);
95         }
96     }
97 
98     /**
99      * Take returns the same future object returned by submit
100      */
testTake2()101     public void testTake2() throws InterruptedException {
102         ExecutorService e = Executors.newCachedThreadPool();
103         ExecutorCompletionService ecs = new ExecutorCompletionService(e);
104         try {
105             Callable c = new StringTask();
106             Future f1 = ecs.submit(c);
107             Future f2 = ecs.take();
108             assertSame(f1, f2);
109         } finally {
110             joinPool(e);
111         }
112     }
113 
114     /**
115      * If poll returns non-null, the returned task is completed
116      */
testPoll1()117     public void testPoll1() throws Exception {
118         ExecutorService e = Executors.newCachedThreadPool();
119         ExecutorCompletionService ecs = new ExecutorCompletionService(e);
120         try {
121             assertNull(ecs.poll());
122             Callable c = new StringTask();
123             ecs.submit(c);
124 
125             long startTime = System.nanoTime();
126             Future f;
127             while ((f = ecs.poll()) == null) {
128                 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
129                     fail("timed out");
130                 Thread.yield();
131             }
132             assertTrue(f.isDone());
133             assertSame(TEST_STRING, f.get());
134         } finally {
135             joinPool(e);
136         }
137     }
138 
139     /**
140      * If timed poll returns non-null, the returned task is completed
141      */
testPoll2()142     public void testPoll2() throws InterruptedException {
143         ExecutorService e = Executors.newCachedThreadPool();
144         ExecutorCompletionService ecs = new ExecutorCompletionService(e);
145         try {
146             assertNull(ecs.poll());
147             Callable c = new StringTask();
148             ecs.submit(c);
149             Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS);
150             if (f != null)
151                 assertTrue(f.isDone());
152         } finally {
153             joinPool(e);
154         }
155     }
156 
157     /**
158      * Submitting to underlying AES that overrides newTaskFor(Callable)
159      * returns and eventually runs Future returned by newTaskFor.
160      */
testNewTaskForCallable()161     public void testNewTaskForCallable() throws InterruptedException {
162         final AtomicBoolean done = new AtomicBoolean(false);
163         class MyCallableFuture<V> extends FutureTask<V> {
164             MyCallableFuture(Callable<V> c) { super(c); }
165             protected void done() { done.set(true); }
166         }
167         ExecutorService e = new ThreadPoolExecutor(
168                                  1, 1, 30L, TimeUnit.SECONDS,
169                                  new ArrayBlockingQueue<Runnable>(1)) {
170             protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
171                 return new MyCallableFuture<T>(c);
172             }};
173         ExecutorCompletionService<String> ecs =
174             new ExecutorCompletionService<String>(e);
175         try {
176             assertNull(ecs.poll());
177             Callable<String> c = new StringTask();
178             Future f1 = ecs.submit(c);
179             assertTrue("submit must return MyCallableFuture",
180                        f1 instanceof MyCallableFuture);
181             Future f2 = ecs.take();
182             assertSame("submit and take must return same objects", f1, f2);
183             assertTrue("completed task must have set done", done.get());
184         } finally {
185             joinPool(e);
186         }
187     }
188 
189     /**
190      * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
191      * returns and eventually runs Future returned by newTaskFor.
192      */
testNewTaskForRunnable()193     public void testNewTaskForRunnable() throws InterruptedException {
194         final AtomicBoolean done = new AtomicBoolean(false);
195         class MyRunnableFuture<V> extends FutureTask<V> {
196             MyRunnableFuture(Runnable t, V r) { super(t, r); }
197             protected void done() { done.set(true); }
198         }
199         ExecutorService e = new ThreadPoolExecutor(
200                                  1, 1, 30L, TimeUnit.SECONDS,
201                                  new ArrayBlockingQueue<Runnable>(1)) {
202             protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
203                 return new MyRunnableFuture<T>(t, r);
204             }};
205         ExecutorCompletionService<String> ecs =
206             new ExecutorCompletionService<String>(e);
207         try {
208             assertNull(ecs.poll());
209             Runnable r = new NoOpRunnable();
210             Future f1 = ecs.submit(r, null);
211             assertTrue("submit must return MyRunnableFuture",
212                        f1 instanceof MyRunnableFuture);
213             Future f2 = ecs.take();
214             assertSame("submit and take must return same objects", f1, f2);
215             assertTrue("completed task must have set done", done.get());
216         } finally {
217             joinPool(e);
218         }
219     }
220 
221 }
222