1 /*
2  * Copyright (C) 2008 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Lists;
21 import com.google.common.collect.Queues;
22 
23 import junit.framework.TestCase;
24 
25 import java.util.List;
26 import java.util.Queue;
27 import java.util.concurrent.CyclicBarrier;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 
36 /**
37  * Tests {@link SerializingExecutor}.
38  *
39  * @author JJ Furman
40  */
41 public class SerializingExecutorTest extends TestCase {
42   private static class FakeExecutor implements Executor {
43     Queue<Runnable> tasks = Queues.newArrayDeque();
execute(Runnable command)44     @Override public void execute(Runnable command) {
45       tasks.add(command);
46     }
47 
hasNext()48     boolean hasNext() {
49       return !tasks.isEmpty();
50     }
51 
runNext()52     void runNext() {
53       assertTrue("expected at least one task to run", hasNext());
54       tasks.remove().run();
55     }
56 
57   }
58   private FakeExecutor fakePool;
59   private SerializingExecutor e;
60 
61   @Override
setUp()62   public void setUp() {
63     fakePool = new FakeExecutor();
64     e = new SerializingExecutor(fakePool);
65   }
66 
testSerializingNullExecutor_fails()67   public void testSerializingNullExecutor_fails() {
68     try {
69       new SerializingExecutor(null);
70       fail("Should have failed with NullPointerException.");
71     } catch (NullPointerException expected) {
72     }
73   }
74 
testBasics()75   public void testBasics() {
76     final AtomicInteger totalCalls = new AtomicInteger();
77     Runnable intCounter = new Runnable() {
78       @Override
79       public void run() {
80         totalCalls.incrementAndGet();
81       }
82     };
83 
84     assertFalse(fakePool.hasNext());
85     e.execute(intCounter);
86     assertTrue(fakePool.hasNext());
87     e.execute(intCounter);
88     assertEquals(0, totalCalls.get());
89     fakePool.runNext(); // run just 1 sub task...
90     assertEquals(2, totalCalls.get());
91     assertFalse(fakePool.hasNext());
92 
93     // Check that execute can be safely repeated
94     e.execute(intCounter);
95     e.execute(intCounter);
96     e.execute(intCounter);
97     assertEquals(2, totalCalls.get());
98     fakePool.runNext();
99     assertEquals(5, totalCalls.get());
100     assertFalse(fakePool.hasNext());
101   }
102 
testOrdering()103   public void testOrdering() {
104     final List<Integer> callOrder = Lists.newArrayList();
105 
106     class FakeOp implements Runnable {
107       final int op;
108 
109       FakeOp(int op) {
110         this.op = op;
111       }
112 
113       @Override
114       public void run() {
115         callOrder.add(op);
116       }
117     }
118 
119     e.execute(new FakeOp(0));
120     e.execute(new FakeOp(1));
121     e.execute(new FakeOp(2));
122     fakePool.runNext();
123 
124     assertEquals(ImmutableList.of(0, 1, 2), callOrder);
125   }
126 
testExceptions()127   public void testExceptions() {
128 
129     final AtomicInteger numCalls = new AtomicInteger();
130 
131     Runnable runMe = new Runnable() {
132       @Override
133       public void run() {
134         numCalls.incrementAndGet();
135         throw new RuntimeException("FAKE EXCEPTION!");
136       }
137     };
138 
139     e.execute(runMe);
140     e.execute(runMe);
141     fakePool.runNext();
142 
143     assertEquals(2, numCalls.get());
144   }
145 
testDelegateRejection()146   public void testDelegateRejection() {
147     final AtomicInteger numCalls = new AtomicInteger();
148     final AtomicBoolean reject = new AtomicBoolean(true);
149     final SerializingExecutor executor = new SerializingExecutor(
150         new Executor() {
151           @Override public void execute(Runnable r) {
152             if (reject.get()) {
153               throw new RejectedExecutionException();
154             }
155             r.run();
156           }
157         });
158     Runnable task = new Runnable() {
159       @Override
160       public void run() {
161         numCalls.incrementAndGet();
162       }
163     };
164     try {
165       executor.execute(task);
166       fail();
167     } catch (RejectedExecutionException expected) {}
168     assertEquals(0, numCalls.get());
169     reject.set(false);
170     executor.execute(task);
171     assertEquals(2, numCalls.get());
172   }
173 
testTaskThrowsError()174   public void testTaskThrowsError() throws Exception {
175     class MyError extends Error {}
176     final CyclicBarrier barrier = new CyclicBarrier(2);
177     // we need to make sure the error gets thrown on a different thread.
178     ExecutorService service = Executors.newSingleThreadExecutor();
179     try {
180       final SerializingExecutor executor = new SerializingExecutor(service);
181       Runnable errorTask = new Runnable() {
182         @Override
183         public void run() {
184           throw new MyError();
185         }
186       };
187       Runnable barrierTask = new Runnable() {
188         @Override
189         public void run() {
190           try {
191             barrier.await();
192           } catch (Exception e) {
193             throw new RuntimeException(e);
194           }
195         }
196       };
197       executor.execute(errorTask);
198       service.execute(barrierTask);  // submit directly to the service
199       // the barrier task runs after the error task so we know that the error has been observed by
200       // SerializingExecutor by the time the barrier is satified
201       barrier.await(10, TimeUnit.SECONDS);
202       executor.execute(barrierTask);
203       // timeout means the second task wasn't even tried
204       barrier.await(10, TimeUnit.SECONDS);
205     } finally {
206       service.shutdown();
207     }
208   }
209 }
210