1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 * Other contributors include Andrew Wright, Jeffrey Hayes, 6 * Pat Fisher, Mike Judd. 7 */ 8 9 package jsr166; 10 11 import static java.util.concurrent.TimeUnit.MILLISECONDS; 12 13 import java.util.concurrent.ArrayBlockingQueue; 14 import java.util.concurrent.Callable; 15 import java.util.concurrent.ExecutorCompletionService; 16 import java.util.concurrent.Executors; 17 import java.util.concurrent.ExecutorService; 18 import java.util.concurrent.Future; 19 import java.util.concurrent.FutureTask; 20 import java.util.concurrent.RunnableFuture; 21 import java.util.concurrent.ThreadPoolExecutor; 22 import java.util.concurrent.TimeUnit; 23 import java.util.concurrent.atomic.AtomicBoolean; 24 25 import junit.framework.Test; 26 import junit.framework.TestSuite; 27 28 public class ExecutorCompletionServiceTest extends JSR166TestCase { 29 // android-note: Removed because the CTS runner does a bad job of 30 // retrying tests that have suite() declarations. 31 // 32 // public static void main(String[] args) { 33 // main(suite(), args); 34 // } 35 // public static Test suite() { 36 // return new TestSuite(ExecutorCompletionServiceTest.class); 37 // } 38 39 /** 40 * Creating a new ECS with null Executor throw NPE 41 */ testConstructorNPE()42 public void testConstructorNPE() { 43 try { 44 new ExecutorCompletionService(null); 45 shouldThrow(); 46 } catch (NullPointerException success) {} 47 } 48 49 /** 50 * Creating a new ECS with null queue throw NPE 51 */ testConstructorNPE2()52 public void testConstructorNPE2() { 53 try { 54 ExecutorService e = Executors.newCachedThreadPool(); 55 new ExecutorCompletionService(e, null); 56 shouldThrow(); 57 } catch (NullPointerException success) {} 58 } 59 60 /** 61 * Submitting a null callable throws NPE 62 */ testSubmitNPE()63 public void testSubmitNPE() { 64 final ExecutorService e = Executors.newCachedThreadPool(); 65 final ExecutorCompletionService ecs = new ExecutorCompletionService(e); 66 try (PoolCleaner cleaner = cleaner(e)) { 67 Callable c = null; 68 try { 69 ecs.submit(c); 70 shouldThrow(); 71 } catch (NullPointerException success) {} 72 } 73 } 74 75 /** 76 * Submitting a null runnable throws NPE 77 */ testSubmitNPE2()78 public void testSubmitNPE2() { 79 final ExecutorService e = Executors.newCachedThreadPool(); 80 final ExecutorCompletionService ecs = new ExecutorCompletionService(e); 81 try (PoolCleaner cleaner = cleaner(e)) { 82 Runnable r = null; 83 try { 84 ecs.submit(r, Boolean.TRUE); 85 shouldThrow(); 86 } catch (NullPointerException success) {} 87 } 88 } 89 90 /** 91 * A taken submitted task is completed 92 */ testTake()93 public void testTake() throws InterruptedException { 94 final ExecutorService e = Executors.newCachedThreadPool(); 95 final ExecutorCompletionService ecs = new ExecutorCompletionService(e); 96 try (PoolCleaner cleaner = cleaner(e)) { 97 Callable c = new StringTask(); 98 ecs.submit(c); 99 Future f = ecs.take(); 100 assertTrue(f.isDone()); 101 } 102 } 103 104 /** 105 * Take returns the same future object returned by submit 106 */ testTake2()107 public void testTake2() throws InterruptedException { 108 final ExecutorService e = Executors.newCachedThreadPool(); 109 final ExecutorCompletionService ecs = new ExecutorCompletionService(e); 110 try (PoolCleaner cleaner = cleaner(e)) { 111 Callable c = new StringTask(); 112 Future f1 = ecs.submit(c); 113 Future f2 = ecs.take(); 114 assertSame(f1, f2); 115 } 116 } 117 118 /** 119 * If poll returns non-null, the returned task is completed 120 */ testPoll1()121 public void testPoll1() throws Exception { 122 final ExecutorService e = Executors.newCachedThreadPool(); 123 final ExecutorCompletionService ecs = new ExecutorCompletionService(e); 124 try (PoolCleaner cleaner = cleaner(e)) { 125 assertNull(ecs.poll()); 126 Callable c = new StringTask(); 127 ecs.submit(c); 128 129 long startTime = System.nanoTime(); 130 Future f; 131 while ((f = ecs.poll()) == null) { 132 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 133 fail("timed out"); 134 Thread.yield(); 135 } 136 assertTrue(f.isDone()); 137 assertSame(TEST_STRING, f.get()); 138 } 139 } 140 141 /** 142 * If timed poll returns non-null, the returned task is completed 143 */ testPoll2()144 public void testPoll2() throws InterruptedException { 145 final ExecutorService e = Executors.newCachedThreadPool(); 146 final ExecutorCompletionService ecs = new ExecutorCompletionService(e); 147 try (PoolCleaner cleaner = cleaner(e)) { 148 assertNull(ecs.poll()); 149 Callable c = new StringTask(); 150 ecs.submit(c); 151 Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS); 152 if (f != null) 153 assertTrue(f.isDone()); 154 } 155 } 156 157 /** 158 * Submitting to underlying AES that overrides newTaskFor(Callable) 159 * returns and eventually runs Future returned by newTaskFor. 160 */ testNewTaskForCallable()161 public void testNewTaskForCallable() throws InterruptedException { 162 final AtomicBoolean done = new AtomicBoolean(false); 163 class MyCallableFuture<V> extends FutureTask<V> { 164 MyCallableFuture(Callable<V> c) { super(c); } 165 protected void done() { done.set(true); } 166 } 167 final ExecutorService e = 168 new ThreadPoolExecutor(1, 1, 169 30L, TimeUnit.SECONDS, 170 new ArrayBlockingQueue<Runnable>(1)) { 171 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) { 172 return new MyCallableFuture<T>(c); 173 }}; 174 ExecutorCompletionService<String> ecs = 175 new ExecutorCompletionService<String>(e); 176 try (PoolCleaner cleaner = cleaner(e)) { 177 assertNull(ecs.poll()); 178 Callable<String> c = new StringTask(); 179 Future f1 = ecs.submit(c); 180 assertTrue("submit must return MyCallableFuture", 181 f1 instanceof MyCallableFuture); 182 Future f2 = ecs.take(); 183 assertSame("submit and take must return same objects", f1, f2); 184 assertTrue("completed task must have set done", done.get()); 185 } 186 } 187 188 /** 189 * Submitting to underlying AES that overrides newTaskFor(Runnable,T) 190 * returns and eventually runs Future returned by newTaskFor. 191 */ testNewTaskForRunnable()192 public void testNewTaskForRunnable() throws InterruptedException { 193 final AtomicBoolean done = new AtomicBoolean(false); 194 class MyRunnableFuture<V> extends FutureTask<V> { 195 MyRunnableFuture(Runnable t, V r) { super(t, r); } 196 protected void done() { done.set(true); } 197 } 198 final ExecutorService e = 199 new ThreadPoolExecutor(1, 1, 200 30L, TimeUnit.SECONDS, 201 new ArrayBlockingQueue<Runnable>(1)) { 202 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) { 203 return new MyRunnableFuture<T>(t, r); 204 }}; 205 final ExecutorCompletionService<String> ecs = 206 new ExecutorCompletionService<String>(e); 207 try (PoolCleaner cleaner = cleaner(e)) { 208 assertNull(ecs.poll()); 209 Runnable r = new NoOpRunnable(); 210 Future f1 = ecs.submit(r, null); 211 assertTrue("submit must return MyRunnableFuture", 212 f1 instanceof MyRunnableFuture); 213 Future f2 = ecs.take(); 214 assertSame("submit and take must return same objects", f1, f2); 215 assertTrue("completed task must have set done", done.get()); 216 } 217 } 218 219 } 220