1 /*
2  * Copyright (C) 2008 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.util.concurrent;
18 
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Lists;
21 
22 import junit.framework.TestCase;
23 
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.CyclicBarrier;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 
35 /**
36  * Tests {@link SerializingExecutor}.
37  *
38  * @author JJ Furman
39  */
40 public class SerializingExecutorTest extends TestCase {
41   private static class FakeExecutor implements Executor {
42     Queue<Runnable> tasks = Lists.newLinkedList();
execute(Runnable command)43     @Override public void execute(Runnable command) {
44       tasks.add(command);
45     }
46 
hasNext()47     boolean hasNext() {
48       return !tasks.isEmpty();
49     }
50 
runNext()51     void runNext() {
52       assertTrue("expected at least one task to run", hasNext());
53       tasks.remove().run();
54     }
55 
56   }
57   private FakeExecutor fakePool;
58   private SerializingExecutor e;
59 
60   @Override
setUp()61   public void setUp() {
62     fakePool = new FakeExecutor();
63     e = new SerializingExecutor(fakePool);
64   }
65 
testSerializingNullExecutor_fails()66   public void testSerializingNullExecutor_fails() {
67     try {
68       new SerializingExecutor(null);
69       fail("Should have failed with NullPointerException.");
70     } catch (NullPointerException expected) {
71     }
72   }
73 
testBasics()74   public void testBasics() {
75     final AtomicInteger totalCalls = new AtomicInteger();
76     Runnable intCounter = new Runnable() {
77       @Override
78       public void run() {
79         totalCalls.incrementAndGet();
80       }
81     };
82 
83     assertFalse(fakePool.hasNext());
84     e.execute(intCounter);
85     assertTrue(fakePool.hasNext());
86     e.execute(intCounter);
87     assertEquals(0, totalCalls.get());
88     fakePool.runNext(); // run just 1 sub task...
89     assertEquals(2, totalCalls.get());
90     assertFalse(fakePool.hasNext());
91 
92     // Check that execute can be safely repeated
93     e.execute(intCounter);
94     e.execute(intCounter);
95     e.execute(intCounter);
96     assertEquals(2, totalCalls.get());
97     fakePool.runNext();
98     assertEquals(5, totalCalls.get());
99     assertFalse(fakePool.hasNext());
100   }
101 
testOrdering()102   public void testOrdering() {
103     final List<Integer> callOrder = Lists.newArrayList();
104 
105     class FakeOp implements Runnable {
106       final int op;
107 
108       FakeOp(int op) {
109         this.op = op;
110       }
111 
112       @Override
113       public void run() {
114         callOrder.add(op);
115       }
116     }
117 
118     e.execute(new FakeOp(0));
119     e.execute(new FakeOp(1));
120     e.execute(new FakeOp(2));
121     fakePool.runNext();
122 
123     assertEquals(ImmutableList.of(0, 1, 2), callOrder);
124   }
125 
testExceptions()126   public void testExceptions() {
127 
128     final AtomicInteger numCalls = new AtomicInteger();
129 
130     Runnable runMe = new Runnable() {
131       @Override
132       public void run() {
133         numCalls.incrementAndGet();
134         throw new RuntimeException("FAKE EXCEPTION!");
135       }
136     };
137 
138     e.execute(runMe);
139     e.execute(runMe);
140     fakePool.runNext();
141 
142     assertEquals(2, numCalls.get());
143   }
144 
testDelegateRejection()145   public void testDelegateRejection() {
146     final AtomicInteger numCalls = new AtomicInteger();
147     final AtomicBoolean reject = new AtomicBoolean(true);
148     final SerializingExecutor executor = new SerializingExecutor(
149         new Executor() {
150           @Override public void execute(Runnable r) {
151             if (reject.get()) {
152               throw new RejectedExecutionException();
153             }
154             r.run();
155           }
156         });
157     Runnable task = new Runnable() {
158       @Override
159       public void run() {
160         numCalls.incrementAndGet();
161       }
162     };
163     try {
164       executor.execute(task);
165       fail();
166     } catch (RejectedExecutionException expected) {}
167     assertEquals(0, numCalls.get());
168     reject.set(false);
169     executor.execute(task);
170     assertEquals(2, numCalls.get());
171   }
172 
testTaskThrowsError()173   public void testTaskThrowsError() throws Exception {
174     class MyError extends Error {}
175     final CyclicBarrier barrier = new CyclicBarrier(2);
176     // we need to make sure the error gets thrown on a different thread.
177     ExecutorService service = Executors.newSingleThreadExecutor();
178     try {
179       final SerializingExecutor executor = new SerializingExecutor(service);
180       Runnable errorTask = new Runnable() {
181         @Override
182         public void run() {
183           throw new MyError();
184         }
185       };
186       Runnable barrierTask = new Runnable() {
187         @Override
188         public void run() {
189           try {
190             barrier.await();
191           } catch (Exception e) {
192             throw new RuntimeException(e);
193           }
194         }
195       };
196       executor.execute(errorTask);
197       service.execute(barrierTask);  // submit directly to the service
198       // the barrier task runs after the error task so we know that the error has been observed by
199       // SerializingExecutor by the time the barrier is satified
200       barrier.await(10, TimeUnit.SECONDS);
201       executor.execute(barrierTask);
202       // timeout means the second task wasn't even tried
203       barrier.await(10, TimeUnit.SECONDS);
204     } finally {
205       service.shutdown();
206     }
207   }
208 }
209