1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include Andrew Wright, Jeffrey Hayes,
6  * Pat Fisher, Mike Judd.
7  */
8 
9 package jsr166;
10 
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ExecutorCompletionService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Future;
19 import java.util.concurrent.FutureTask;
20 import java.util.concurrent.RunnableFuture;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 
25 import junit.framework.Test;
26 import junit.framework.TestSuite;
27 
28 public class ExecutorCompletionServiceTest extends JSR166TestCase {
29     // android-note: Removed because the CTS runner does a bad job of
30     // retrying tests that have suite() declarations.
31     //
32     // public static void main(String[] args) {
33     //     main(suite(), args);
34     // }
35     // public static Test suite() {
36     //     return new TestSuite(ExecutorCompletionServiceTest.class);
37     // }
38 
39     /**
40      * Creating a new ECS with null Executor throw NPE
41      */
42     public void testConstructorNPE() {
43         try {
44             new ExecutorCompletionService(null);
45             shouldThrow();
46         } catch (NullPointerException success) {}
47     }
48 
49     /**
50      * Creating a new ECS with null queue throw NPE
51      */
52     public void testConstructorNPE2() {
53         try {
54             ExecutorService e = Executors.newCachedThreadPool();
55             new ExecutorCompletionService(e, null);
56             shouldThrow();
57         } catch (NullPointerException success) {}
58     }
59 
60     /**
61      * Submitting a null callable throws NPE
62      */
63     public void testSubmitNPE() {
64         final ExecutorService e = Executors.newCachedThreadPool();
65         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
66         try (PoolCleaner cleaner = cleaner(e)) {
67             Callable c = null;
68             try {
69                 ecs.submit(c);
70                 shouldThrow();
71             } catch (NullPointerException success) {}
72         }
73     }
74 
75     /**
76      * Submitting a null runnable throws NPE
77      */
78     public void testSubmitNPE2() {
79         final ExecutorService e = Executors.newCachedThreadPool();
80         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
81         try (PoolCleaner cleaner = cleaner(e)) {
82             Runnable r = null;
83             try {
84                 ecs.submit(r, Boolean.TRUE);
85                 shouldThrow();
86             } catch (NullPointerException success) {}
87         }
88     }
89 
90     /**
91      * A taken submitted task is completed
92      */
93     public void testTake() throws InterruptedException {
94         final ExecutorService e = Executors.newCachedThreadPool();
95         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
96         try (PoolCleaner cleaner = cleaner(e)) {
97             Callable c = new StringTask();
98             ecs.submit(c);
99             Future f = ecs.take();
100             assertTrue(f.isDone());
101         }
102     }
103 
104     /**
105      * Take returns the same future object returned by submit
106      */
107     public void testTake2() throws InterruptedException {
108         final ExecutorService e = Executors.newCachedThreadPool();
109         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
110         try (PoolCleaner cleaner = cleaner(e)) {
111             Callable c = new StringTask();
112             Future f1 = ecs.submit(c);
113             Future f2 = ecs.take();
114             assertSame(f1, f2);
115         }
116     }
117 
118     /**
119      * If poll returns non-null, the returned task is completed
120      */
121     public void testPoll1() throws Exception {
122         final ExecutorService e = Executors.newCachedThreadPool();
123         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
124         try (PoolCleaner cleaner = cleaner(e)) {
125             assertNull(ecs.poll());
126             Callable c = new StringTask();
127             ecs.submit(c);
128 
129             long startTime = System.nanoTime();
130             Future f;
131             while ((f = ecs.poll()) == null) {
132                 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
133                     fail("timed out");
134                 Thread.yield();
135             }
136             assertTrue(f.isDone());
137             assertSame(TEST_STRING, f.get());
138         }
139     }
140 
141     /**
142      * If timed poll returns non-null, the returned task is completed
143      */
144     public void testPoll2() throws InterruptedException {
145         final ExecutorService e = Executors.newCachedThreadPool();
146         final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
147         try (PoolCleaner cleaner = cleaner(e)) {
148             assertNull(ecs.poll());
149             Callable c = new StringTask();
150             ecs.submit(c);
151             Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS);
152             if (f != null)
153                 assertTrue(f.isDone());
154         }
155     }
156 
157     /**
158      * Submitting to underlying AES that overrides newTaskFor(Callable)
159      * returns and eventually runs Future returned by newTaskFor.
160      */
161     public void testNewTaskForCallable() throws InterruptedException {
162         final AtomicBoolean done = new AtomicBoolean(false);
163         class MyCallableFuture<V> extends FutureTask<V> {
164             MyCallableFuture(Callable<V> c) { super(c); }
165             protected void done() { done.set(true); }
166         }
167         final ExecutorService e =
168             new ThreadPoolExecutor(1, 1,
169                                    30L, TimeUnit.SECONDS,
170                                    new ArrayBlockingQueue<Runnable>(1)) {
171                 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
172                     return new MyCallableFuture<T>(c);
173                 }};
174         ExecutorCompletionService<String> ecs =
175             new ExecutorCompletionService<String>(e);
176         try (PoolCleaner cleaner = cleaner(e)) {
177             assertNull(ecs.poll());
178             Callable<String> c = new StringTask();
179             Future f1 = ecs.submit(c);
180             assertTrue("submit must return MyCallableFuture",
181                        f1 instanceof MyCallableFuture);
182             Future f2 = ecs.take();
183             assertSame("submit and take must return same objects", f1, f2);
184             assertTrue("completed task must have set done", done.get());
185         }
186     }
187 
188     /**
189      * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
190      * returns and eventually runs Future returned by newTaskFor.
191      */
192     public void testNewTaskForRunnable() throws InterruptedException {
193         final AtomicBoolean done = new AtomicBoolean(false);
194         class MyRunnableFuture<V> extends FutureTask<V> {
195             MyRunnableFuture(Runnable t, V r) { super(t, r); }
196             protected void done() { done.set(true); }
197         }
198         final ExecutorService e =
199             new ThreadPoolExecutor(1, 1,
200                                    30L, TimeUnit.SECONDS,
201                                    new ArrayBlockingQueue<Runnable>(1)) {
202                 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
203                     return new MyRunnableFuture<T>(t, r);
204                 }};
205         final ExecutorCompletionService<String> ecs =
206             new ExecutorCompletionService<String>(e);
207         try (PoolCleaner cleaner = cleaner(e)) {
208             assertNull(ecs.poll());
209             Runnable r = new NoOpRunnable();
210             Future f1 = ecs.submit(r, null);
211             assertTrue("submit must return MyRunnableFuture",
212                        f1 instanceof MyRunnableFuture);
213             Future f2 = ecs.take();
214             assertSame("submit and take must return same objects", f1, f2);
215             assertTrue("completed task must have set done", done.get());
216         }
217     }
218 
219 }
220