1 /* 2 * Copyright (C) 2008 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.collect.ImmutableList; 20 import com.google.common.collect.Lists; 21 22 import junit.framework.TestCase; 23 24 import java.util.List; 25 import java.util.Queue; 26 import java.util.concurrent.CyclicBarrier; 27 import java.util.concurrent.Executor; 28 import java.util.concurrent.ExecutorService; 29 import java.util.concurrent.Executors; 30 import java.util.concurrent.RejectedExecutionException; 31 import java.util.concurrent.TimeUnit; 32 import java.util.concurrent.atomic.AtomicBoolean; 33 import java.util.concurrent.atomic.AtomicInteger; 34 35 /** 36 * Tests {@link SerializingExecutor}. 37 * 38 * @author JJ Furman 39 */ 40 public class SerializingExecutorTest extends TestCase { 41 private static class FakeExecutor implements Executor { 42 Queue<Runnable> tasks = Lists.newLinkedList(); execute(Runnable command)43 @Override public void execute(Runnable command) { 44 tasks.add(command); 45 } 46 hasNext()47 boolean hasNext() { 48 return !tasks.isEmpty(); 49 } 50 runNext()51 void runNext() { 52 assertTrue("expected at least one task to run", hasNext()); 53 tasks.remove().run(); 54 } 55 56 } 57 private FakeExecutor fakePool; 58 private SerializingExecutor e; 59 60 @Override setUp()61 public void setUp() { 62 fakePool = new FakeExecutor(); 63 e = new SerializingExecutor(fakePool); 64 } 65 testSerializingNullExecutor_fails()66 public void testSerializingNullExecutor_fails() { 67 try { 68 new SerializingExecutor(null); 69 fail("Should have failed with NullPointerException."); 70 } catch (NullPointerException expected) { 71 } 72 } 73 testBasics()74 public void testBasics() { 75 final AtomicInteger totalCalls = new AtomicInteger(); 76 Runnable intCounter = new Runnable() { 77 @Override 78 public void run() { 79 totalCalls.incrementAndGet(); 80 } 81 }; 82 83 assertFalse(fakePool.hasNext()); 84 e.execute(intCounter); 85 assertTrue(fakePool.hasNext()); 86 e.execute(intCounter); 87 assertEquals(0, totalCalls.get()); 88 fakePool.runNext(); // run just 1 sub task... 89 assertEquals(2, totalCalls.get()); 90 assertFalse(fakePool.hasNext()); 91 92 // Check that execute can be safely repeated 93 e.execute(intCounter); 94 e.execute(intCounter); 95 e.execute(intCounter); 96 assertEquals(2, totalCalls.get()); 97 fakePool.runNext(); 98 assertEquals(5, totalCalls.get()); 99 assertFalse(fakePool.hasNext()); 100 } 101 testOrdering()102 public void testOrdering() { 103 final List<Integer> callOrder = Lists.newArrayList(); 104 105 class FakeOp implements Runnable { 106 final int op; 107 108 FakeOp(int op) { 109 this.op = op; 110 } 111 112 @Override 113 public void run() { 114 callOrder.add(op); 115 } 116 } 117 118 e.execute(new FakeOp(0)); 119 e.execute(new FakeOp(1)); 120 e.execute(new FakeOp(2)); 121 fakePool.runNext(); 122 123 assertEquals(ImmutableList.of(0, 1, 2), callOrder); 124 } 125 testExceptions()126 public void testExceptions() { 127 128 final AtomicInteger numCalls = new AtomicInteger(); 129 130 Runnable runMe = new Runnable() { 131 @Override 132 public void run() { 133 numCalls.incrementAndGet(); 134 throw new RuntimeException("FAKE EXCEPTION!"); 135 } 136 }; 137 138 e.execute(runMe); 139 e.execute(runMe); 140 fakePool.runNext(); 141 142 assertEquals(2, numCalls.get()); 143 } 144 testDelegateRejection()145 public void testDelegateRejection() { 146 final AtomicInteger numCalls = new AtomicInteger(); 147 final AtomicBoolean reject = new AtomicBoolean(true); 148 final SerializingExecutor executor = new SerializingExecutor( 149 new Executor() { 150 @Override public void execute(Runnable r) { 151 if (reject.get()) { 152 throw new RejectedExecutionException(); 153 } 154 r.run(); 155 } 156 }); 157 Runnable task = new Runnable() { 158 @Override 159 public void run() { 160 numCalls.incrementAndGet(); 161 } 162 }; 163 try { 164 executor.execute(task); 165 fail(); 166 } catch (RejectedExecutionException expected) {} 167 assertEquals(0, numCalls.get()); 168 reject.set(false); 169 executor.execute(task); 170 assertEquals(2, numCalls.get()); 171 } 172 testTaskThrowsError()173 public void testTaskThrowsError() throws Exception { 174 class MyError extends Error {} 175 final CyclicBarrier barrier = new CyclicBarrier(2); 176 // we need to make sure the error gets thrown on a different thread. 177 ExecutorService service = Executors.newSingleThreadExecutor(); 178 try { 179 final SerializingExecutor executor = new SerializingExecutor(service); 180 Runnable errorTask = new Runnable() { 181 @Override 182 public void run() { 183 throw new MyError(); 184 } 185 }; 186 Runnable barrierTask = new Runnable() { 187 @Override 188 public void run() { 189 try { 190 barrier.await(); 191 } catch (Exception e) { 192 throw new RuntimeException(e); 193 } 194 } 195 }; 196 executor.execute(errorTask); 197 service.execute(barrierTask); // submit directly to the service 198 // the barrier task runs after the error task so we know that the error has been observed by 199 // SerializingExecutor by the time the barrier is satified 200 barrier.await(10, TimeUnit.SECONDS); 201 executor.execute(barrierTask); 202 // timeout means the second task wasn't even tried 203 barrier.await(10, TimeUnit.SECONDS); 204 } finally { 205 service.shutdown(); 206 } 207 } 208 } 209