1 /*
2  * Copyright (C) 2011 The Guava Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.google.common.collect;
18 
19 import com.google.common.util.concurrent.Uninterruptibles;
20 
21 import junit.framework.TestCase;
22 
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.PriorityBlockingQueue;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.TimeUnit;
34 
35 /**
36  * Tests for {@link Queues}.
37  *
38  * @author Dimitris Andreou
39  */
40 
41 public class QueuesTest extends TestCase {
42   /*
43    * All the following tests relate to BlockingQueue methods in Queues.
44    */
45 
blockingQueues()46   public static List<BlockingQueue<Object>> blockingQueues() {
47     return ImmutableList.<BlockingQueue<Object>>of(
48         new LinkedBlockingQueue<Object>(),
49         new LinkedBlockingQueue<Object>(10),
50         new SynchronousQueue<Object>(),
51         new ArrayBlockingQueue<Object>(10),
52         new PriorityBlockingQueue<Object>(10, Ordering.arbitrary()));
53   }
54 
55   private ExecutorService threadPool;
56 
57   @Override
setUp()58   public void setUp() {
59     threadPool = Executors.newCachedThreadPool();
60   }
61 
62   @Override
tearDown()63   public void tearDown() throws InterruptedException {
64     // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite
65     // loop, which will be noticed here
66     threadPool.shutdown();
67     assertTrue("Some worker didn't finish in time",
68         threadPool.awaitTermination(1, TimeUnit.SECONDS));
69   }
70 
drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)71   private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements,
72       long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException {
73     return interruptibly
74         ? Queues.drain(q, buffer, maxElements, timeout, unit)
75         : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit);
76   }
77 
testMultipleProducers()78   public void testMultipleProducers() throws Exception {
79     for (BlockingQueue<Object> q : blockingQueues()) {
80       testMultipleProducers(q);
81     }
82   }
83 
testMultipleProducers(BlockingQueue<Object> q)84   private void testMultipleProducers(BlockingQueue<Object> q)
85       throws InterruptedException {
86     for (boolean interruptibly : new boolean[] { true, false }) {
87       threadPool.submit(new Producer(q, 20));
88       threadPool.submit(new Producer(q, 20));
89       threadPool.submit(new Producer(q, 20));
90       threadPool.submit(new Producer(q, 20));
91       threadPool.submit(new Producer(q, 20));
92 
93       List<Object> buf = Lists.newArrayList();
94       int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly);
95       assertEquals(100, elements);
96       assertEquals(100, buf.size());
97       assertDrained(q);
98     }
99   }
100 
testDrainTimesOut()101   public void testDrainTimesOut() throws Exception {
102     for (BlockingQueue<Object> q : blockingQueues()) {
103       testDrainTimesOut(q);
104     }
105   }
106 
testDrainTimesOut(BlockingQueue<Object> q)107   private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception {
108     for (boolean interruptibly : new boolean[] { true, false }) {
109       assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS));
110 
111       // producing one, will ask for two
112       Future<?> submitter = threadPool.submit(new Producer(q, 1));
113 
114       // make sure we time out
115       long startTime = System.nanoTime();
116 
117       int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly);
118       assertTrue(drained <= 1);
119 
120       assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
121 
122       // If even the first one wasn't there, clean up so that the next test doesn't see an element.
123       submitter.get();
124       if (drained == 0) {
125         assertNotNull(q.poll());
126       }
127     }
128   }
129 
testZeroElements()130   public void testZeroElements() throws Exception {
131     for (BlockingQueue<Object> q : blockingQueues()) {
132       testZeroElements(q);
133     }
134   }
135 
testZeroElements(BlockingQueue<Object> q)136   private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException {
137     for (boolean interruptibly : new boolean[] { true, false }) {
138       // asking to drain zero elements
139       assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly));
140     }
141   }
142 
testEmpty()143   public void testEmpty() throws Exception {
144     for (BlockingQueue<Object> q : blockingQueues()) {
145       testEmpty(q);
146     }
147   }
148 
testEmpty(BlockingQueue<Object> q)149   private void testEmpty(BlockingQueue<Object> q) {
150     assertDrained(q);
151   }
152 
testNegativeMaxElements()153   public void testNegativeMaxElements() throws Exception {
154     for (BlockingQueue<Object> q : blockingQueues()) {
155       testNegativeMaxElements(q);
156     }
157   }
158 
testNegativeMaxElements(BlockingQueue<Object> q)159   private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException {
160     threadPool.submit(new Producer(q, 1));
161 
162     List<Object> buf = Lists.newArrayList();
163     int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
164     assertEquals(elements, 0);
165     assertTrue(buf.isEmpty());
166 
167     // Clean up produced element to free the producer thread, otherwise it will complain
168     // when we shutdown the threadpool.
169     Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
170   }
171 
testDrain_throws()172   public void testDrain_throws() throws Exception {
173     for (BlockingQueue<Object> q : blockingQueues()) {
174       testDrain_throws(q);
175     }
176   }
177 
testDrain_throws(BlockingQueue<Object> q)178   private void testDrain_throws(BlockingQueue<Object> q) {
179     threadPool.submit(new Interrupter(Thread.currentThread()));
180     try {
181       Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
182       fail();
183     } catch (InterruptedException expected) {
184     }
185   }
186 
testDrainUninterruptibly_doesNotThrow()187   public void testDrainUninterruptibly_doesNotThrow() throws Exception {
188     for (BlockingQueue<Object> q : blockingQueues()) {
189       testDrainUninterruptibly_doesNotThrow(q);
190     }
191   }
192 
testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)193   private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) {
194     final Thread mainThread = Thread.currentThread();
195     threadPool.submit(new Runnable() {
196       public void run() {
197         new Producer(q, 50).run();
198         new Interrupter(mainThread).run();
199         new Producer(q, 50).run();
200       }
201     });
202     List<Object> buf = Lists.newArrayList();
203     int elements =
204         Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
205     // so when this drains all elements, we know the thread has also been interrupted in between
206     assertTrue(Thread.interrupted());
207     assertEquals(100, elements);
208     assertEquals(100, buf.size());
209   }
210 
testNewLinkedBlockingDequeCapacity()211   public void testNewLinkedBlockingDequeCapacity() {
212     try {
213       Queues.newLinkedBlockingDeque(0);
214       fail("Should have thrown IllegalArgumentException");
215     } catch (IllegalArgumentException expected) {
216       // any capacity less than 1 should throw IllegalArgumentException
217     }
218     assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity());
219     assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity());
220   }
221 
testNewLinkedBlockingQueueCapacity()222   public void testNewLinkedBlockingQueueCapacity() {
223     try {
224       Queues.newLinkedBlockingQueue(0);
225       fail("Should have thrown IllegalArgumentException");
226     } catch (IllegalArgumentException expected) {
227       // any capacity less than 1 should throw IllegalArgumentException
228     }
229     assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity());
230     assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity());
231   }
232 
233   /**
234    * Checks that #drain() invocations behave correctly for a drained (empty) queue.
235    */
assertDrained(BlockingQueue<Object> q)236   private void assertDrained(BlockingQueue<Object> q) {
237     assertNull(q.peek());
238     assertInterruptibleDrained(q);
239     assertUninterruptibleDrained(q);
240   }
241 
assertInterruptibleDrained(BlockingQueue<Object> q)242   private void assertInterruptibleDrained(BlockingQueue<Object> q) {
243     // nothing to drain, thus this should wait doing nothing
244     try {
245       assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
246     } catch (InterruptedException e) {
247       throw new AssertionError();
248     }
249 
250     // but does the wait actually occurs?
251     threadPool.submit(new Interrupter(Thread.currentThread()));
252     try {
253       // if waiting works, this should get stuck
254       Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
255       fail();
256     } catch (InterruptedException expected) {
257       // we indeed waited; a slow thread had enough time to interrupt us
258     }
259   }
260 
261   // same as above; uninterruptible version
assertUninterruptibleDrained(BlockingQueue<Object> q)262   private void assertUninterruptibleDrained(BlockingQueue<Object> q) {
263     assertEquals(0,
264         Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS));
265 
266     // but does the wait actually occurs?
267     threadPool.submit(new Interrupter(Thread.currentThread()));
268 
269     long startTime = System.nanoTime();
270     Queues.drainUninterruptibly(
271         q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS);
272     assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10));
273     // wait for interrupted status and clear it
274     while (!Thread.interrupted()) { Thread.yield(); }
275   }
276 
277   private static class Producer implements Runnable {
278     final BlockingQueue<Object> q;
279     final int elements;
280 
Producer(BlockingQueue<Object> q, int elements)281     Producer(BlockingQueue<Object> q, int elements) {
282       this.q = q;
283       this.elements = elements;
284     }
285 
run()286     @Override public void run() {
287       try {
288         for (int i = 0; i < elements; i++) {
289           q.put(new Object());
290         }
291       } catch (InterruptedException e) {
292         // TODO(user): replace this when there is a better way to spawn threads in tests and
293         // have threads propagate their errors back to the test thread.
294         e.printStackTrace();
295         // never returns, so that #tearDown() notices that one worker isn't done
296         Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
297       }
298     }
299   }
300 
301   private static class Interrupter implements Runnable {
302     final Thread threadToInterrupt;
303 
Interrupter(Thread threadToInterrupt)304     Interrupter(Thread threadToInterrupt) {
305       this.threadToInterrupt = threadToInterrupt;
306     }
307 
run()308     @Override public void run() {
309       try {
310         Thread.sleep(100);
311       } catch (InterruptedException e) {
312         throw new AssertionError();
313       } finally {
314         threadToInterrupt.interrupt();
315       }
316     }
317   }
318 }
319