1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include Andrew Wright, Jeffrey Hayes,
6  * Pat Fisher, Mike Judd.
7  */
8 
9 package jsr166;
10 
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Iterator;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.SynchronousQueue;
23 
24 import junit.framework.Test;
25 
26 public class SynchronousQueueTest extends JSR166TestCase {
27 
28     // android-note: These tests have been moved into their own separate
29     // classes to work around CTS issues.
30     //
31     // public static class Fair extends BlockingQueueTest {
32     //     protected BlockingQueue emptyCollection() {
33     //         return new SynchronousQueue(true);
34     //     }
35     // }
36 
37     // public static class NonFair extends BlockingQueueTest {
38     //     protected BlockingQueue emptyCollection() {
39     //         return new SynchronousQueue(false);
40     //     }
41     // }
42 
43     // android-note: Removed because the CTS runner does a bad job of
44     // retrying tests that have suite() declarations.
45     //
46     // public static void main(String[] args) {
47     //     main(suite(), args);
48     // }
49     // public static Test suite() {
50     //     return newTestSuite(SynchronousQueueTest.class,
51     //                         new Fair().testSuite(),
52     //                         new NonFair().testSuite());
53     // }
54 
55     /**
56      * Any SynchronousQueue is both empty and full
57      */
testEmptyFull()58     public void testEmptyFull()      { testEmptyFull(false); }
testEmptyFull_fair()59     public void testEmptyFull_fair() { testEmptyFull(true); }
testEmptyFull(boolean fair)60     public void testEmptyFull(boolean fair) {
61         final SynchronousQueue q = new SynchronousQueue(fair);
62         assertTrue(q.isEmpty());
63         assertEquals(0, q.size());
64         assertEquals(0, q.remainingCapacity());
65         assertFalse(q.offer(zero));
66     }
67 
68     /**
69      * offer fails if no active taker
70      */
testOffer()71     public void testOffer()      { testOffer(false); }
testOffer_fair()72     public void testOffer_fair() { testOffer(true); }
testOffer(boolean fair)73     public void testOffer(boolean fair) {
74         SynchronousQueue q = new SynchronousQueue(fair);
75         assertFalse(q.offer(one));
76     }
77 
78     /**
79      * add throws IllegalStateException if no active taker
80      */
testAdd()81     public void testAdd()      { testAdd(false); }
testAdd_fair()82     public void testAdd_fair() { testAdd(true); }
testAdd(boolean fair)83     public void testAdd(boolean fair) {
84         SynchronousQueue q = new SynchronousQueue(fair);
85         assertEquals(0, q.remainingCapacity());
86         try {
87             q.add(one);
88             shouldThrow();
89         } catch (IllegalStateException success) {}
90     }
91 
92     /**
93      * addAll(this) throws IllegalArgumentException
94      */
testAddAll_self()95     public void testAddAll_self()      { testAddAll_self(false); }
testAddAll_self_fair()96     public void testAddAll_self_fair() { testAddAll_self(true); }
testAddAll_self(boolean fair)97     public void testAddAll_self(boolean fair) {
98         SynchronousQueue q = new SynchronousQueue(fair);
99         try {
100             q.addAll(q);
101             shouldThrow();
102         } catch (IllegalArgumentException success) {}
103     }
104 
105     /**
106      * addAll throws ISE if no active taker
107      */
testAddAll_ISE()108     public void testAddAll_ISE()      { testAddAll_ISE(false); }
testAddAll_ISE_fair()109     public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
testAddAll_ISE(boolean fair)110     public void testAddAll_ISE(boolean fair) {
111         SynchronousQueue q = new SynchronousQueue(fair);
112         Integer[] ints = new Integer[1];
113         for (int i = 0; i < ints.length; i++)
114             ints[i] = i;
115         Collection<Integer> coll = Arrays.asList(ints);
116         try {
117             q.addAll(coll);
118             shouldThrow();
119         } catch (IllegalStateException success) {}
120     }
121 
122     /**
123      * put blocks interruptibly if no active taker
124      */
testBlockingPut()125     public void testBlockingPut()      { testBlockingPut(false); }
testBlockingPut_fair()126     public void testBlockingPut_fair() { testBlockingPut(true); }
testBlockingPut(boolean fair)127     public void testBlockingPut(boolean fair) {
128         final SynchronousQueue q = new SynchronousQueue(fair);
129         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
130         Thread t = newStartedThread(new CheckedRunnable() {
131             public void realRun() throws InterruptedException {
132                 Thread.currentThread().interrupt();
133                 try {
134                     q.put(99);
135                     shouldThrow();
136                 } catch (InterruptedException success) {}
137                 assertFalse(Thread.interrupted());
138 
139                 pleaseInterrupt.countDown();
140                 try {
141                     q.put(99);
142                     shouldThrow();
143                 } catch (InterruptedException success) {}
144                 assertFalse(Thread.interrupted());
145             }});
146 
147         await(pleaseInterrupt);
148         assertThreadStaysAlive(t);
149         t.interrupt();
150         awaitTermination(t);
151         assertEquals(0, q.remainingCapacity());
152     }
153 
154     /**
155      * put blocks interruptibly waiting for take
156      */
testPutWithTake()157     public void testPutWithTake()      { testPutWithTake(false); }
testPutWithTake_fair()158     public void testPutWithTake_fair() { testPutWithTake(true); }
testPutWithTake(boolean fair)159     public void testPutWithTake(boolean fair) {
160         final SynchronousQueue q = new SynchronousQueue(fair);
161         final CountDownLatch pleaseTake = new CountDownLatch(1);
162         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
163         Thread t = newStartedThread(new CheckedRunnable() {
164             public void realRun() throws InterruptedException {
165                 pleaseTake.countDown();
166                 q.put(one);
167 
168                 pleaseInterrupt.countDown();
169                 try {
170                     q.put(99);
171                     shouldThrow();
172                 } catch (InterruptedException success) {}
173                 assertFalse(Thread.interrupted());
174             }});
175 
176         await(pleaseTake);
177         assertEquals(0, q.remainingCapacity());
178         try { assertSame(one, q.take()); }
179         catch (InterruptedException e) { threadUnexpectedException(e); }
180 
181         await(pleaseInterrupt);
182         assertThreadStaysAlive(t);
183         t.interrupt();
184         awaitTermination(t);
185         assertEquals(0, q.remainingCapacity());
186     }
187 
188     /**
189      * timed offer times out if elements not taken
190      */
testTimedOffer()191     public void testTimedOffer()      { testTimedOffer(false); }
testTimedOffer_fair()192     public void testTimedOffer_fair() { testTimedOffer(true); }
testTimedOffer(boolean fair)193     public void testTimedOffer(boolean fair) {
194         final SynchronousQueue q = new SynchronousQueue(fair);
195         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
196         Thread t = newStartedThread(new CheckedRunnable() {
197             public void realRun() throws InterruptedException {
198                 long startTime = System.nanoTime();
199                 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
200                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
201                 pleaseInterrupt.countDown();
202                 try {
203                     q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
204                     shouldThrow();
205                 } catch (InterruptedException success) {}
206             }});
207 
208         await(pleaseInterrupt);
209         assertThreadStaysAlive(t);
210         t.interrupt();
211         awaitTermination(t);
212     }
213 
214     /**
215      * poll return null if no active putter
216      */
testPoll()217     public void testPoll()      { testPoll(false); }
testPoll_fair()218     public void testPoll_fair() { testPoll(true); }
testPoll(boolean fair)219     public void testPoll(boolean fair) {
220         final SynchronousQueue q = new SynchronousQueue(fair);
221         assertNull(q.poll());
222     }
223 
224     /**
225      * timed poll with zero timeout times out if no active putter
226      */
testTimedPoll0()227     public void testTimedPoll0()      { testTimedPoll0(false); }
testTimedPoll0_fair()228     public void testTimedPoll0_fair() { testTimedPoll0(true); }
testTimedPoll0(boolean fair)229     public void testTimedPoll0(boolean fair) {
230         final SynchronousQueue q = new SynchronousQueue(fair);
231         try { assertNull(q.poll(0, MILLISECONDS)); }
232         catch (InterruptedException e) { threadUnexpectedException(e); }
233     }
234 
235     /**
236      * timed poll with nonzero timeout times out if no active putter
237      */
testTimedPoll()238     public void testTimedPoll()      { testTimedPoll(false); }
testTimedPoll_fair()239     public void testTimedPoll_fair() { testTimedPoll(true); }
testTimedPoll(boolean fair)240     public void testTimedPoll(boolean fair) {
241         final SynchronousQueue q = new SynchronousQueue(fair);
242         long startTime = System.nanoTime();
243         try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
244         catch (InterruptedException e) { threadUnexpectedException(e); }
245         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
246     }
247 
248     /**
249      * timed poll before a delayed offer times out, returning null;
250      * after offer succeeds; on interruption throws
251      */
testTimedPollWithOffer()252     public void testTimedPollWithOffer()      { testTimedPollWithOffer(false); }
testTimedPollWithOffer_fair()253     public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
testTimedPollWithOffer(boolean fair)254     public void testTimedPollWithOffer(boolean fair) {
255         final SynchronousQueue q = new SynchronousQueue(fair);
256         final CountDownLatch pleaseOffer = new CountDownLatch(1);
257         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
258         Thread t = newStartedThread(new CheckedRunnable() {
259             public void realRun() throws InterruptedException {
260                 long startTime = System.nanoTime();
261                 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
262                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
263 
264                 pleaseOffer.countDown();
265                 startTime = System.nanoTime();
266                 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
267 
268                 Thread.currentThread().interrupt();
269                 try {
270                     q.poll(LONG_DELAY_MS, MILLISECONDS);
271                     shouldThrow();
272                 } catch (InterruptedException success) {}
273                 assertFalse(Thread.interrupted());
274 
275                 pleaseInterrupt.countDown();
276                 try {
277                     q.poll(LONG_DELAY_MS, MILLISECONDS);
278                     shouldThrow();
279                 } catch (InterruptedException success) {}
280                 assertFalse(Thread.interrupted());
281 
282                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
283             }});
284 
285         await(pleaseOffer);
286         long startTime = System.nanoTime();
287         try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
288         catch (InterruptedException e) { threadUnexpectedException(e); }
289         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
290 
291         await(pleaseInterrupt);
292         assertThreadStaysAlive(t);
293         t.interrupt();
294         awaitTermination(t);
295     }
296 
297     /**
298      * peek() returns null if no active putter
299      */
300     public void testPeek()      { testPeek(false); }
301     public void testPeek_fair() { testPeek(true); }
302     public void testPeek(boolean fair) {
303         final SynchronousQueue q = new SynchronousQueue(fair);
304         assertNull(q.peek());
305     }
306 
307     /**
308      * element() throws NoSuchElementException if no active putter
309      */
310     public void testElement()      { testElement(false); }
311     public void testElement_fair() { testElement(true); }
312     public void testElement(boolean fair) {
313         final SynchronousQueue q = new SynchronousQueue(fair);
314         try {
315             q.element();
316             shouldThrow();
317         } catch (NoSuchElementException success) {}
318     }
319 
320     /**
321      * remove() throws NoSuchElementException if no active putter
322      */
323     public void testRemove()      { testRemove(false); }
324     public void testRemove_fair() { testRemove(true); }
325     public void testRemove(boolean fair) {
326         final SynchronousQueue q = new SynchronousQueue(fair);
327         try {
328             q.remove();
329             shouldThrow();
330         } catch (NoSuchElementException success) {}
331     }
332 
333     /**
334      * contains returns false
335      */
336     public void testContains()      { testContains(false); }
337     public void testContains_fair() { testContains(true); }
338     public void testContains(boolean fair) {
339         final SynchronousQueue q = new SynchronousQueue(fair);
340         assertFalse(q.contains(zero));
341     }
342 
343     /**
344      * clear ensures isEmpty
345      */
346     public void testClear()      { testClear(false); }
347     public void testClear_fair() { testClear(true); }
348     public void testClear(boolean fair) {
349         final SynchronousQueue q = new SynchronousQueue(fair);
350         q.clear();
351         assertTrue(q.isEmpty());
352     }
353 
354     /**
355      * containsAll returns false unless empty
356      */
357     public void testContainsAll()      { testContainsAll(false); }
358     public void testContainsAll_fair() { testContainsAll(true); }
359     public void testContainsAll(boolean fair) {
360         final SynchronousQueue q = new SynchronousQueue(fair);
361         Integer[] empty = new Integer[0];
362         assertTrue(q.containsAll(Arrays.asList(empty)));
363         Integer[] ints = new Integer[1]; ints[0] = zero;
364         assertFalse(q.containsAll(Arrays.asList(ints)));
365     }
366 
367     /**
368      * retainAll returns false
369      */
370     public void testRetainAll()      { testRetainAll(false); }
371     public void testRetainAll_fair() { testRetainAll(true); }
372     public void testRetainAll(boolean fair) {
373         final SynchronousQueue q = new SynchronousQueue(fair);
374         Integer[] empty = new Integer[0];
375         assertFalse(q.retainAll(Arrays.asList(empty)));
376         Integer[] ints = new Integer[1]; ints[0] = zero;
377         assertFalse(q.retainAll(Arrays.asList(ints)));
378     }
379 
380     /**
381      * removeAll returns false
382      */
383     public void testRemoveAll()      { testRemoveAll(false); }
384     public void testRemoveAll_fair() { testRemoveAll(true); }
385     public void testRemoveAll(boolean fair) {
386         final SynchronousQueue q = new SynchronousQueue(fair);
387         Integer[] empty = new Integer[0];
388         assertFalse(q.removeAll(Arrays.asList(empty)));
389         Integer[] ints = new Integer[1]; ints[0] = zero;
390         assertFalse(q.containsAll(Arrays.asList(ints)));
391     }
392 
393     /**
394      * toArray is empty
395      */
396     public void testToArray()      { testToArray(false); }
397     public void testToArray_fair() { testToArray(true); }
398     public void testToArray(boolean fair) {
399         final SynchronousQueue q = new SynchronousQueue(fair);
400         Object[] o = q.toArray();
401         assertEquals(0, o.length);
402     }
403 
404     /**
405      * toArray(Integer array) returns its argument with the first
406      * element (if present) nulled out
407      */
408     public void testToArray2()      { testToArray2(false); }
409     public void testToArray2_fair() { testToArray2(true); }
410     public void testToArray2(boolean fair) {
411         final SynchronousQueue<Integer> q
412             = new SynchronousQueue<Integer>(fair);
413         Integer[] a;
414 
415         a = new Integer[0];
416         assertSame(a, q.toArray(a));
417 
418         a = new Integer[3];
419         Arrays.fill(a, 42);
420         assertSame(a, q.toArray(a));
421         assertNull(a[0]);
422         for (int i = 1; i < a.length; i++)
423             assertEquals(42, (int) a[i]);
424     }
425 
426     /**
427      * toArray(null) throws NPE
428      */
429     public void testToArray_null()      { testToArray_null(false); }
430     public void testToArray_null_fair() { testToArray_null(true); }
431     public void testToArray_null(boolean fair) {
432         final SynchronousQueue q = new SynchronousQueue(fair);
433         try {
434             Object[] o = q.toArray(null);
435             shouldThrow();
436         } catch (NullPointerException success) {}
437     }
438 
439     /**
440      * iterator does not traverse any elements
441      */
442     public void testIterator()      { testIterator(false); }
443     public void testIterator_fair() { testIterator(true); }
444     public void testIterator(boolean fair) {
445         assertIteratorExhausted(new SynchronousQueue(fair).iterator());
446     }
447 
448     /**
449      * iterator remove throws ISE
450      */
451     public void testIteratorRemove()      { testIteratorRemove(false); }
452     public void testIteratorRemove_fair() { testIteratorRemove(true); }
453     public void testIteratorRemove(boolean fair) {
454         final SynchronousQueue q = new SynchronousQueue(fair);
455         Iterator it = q.iterator();
456         try {
457             it.remove();
458             shouldThrow();
459         } catch (IllegalStateException success) {}
460     }
461 
462     /**
463      * toString returns a non-null string
464      */
465     public void testToString()      { testToString(false); }
466     public void testToString_fair() { testToString(true); }
467     public void testToString(boolean fair) {
468         final SynchronousQueue q = new SynchronousQueue(fair);
469         String s = q.toString();
470         assertNotNull(s);
471     }
472 
473     /**
474      * offer transfers elements across Executor tasks
475      */
476     public void testOfferInExecutor()      { testOfferInExecutor(false); }
477     public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
478     public void testOfferInExecutor(boolean fair) {
479         final SynchronousQueue q = new SynchronousQueue(fair);
480         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
481         final ExecutorService executor = Executors.newFixedThreadPool(2);
482         try (PoolCleaner cleaner = cleaner(executor)) {
483 
484             executor.execute(new CheckedRunnable() {
485                 public void realRun() throws InterruptedException {
486                     assertFalse(q.offer(one));
487                     threadsStarted.await();
488                     assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
489                     assertEquals(0, q.remainingCapacity());
490                 }});
491 
492             executor.execute(new CheckedRunnable() {
493                 public void realRun() throws InterruptedException {
494                     threadsStarted.await();
495                     assertSame(one, q.take());
496                 }});
497         }
498     }
499 
500     /**
501      * timed poll retrieves elements across Executor threads
502      */
503     public void testPollInExecutor()      { testPollInExecutor(false); }
504     public void testPollInExecutor_fair() { testPollInExecutor(true); }
505     public void testPollInExecutor(boolean fair) {
506         final SynchronousQueue q = new SynchronousQueue(fair);
507         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
508         final ExecutorService executor = Executors.newFixedThreadPool(2);
509         try (PoolCleaner cleaner = cleaner(executor)) {
510             executor.execute(new CheckedRunnable() {
511                 public void realRun() throws InterruptedException {
512                     assertNull(q.poll());
513                     threadsStarted.await();
514                     assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
515                     assertTrue(q.isEmpty());
516                 }});
517 
518             executor.execute(new CheckedRunnable() {
519                 public void realRun() throws InterruptedException {
520                     threadsStarted.await();
521                     q.put(one);
522                 }});
523         }
524     }
525 
526     /**
527      * a deserialized serialized queue is usable
528      */
529     public void testSerialization() {
530         final SynchronousQueue x = new SynchronousQueue();
531         final SynchronousQueue y = new SynchronousQueue(false);
532         final SynchronousQueue z = new SynchronousQueue(true);
533         assertSerialEquals(x, y);
534         assertNotSerialEquals(x, z);
535         SynchronousQueue[] qs = { x, y, z };
536         for (SynchronousQueue q : qs) {
537             SynchronousQueue clone = serialClone(q);
538             assertNotSame(q, clone);
539             assertSerialEquals(q, clone);
540             assertTrue(clone.isEmpty());
541             assertEquals(0, clone.size());
542             assertEquals(0, clone.remainingCapacity());
543             assertFalse(clone.offer(zero));
544         }
545     }
546 
547     /**
548      * drainTo(c) of empty queue doesn't transfer elements
549      */
550     public void testDrainTo()      { testDrainTo(false); }
551     public void testDrainTo_fair() { testDrainTo(true); }
552     public void testDrainTo(boolean fair) {
553         final SynchronousQueue q = new SynchronousQueue(fair);
554         ArrayList l = new ArrayList();
555         q.drainTo(l);
556         assertEquals(0, q.size());
557         assertEquals(0, l.size());
558     }
559 
560     /**
561      * drainTo empties queue, unblocking a waiting put.
562      */
563     public void testDrainToWithActivePut()      { testDrainToWithActivePut(false); }
564     public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
565     public void testDrainToWithActivePut(boolean fair) {
566         final SynchronousQueue q = new SynchronousQueue(fair);
567         Thread t = newStartedThread(new CheckedRunnable() {
568             public void realRun() throws InterruptedException {
569                 q.put(one);
570             }});
571 
572         ArrayList l = new ArrayList();
573         long startTime = System.nanoTime();
574         while (l.isEmpty()) {
575             q.drainTo(l);
576             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
577                 fail("timed out");
578             Thread.yield();
579         }
580         assertTrue(l.size() == 1);
581         assertSame(one, l.get(0));
582         awaitTermination(t);
583     }
584 
585     /**
586      * drainTo(c, n) empties up to n elements of queue into c
587      */
588     public void testDrainToN() throws InterruptedException {
589         final SynchronousQueue q = new SynchronousQueue();
590         Thread t1 = newStartedThread(new CheckedRunnable() {
591             public void realRun() throws InterruptedException {
592                 q.put(one);
593             }});
594 
595         Thread t2 = newStartedThread(new CheckedRunnable() {
596             public void realRun() throws InterruptedException {
597                 q.put(two);
598             }});
599 
600         ArrayList l = new ArrayList();
601         int drained;
602         while ((drained = q.drainTo(l, 1)) == 0) Thread.yield();
603         assertEquals(1, drained);
604         assertEquals(1, l.size());
605         while ((drained = q.drainTo(l, 1)) == 0) Thread.yield();
606         assertEquals(1, drained);
607         assertEquals(2, l.size());
608         assertTrue(l.contains(one));
609         assertTrue(l.contains(two));
610         awaitTermination(t1);
611         awaitTermination(t2);
612     }
613 
614     /**
615      * remove(null), contains(null) always return false
616      */
617     public void testNeverContainsNull() {
618         Collection<?> q = new SynchronousQueue();
619         assertFalse(q.contains(null));
620         assertFalse(q.remove(null));
621     }
622 
623 }
624