1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 * Other contributors include Andrew Wright, Jeffrey Hayes, 6 * Pat Fisher, Mike Judd. 7 */ 8 9 package jsr166; 10 11 import junit.framework.*; 12 import java.util.*; 13 import java.util.concurrent.ArrayBlockingQueue; 14 import java.util.concurrent.Callable; 15 import java.util.concurrent.ExecutorCompletionService; 16 import java.util.concurrent.ExecutorService; 17 import java.util.concurrent.Executors; 18 import java.util.concurrent.Future; 19 import java.util.concurrent.FutureTask; 20 import java.util.concurrent.RunnableFuture; 21 import java.util.concurrent.ThreadPoolExecutor; 22 import java.util.concurrent.TimeUnit; 23 import static java.util.concurrent.TimeUnit.MILLISECONDS; 24 import java.util.concurrent.atomic.AtomicBoolean; 25 import java.security.*; 26 27 public class ExecutorCompletionServiceTest extends JSR166TestCase { 28 29 /** 30 * Creating a new ECS with null Executor throw NPE 31 */ testConstructorNPE()32 public void testConstructorNPE() { 33 try { 34 ExecutorCompletionService ecs = new ExecutorCompletionService(null); 35 shouldThrow(); 36 } catch (NullPointerException success) {} 37 } 38 39 /** 40 * Creating a new ECS with null queue throw NPE 41 */ testConstructorNPE2()42 public void testConstructorNPE2() { 43 try { 44 ExecutorService e = Executors.newCachedThreadPool(); 45 ExecutorCompletionService ecs = new ExecutorCompletionService(e, null); 46 shouldThrow(); 47 } catch (NullPointerException success) {} 48 } 49 50 /** 51 * Submitting a null callable throws NPE 52 */ testSubmitNPE()53 public void testSubmitNPE() { 54 ExecutorService e = Executors.newCachedThreadPool(); 55 ExecutorCompletionService ecs = new ExecutorCompletionService(e); 56 try { 57 Callable c = null; 58 ecs.submit(c); 59 shouldThrow(); 60 } catch (NullPointerException success) { 61 } finally { 62 joinPool(e); 63 } 64 } 65 66 /** 67 * Submitting a null runnable throws NPE 68 */ testSubmitNPE2()69 public void testSubmitNPE2() { 70 ExecutorService e = Executors.newCachedThreadPool(); 71 ExecutorCompletionService ecs = new ExecutorCompletionService(e); 72 try { 73 Runnable r = null; 74 ecs.submit(r, Boolean.TRUE); 75 shouldThrow(); 76 } catch (NullPointerException success) { 77 } finally { 78 joinPool(e); 79 } 80 } 81 82 /** 83 * A taken submitted task is completed 84 */ testTake()85 public void testTake() throws InterruptedException { 86 ExecutorService e = Executors.newCachedThreadPool(); 87 ExecutorCompletionService ecs = new ExecutorCompletionService(e); 88 try { 89 Callable c = new StringTask(); 90 ecs.submit(c); 91 Future f = ecs.take(); 92 assertTrue(f.isDone()); 93 } finally { 94 joinPool(e); 95 } 96 } 97 98 /** 99 * Take returns the same future object returned by submit 100 */ testTake2()101 public void testTake2() throws InterruptedException { 102 ExecutorService e = Executors.newCachedThreadPool(); 103 ExecutorCompletionService ecs = new ExecutorCompletionService(e); 104 try { 105 Callable c = new StringTask(); 106 Future f1 = ecs.submit(c); 107 Future f2 = ecs.take(); 108 assertSame(f1, f2); 109 } finally { 110 joinPool(e); 111 } 112 } 113 114 /** 115 * If poll returns non-null, the returned task is completed 116 */ testPoll1()117 public void testPoll1() throws Exception { 118 ExecutorService e = Executors.newCachedThreadPool(); 119 ExecutorCompletionService ecs = new ExecutorCompletionService(e); 120 try { 121 assertNull(ecs.poll()); 122 Callable c = new StringTask(); 123 ecs.submit(c); 124 125 long startTime = System.nanoTime(); 126 Future f; 127 while ((f = ecs.poll()) == null) { 128 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 129 fail("timed out"); 130 Thread.yield(); 131 } 132 assertTrue(f.isDone()); 133 assertSame(TEST_STRING, f.get()); 134 } finally { 135 joinPool(e); 136 } 137 } 138 139 /** 140 * If timed poll returns non-null, the returned task is completed 141 */ testPoll2()142 public void testPoll2() throws InterruptedException { 143 ExecutorService e = Executors.newCachedThreadPool(); 144 ExecutorCompletionService ecs = new ExecutorCompletionService(e); 145 try { 146 assertNull(ecs.poll()); 147 Callable c = new StringTask(); 148 ecs.submit(c); 149 Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS); 150 if (f != null) 151 assertTrue(f.isDone()); 152 } finally { 153 joinPool(e); 154 } 155 } 156 157 /** 158 * Submitting to underlying AES that overrides newTaskFor(Callable) 159 * returns and eventually runs Future returned by newTaskFor. 160 */ testNewTaskForCallable()161 public void testNewTaskForCallable() throws InterruptedException { 162 final AtomicBoolean done = new AtomicBoolean(false); 163 class MyCallableFuture<V> extends FutureTask<V> { 164 MyCallableFuture(Callable<V> c) { super(c); } 165 protected void done() { done.set(true); } 166 } 167 ExecutorService e = new ThreadPoolExecutor( 168 1, 1, 30L, TimeUnit.SECONDS, 169 new ArrayBlockingQueue<Runnable>(1)) { 170 protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) { 171 return new MyCallableFuture<T>(c); 172 }}; 173 ExecutorCompletionService<String> ecs = 174 new ExecutorCompletionService<String>(e); 175 try { 176 assertNull(ecs.poll()); 177 Callable<String> c = new StringTask(); 178 Future f1 = ecs.submit(c); 179 assertTrue("submit must return MyCallableFuture", 180 f1 instanceof MyCallableFuture); 181 Future f2 = ecs.take(); 182 assertSame("submit and take must return same objects", f1, f2); 183 assertTrue("completed task must have set done", done.get()); 184 } finally { 185 joinPool(e); 186 } 187 } 188 189 /** 190 * Submitting to underlying AES that overrides newTaskFor(Runnable,T) 191 * returns and eventually runs Future returned by newTaskFor. 192 */ testNewTaskForRunnable()193 public void testNewTaskForRunnable() throws InterruptedException { 194 final AtomicBoolean done = new AtomicBoolean(false); 195 class MyRunnableFuture<V> extends FutureTask<V> { 196 MyRunnableFuture(Runnable t, V r) { super(t, r); } 197 protected void done() { done.set(true); } 198 } 199 ExecutorService e = new ThreadPoolExecutor( 200 1, 1, 30L, TimeUnit.SECONDS, 201 new ArrayBlockingQueue<Runnable>(1)) { 202 protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) { 203 return new MyRunnableFuture<T>(t, r); 204 }}; 205 ExecutorCompletionService<String> ecs = 206 new ExecutorCompletionService<String>(e); 207 try { 208 assertNull(ecs.poll()); 209 Runnable r = new NoOpRunnable(); 210 Future f1 = ecs.submit(r, null); 211 assertTrue("submit must return MyRunnableFuture", 212 f1 instanceof MyRunnableFuture); 213 Future f2 = ecs.take(); 214 assertSame("submit and take must return same objects", f1, f2); 215 assertTrue("completed task must have set done", done.get()); 216 } finally { 217 joinPool(e); 218 } 219 } 220 221 } 222