1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.collect; 18 19 import com.google.common.util.concurrent.Uninterruptibles; 20 21 import junit.framework.TestCase; 22 23 import java.util.Collection; 24 import java.util.List; 25 import java.util.concurrent.ArrayBlockingQueue; 26 import java.util.concurrent.BlockingQueue; 27 import java.util.concurrent.ExecutorService; 28 import java.util.concurrent.Executors; 29 import java.util.concurrent.Future; 30 import java.util.concurrent.LinkedBlockingQueue; 31 import java.util.concurrent.PriorityBlockingQueue; 32 import java.util.concurrent.SynchronousQueue; 33 import java.util.concurrent.TimeUnit; 34 35 /** 36 * Tests for {@link Queues}. 37 * 38 * @author Dimitris Andreou 39 */ 40 41 public class QueuesTest extends TestCase { 42 /* 43 * All the following tests relate to BlockingQueue methods in Queues. 44 */ 45 blockingQueues()46 public static List<BlockingQueue<Object>> blockingQueues() { 47 return ImmutableList.<BlockingQueue<Object>>of( 48 new LinkedBlockingQueue<Object>(), 49 new LinkedBlockingQueue<Object>(10), 50 new SynchronousQueue<Object>(), 51 new ArrayBlockingQueue<Object>(10), 52 new PriorityBlockingQueue<Object>(10, Ordering.arbitrary())); 53 } 54 55 private ExecutorService threadPool; 56 57 @Override setUp()58 public void setUp() { 59 threadPool = Executors.newCachedThreadPool(); 60 } 61 62 @Override tearDown()63 public void tearDown() throws InterruptedException { 64 // notice that if a Producer is interrupted (a bug), the Producer will go into an infinite 65 // loop, which will be noticed here 66 threadPool.shutdown(); 67 assertTrue("Some worker didn't finish in time", 68 threadPool.awaitTermination(1, TimeUnit.SECONDS)); 69 } 70 drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, long timeout, TimeUnit unit, boolean interruptibly)71 private static <T> int drain(BlockingQueue<T> q, Collection<? super T> buffer, int maxElements, 72 long timeout, TimeUnit unit, boolean interruptibly) throws InterruptedException { 73 return interruptibly 74 ? Queues.drain(q, buffer, maxElements, timeout, unit) 75 : Queues.drainUninterruptibly(q, buffer, maxElements, timeout, unit); 76 } 77 testMultipleProducers()78 public void testMultipleProducers() throws Exception { 79 for (BlockingQueue<Object> q : blockingQueues()) { 80 testMultipleProducers(q); 81 } 82 } 83 testMultipleProducers(BlockingQueue<Object> q)84 private void testMultipleProducers(BlockingQueue<Object> q) 85 throws InterruptedException { 86 for (boolean interruptibly : new boolean[] { true, false }) { 87 threadPool.submit(new Producer(q, 20)); 88 threadPool.submit(new Producer(q, 20)); 89 threadPool.submit(new Producer(q, 20)); 90 threadPool.submit(new Producer(q, 20)); 91 threadPool.submit(new Producer(q, 20)); 92 93 List<Object> buf = Lists.newArrayList(); 94 int elements = drain(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, interruptibly); 95 assertEquals(100, elements); 96 assertEquals(100, buf.size()); 97 assertDrained(q); 98 } 99 } 100 testDrainTimesOut()101 public void testDrainTimesOut() throws Exception { 102 for (BlockingQueue<Object> q : blockingQueues()) { 103 testDrainTimesOut(q); 104 } 105 } 106 testDrainTimesOut(BlockingQueue<Object> q)107 private void testDrainTimesOut(BlockingQueue<Object> q) throws Exception { 108 for (boolean interruptibly : new boolean[] { true, false }) { 109 assertEquals(0, Queues.drain(q, ImmutableList.of(), 1, 10, TimeUnit.MILLISECONDS)); 110 111 // producing one, will ask for two 112 Future<?> submitter = threadPool.submit(new Producer(q, 1)); 113 114 // make sure we time out 115 long startTime = System.nanoTime(); 116 117 int drained = drain(q, Lists.newArrayList(), 2, 10, TimeUnit.MILLISECONDS, interruptibly); 118 assertTrue(drained <= 1); 119 120 assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10)); 121 122 // If even the first one wasn't there, clean up so that the next test doesn't see an element. 123 submitter.get(); 124 if (drained == 0) { 125 assertNotNull(q.poll()); 126 } 127 } 128 } 129 testZeroElements()130 public void testZeroElements() throws Exception { 131 for (BlockingQueue<Object> q : blockingQueues()) { 132 testZeroElements(q); 133 } 134 } 135 testZeroElements(BlockingQueue<Object> q)136 private void testZeroElements(BlockingQueue<Object> q) throws InterruptedException { 137 for (boolean interruptibly : new boolean[] { true, false }) { 138 // asking to drain zero elements 139 assertEquals(0, drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS, interruptibly)); 140 } 141 } 142 testEmpty()143 public void testEmpty() throws Exception { 144 for (BlockingQueue<Object> q : blockingQueues()) { 145 testEmpty(q); 146 } 147 } 148 testEmpty(BlockingQueue<Object> q)149 private void testEmpty(BlockingQueue<Object> q) { 150 assertDrained(q); 151 } 152 testNegativeMaxElements()153 public void testNegativeMaxElements() throws Exception { 154 for (BlockingQueue<Object> q : blockingQueues()) { 155 testNegativeMaxElements(q); 156 } 157 } 158 testNegativeMaxElements(BlockingQueue<Object> q)159 private void testNegativeMaxElements(BlockingQueue<Object> q) throws InterruptedException { 160 threadPool.submit(new Producer(q, 1)); 161 162 List<Object> buf = Lists.newArrayList(); 163 int elements = Queues.drain(q, buf, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 164 assertEquals(elements, 0); 165 assertTrue(buf.isEmpty()); 166 167 // Clean up produced element to free the producer thread, otherwise it will complain 168 // when we shutdown the threadpool. 169 Queues.drain(q, buf, 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 170 } 171 testDrain_throws()172 public void testDrain_throws() throws Exception { 173 for (BlockingQueue<Object> q : blockingQueues()) { 174 testDrain_throws(q); 175 } 176 } 177 testDrain_throws(BlockingQueue<Object> q)178 private void testDrain_throws(BlockingQueue<Object> q) { 179 threadPool.submit(new Interrupter(Thread.currentThread())); 180 try { 181 Queues.drain(q, ImmutableList.of(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 182 fail(); 183 } catch (InterruptedException expected) { 184 } 185 } 186 testDrainUninterruptibly_doesNotThrow()187 public void testDrainUninterruptibly_doesNotThrow() throws Exception { 188 for (BlockingQueue<Object> q : blockingQueues()) { 189 testDrainUninterruptibly_doesNotThrow(q); 190 } 191 } 192 testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q)193 private void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> q) { 194 final Thread mainThread = Thread.currentThread(); 195 threadPool.submit(new Runnable() { 196 public void run() { 197 new Producer(q, 50).run(); 198 new Interrupter(mainThread).run(); 199 new Producer(q, 50).run(); 200 } 201 }); 202 List<Object> buf = Lists.newArrayList(); 203 int elements = 204 Queues.drainUninterruptibly(q, buf, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 205 // so when this drains all elements, we know the thread has also been interrupted in between 206 assertTrue(Thread.interrupted()); 207 assertEquals(100, elements); 208 assertEquals(100, buf.size()); 209 } 210 testNewLinkedBlockingDequeCapacity()211 public void testNewLinkedBlockingDequeCapacity() { 212 try { 213 Queues.newLinkedBlockingDeque(0); 214 fail("Should have thrown IllegalArgumentException"); 215 } catch (IllegalArgumentException expected) { 216 // any capacity less than 1 should throw IllegalArgumentException 217 } 218 assertEquals(1, Queues.newLinkedBlockingDeque(1).remainingCapacity()); 219 assertEquals(11, Queues.newLinkedBlockingDeque(11).remainingCapacity()); 220 } 221 testNewLinkedBlockingQueueCapacity()222 public void testNewLinkedBlockingQueueCapacity() { 223 try { 224 Queues.newLinkedBlockingQueue(0); 225 fail("Should have thrown IllegalArgumentException"); 226 } catch (IllegalArgumentException expected) { 227 // any capacity less than 1 should throw IllegalArgumentException 228 } 229 assertEquals(1, Queues.newLinkedBlockingQueue(1).remainingCapacity()); 230 assertEquals(11, Queues.newLinkedBlockingQueue(11).remainingCapacity()); 231 } 232 233 /** 234 * Checks that #drain() invocations behave correctly for a drained (empty) queue. 235 */ assertDrained(BlockingQueue<Object> q)236 private void assertDrained(BlockingQueue<Object> q) { 237 assertNull(q.peek()); 238 assertInterruptibleDrained(q); 239 assertUninterruptibleDrained(q); 240 } 241 assertInterruptibleDrained(BlockingQueue<Object> q)242 private void assertInterruptibleDrained(BlockingQueue<Object> q) { 243 // nothing to drain, thus this should wait doing nothing 244 try { 245 assertEquals(0, Queues.drain(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS)); 246 } catch (InterruptedException e) { 247 throw new AssertionError(); 248 } 249 250 // but does the wait actually occurs? 251 threadPool.submit(new Interrupter(Thread.currentThread())); 252 try { 253 // if waiting works, this should get stuck 254 Queues.drain(q, Lists.newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS); 255 fail(); 256 } catch (InterruptedException expected) { 257 // we indeed waited; a slow thread had enough time to interrupt us 258 } 259 } 260 261 // same as above; uninterruptible version assertUninterruptibleDrained(BlockingQueue<Object> q)262 private void assertUninterruptibleDrained(BlockingQueue<Object> q) { 263 assertEquals(0, 264 Queues.drainUninterruptibly(q, ImmutableList.of(), 0, 10, TimeUnit.MILLISECONDS)); 265 266 // but does the wait actually occurs? 267 threadPool.submit(new Interrupter(Thread.currentThread())); 268 269 long startTime = System.nanoTime(); 270 Queues.drainUninterruptibly( 271 q, Lists.newArrayList(), 1, 10, TimeUnit.MILLISECONDS); 272 assertTrue((System.nanoTime() - startTime) >= TimeUnit.MILLISECONDS.toNanos(10)); 273 // wait for interrupted status and clear it 274 while (!Thread.interrupted()) { Thread.yield(); } 275 } 276 277 private static class Producer implements Runnable { 278 final BlockingQueue<Object> q; 279 final int elements; 280 Producer(BlockingQueue<Object> q, int elements)281 Producer(BlockingQueue<Object> q, int elements) { 282 this.q = q; 283 this.elements = elements; 284 } 285 run()286 @Override public void run() { 287 try { 288 for (int i = 0; i < elements; i++) { 289 q.put(new Object()); 290 } 291 } catch (InterruptedException e) { 292 // TODO(user): replace this when there is a better way to spawn threads in tests and 293 // have threads propagate their errors back to the test thread. 294 e.printStackTrace(); 295 // never returns, so that #tearDown() notices that one worker isn't done 296 Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 297 } 298 } 299 } 300 301 private static class Interrupter implements Runnable { 302 final Thread threadToInterrupt; 303 Interrupter(Thread threadToInterrupt)304 Interrupter(Thread threadToInterrupt) { 305 this.threadToInterrupt = threadToInterrupt; 306 } 307 run()308 @Override public void run() { 309 try { 310 Thread.sleep(100); 311 } catch (InterruptedException e) { 312 throw new AssertionError(); 313 } finally { 314 threadToInterrupt.interrupt(); 315 } 316 } 317 } 318 } 319