1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include Andrew Wright, Jeffrey Hayes,
6  * Pat Fisher, Mike Judd.
7  */
8 
9 package jsr166;
10 
11 import junit.framework.*;
12 import java.util.Arrays;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.NoSuchElementException;
17 import java.util.Queue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.SynchronousQueue;
23 import static java.util.concurrent.TimeUnit.MILLISECONDS;
24 
25 public class SynchronousQueueTest extends JSR166TestCase {
26 
27     /**
28      * Any SynchronousQueue is both empty and full
29      */
testEmptyFull()30     public void testEmptyFull()      { testEmptyFull(false); }
testEmptyFull_fair()31     public void testEmptyFull_fair() { testEmptyFull(true); }
testEmptyFull(boolean fair)32     public void testEmptyFull(boolean fair) {
33         final SynchronousQueue q = new SynchronousQueue(fair);
34         assertTrue(q.isEmpty());
35         assertEquals(0, q.size());
36         assertEquals(0, q.remainingCapacity());
37         assertFalse(q.offer(zero));
38     }
39 
40     /**
41      * offer fails if no active taker
42      */
testOffer()43     public void testOffer()      { testOffer(false); }
testOffer_fair()44     public void testOffer_fair() { testOffer(true); }
testOffer(boolean fair)45     public void testOffer(boolean fair) {
46         SynchronousQueue q = new SynchronousQueue(fair);
47         assertFalse(q.offer(one));
48     }
49 
50     /**
51      * add throws IllegalStateException if no active taker
52      */
testAdd()53     public void testAdd()      { testAdd(false); }
testAdd_fair()54     public void testAdd_fair() { testAdd(true); }
testAdd(boolean fair)55     public void testAdd(boolean fair) {
56         SynchronousQueue q = new SynchronousQueue(fair);
57         assertEquals(0, q.remainingCapacity());
58         try {
59             q.add(one);
60             shouldThrow();
61         } catch (IllegalStateException success) {}
62     }
63 
64     /**
65      * addAll(this) throws IllegalArgumentException
66      */
testAddAll_self()67     public void testAddAll_self()      { testAddAll_self(false); }
testAddAll_self_fair()68     public void testAddAll_self_fair() { testAddAll_self(true); }
testAddAll_self(boolean fair)69     public void testAddAll_self(boolean fair) {
70         SynchronousQueue q = new SynchronousQueue(fair);
71         try {
72             q.addAll(q);
73             shouldThrow();
74         } catch (IllegalArgumentException success) {}
75     }
76 
77     /**
78      * addAll throws ISE if no active taker
79      */
testAddAll_ISE()80     public void testAddAll_ISE()      { testAddAll_ISE(false); }
testAddAll_ISE_fair()81     public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
testAddAll_ISE(boolean fair)82     public void testAddAll_ISE(boolean fair) {
83         SynchronousQueue q = new SynchronousQueue(fair);
84         Integer[] ints = new Integer[1];
85         for (int i = 0; i < ints.length; i++)
86             ints[i] = i;
87         Collection<Integer> coll = Arrays.asList(ints);
88         try {
89             q.addAll(coll);
90             shouldThrow();
91         } catch (IllegalStateException success) {}
92     }
93 
94     /**
95      * put blocks interruptibly if no active taker
96      */
testBlockingPut()97     public void testBlockingPut()      { testBlockingPut(false); }
testBlockingPut_fair()98     public void testBlockingPut_fair() { testBlockingPut(true); }
testBlockingPut(boolean fair)99     public void testBlockingPut(boolean fair) {
100         final SynchronousQueue q = new SynchronousQueue(fair);
101         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
102         Thread t = newStartedThread(new CheckedRunnable() {
103             public void realRun() throws InterruptedException {
104                 Thread.currentThread().interrupt();
105                 try {
106                     q.put(99);
107                     shouldThrow();
108                 } catch (InterruptedException success) {}
109                 assertFalse(Thread.interrupted());
110 
111                 pleaseInterrupt.countDown();
112                 try {
113                     q.put(99);
114                     shouldThrow();
115                 } catch (InterruptedException success) {}
116                 assertFalse(Thread.interrupted());
117             }});
118 
119         await(pleaseInterrupt);
120         assertThreadStaysAlive(t);
121         t.interrupt();
122         awaitTermination(t);
123         assertEquals(0, q.remainingCapacity());
124     }
125 
126     /**
127      * put blocks interruptibly waiting for take
128      */
testPutWithTake()129     public void testPutWithTake()      { testPutWithTake(false); }
testPutWithTake_fair()130     public void testPutWithTake_fair() { testPutWithTake(true); }
testPutWithTake(boolean fair)131     public void testPutWithTake(boolean fair) {
132         final SynchronousQueue q = new SynchronousQueue(fair);
133         final CountDownLatch pleaseTake = new CountDownLatch(1);
134         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
135         Thread t = newStartedThread(new CheckedRunnable() {
136             public void realRun() throws InterruptedException {
137                 pleaseTake.countDown();
138                 q.put(one);
139 
140                 pleaseInterrupt.countDown();
141                 try {
142                     q.put(99);
143                     shouldThrow();
144                 } catch (InterruptedException success) {}
145                 assertFalse(Thread.interrupted());
146             }});
147 
148         await(pleaseTake);
149         assertEquals(0, q.remainingCapacity());
150         try { assertSame(one, q.take()); }
151         catch (InterruptedException e) { threadUnexpectedException(e); }
152 
153         await(pleaseInterrupt);
154         assertThreadStaysAlive(t);
155         t.interrupt();
156         awaitTermination(t);
157         assertEquals(0, q.remainingCapacity());
158     }
159 
160     /**
161      * timed offer times out if elements not taken
162      */
testTimedOffer()163     public void testTimedOffer()      { testTimedOffer(false); }
testTimedOffer_fair()164     public void testTimedOffer_fair() { testTimedOffer(true); }
testTimedOffer(boolean fair)165     public void testTimedOffer(boolean fair) {
166         final SynchronousQueue q = new SynchronousQueue(fair);
167         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
168         Thread t = newStartedThread(new CheckedRunnable() {
169             public void realRun() throws InterruptedException {
170                 long startTime = System.nanoTime();
171                 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
172                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
173                 pleaseInterrupt.countDown();
174                 try {
175                     q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
176                     shouldThrow();
177                 } catch (InterruptedException success) {}
178             }});
179 
180         await(pleaseInterrupt);
181         assertThreadStaysAlive(t);
182         t.interrupt();
183         awaitTermination(t);
184     }
185 
186     /**
187      * poll return null if no active putter
188      */
testPoll()189     public void testPoll()      { testPoll(false); }
testPoll_fair()190     public void testPoll_fair() { testPoll(true); }
testPoll(boolean fair)191     public void testPoll(boolean fair) {
192         final SynchronousQueue q = new SynchronousQueue(fair);
193         assertNull(q.poll());
194     }
195 
196     /**
197      * timed poll with zero timeout times out if no active putter
198      */
testTimedPoll0()199     public void testTimedPoll0()      { testTimedPoll0(false); }
testTimedPoll0_fair()200     public void testTimedPoll0_fair() { testTimedPoll0(true); }
testTimedPoll0(boolean fair)201     public void testTimedPoll0(boolean fair) {
202         final SynchronousQueue q = new SynchronousQueue(fair);
203         try { assertNull(q.poll(0, MILLISECONDS)); }
204         catch (InterruptedException e) { threadUnexpectedException(e); }
205     }
206 
207     /**
208      * timed poll with nonzero timeout times out if no active putter
209      */
testTimedPoll()210     public void testTimedPoll()      { testTimedPoll(false); }
testTimedPoll_fair()211     public void testTimedPoll_fair() { testTimedPoll(true); }
testTimedPoll(boolean fair)212     public void testTimedPoll(boolean fair) {
213         final SynchronousQueue q = new SynchronousQueue(fair);
214         long startTime = System.nanoTime();
215         try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
216         catch (InterruptedException e) { threadUnexpectedException(e); }
217         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
218     }
219 
220     /**
221      * timed poll before a delayed offer times out, returning null;
222      * after offer succeeds; on interruption throws
223      */
testTimedPollWithOffer()224     public void testTimedPollWithOffer()      { testTimedPollWithOffer(false); }
testTimedPollWithOffer_fair()225     public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
testTimedPollWithOffer(boolean fair)226     public void testTimedPollWithOffer(boolean fair) {
227         final SynchronousQueue q = new SynchronousQueue(fair);
228         final CountDownLatch pleaseOffer = new CountDownLatch(1);
229         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
230         Thread t = newStartedThread(new CheckedRunnable() {
231             public void realRun() throws InterruptedException {
232                 long startTime = System.nanoTime();
233                 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
234                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
235 
236                 pleaseOffer.countDown();
237                 startTime = System.nanoTime();
238                 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
239                 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
240 
241                 Thread.currentThread().interrupt();
242                 try {
243                     q.poll(LONG_DELAY_MS, MILLISECONDS);
244                     shouldThrow();
245                 } catch (InterruptedException success) {}
246                 assertFalse(Thread.interrupted());
247 
248                 pleaseInterrupt.countDown();
249                 try {
250                     q.poll(LONG_DELAY_MS, MILLISECONDS);
251                     shouldThrow();
252                 } catch (InterruptedException success) {}
253                 assertFalse(Thread.interrupted());
254             }});
255 
256         await(pleaseOffer);
257         long startTime = System.nanoTime();
258         try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
259         catch (InterruptedException e) { threadUnexpectedException(e); }
260         assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
261 
262         await(pleaseInterrupt);
263         assertThreadStaysAlive(t);
264         t.interrupt();
265         awaitTermination(t);
266     }
267 
268     /**
269      * peek() returns null if no active putter
270      */
271     public void testPeek()      { testPeek(false); }
272     public void testPeek_fair() { testPeek(true); }
273     public void testPeek(boolean fair) {
274         final SynchronousQueue q = new SynchronousQueue(fair);
275         assertNull(q.peek());
276     }
277 
278     /**
279      * element() throws NoSuchElementException if no active putter
280      */
281     public void testElement()      { testElement(false); }
282     public void testElement_fair() { testElement(true); }
283     public void testElement(boolean fair) {
284         final SynchronousQueue q = new SynchronousQueue(fair);
285         try {
286             q.element();
287             shouldThrow();
288         } catch (NoSuchElementException success) {}
289     }
290 
291     /**
292      * remove() throws NoSuchElementException if no active putter
293      */
294     public void testRemove()      { testRemove(false); }
295     public void testRemove_fair() { testRemove(true); }
296     public void testRemove(boolean fair) {
297         final SynchronousQueue q = new SynchronousQueue(fair);
298         try {
299             q.remove();
300             shouldThrow();
301         } catch (NoSuchElementException success) {}
302     }
303 
304     /**
305      * contains returns false
306      */
307     public void testContains()      { testContains(false); }
308     public void testContains_fair() { testContains(true); }
309     public void testContains(boolean fair) {
310         final SynchronousQueue q = new SynchronousQueue(fair);
311         assertFalse(q.contains(zero));
312     }
313 
314     /**
315      * clear ensures isEmpty
316      */
317     public void testClear()      { testClear(false); }
318     public void testClear_fair() { testClear(true); }
319     public void testClear(boolean fair) {
320         final SynchronousQueue q = new SynchronousQueue(fair);
321         q.clear();
322         assertTrue(q.isEmpty());
323     }
324 
325     /**
326      * containsAll returns false unless empty
327      */
328     public void testContainsAll()      { testContainsAll(false); }
329     public void testContainsAll_fair() { testContainsAll(true); }
330     public void testContainsAll(boolean fair) {
331         final SynchronousQueue q = new SynchronousQueue(fair);
332         Integer[] empty = new Integer[0];
333         assertTrue(q.containsAll(Arrays.asList(empty)));
334         Integer[] ints = new Integer[1]; ints[0] = zero;
335         assertFalse(q.containsAll(Arrays.asList(ints)));
336     }
337 
338     /**
339      * retainAll returns false
340      */
341     public void testRetainAll()      { testRetainAll(false); }
342     public void testRetainAll_fair() { testRetainAll(true); }
343     public void testRetainAll(boolean fair) {
344         final SynchronousQueue q = new SynchronousQueue(fair);
345         Integer[] empty = new Integer[0];
346         assertFalse(q.retainAll(Arrays.asList(empty)));
347         Integer[] ints = new Integer[1]; ints[0] = zero;
348         assertFalse(q.retainAll(Arrays.asList(ints)));
349     }
350 
351     /**
352      * removeAll returns false
353      */
354     public void testRemoveAll()      { testRemoveAll(false); }
355     public void testRemoveAll_fair() { testRemoveAll(true); }
356     public void testRemoveAll(boolean fair) {
357         final SynchronousQueue q = new SynchronousQueue(fair);
358         Integer[] empty = new Integer[0];
359         assertFalse(q.removeAll(Arrays.asList(empty)));
360         Integer[] ints = new Integer[1]; ints[0] = zero;
361         assertFalse(q.containsAll(Arrays.asList(ints)));
362     }
363 
364     /**
365      * toArray is empty
366      */
367     public void testToArray()      { testToArray(false); }
368     public void testToArray_fair() { testToArray(true); }
369     public void testToArray(boolean fair) {
370         final SynchronousQueue q = new SynchronousQueue(fair);
371         Object[] o = q.toArray();
372         assertEquals(0, o.length);
373     }
374 
375     /**
376      * toArray(Integer array) returns its argument with the first
377      * element (if present) nulled out
378      */
379     public void testToArray2()      { testToArray2(false); }
380     public void testToArray2_fair() { testToArray2(true); }
381     public void testToArray2(boolean fair) {
382         final SynchronousQueue<Integer> q
383             = new SynchronousQueue<Integer>(fair);
384         Integer[] a;
385 
386         a = new Integer[0];
387         assertSame(a, q.toArray(a));
388 
389         a = new Integer[3];
390         Arrays.fill(a, 42);
391         assertSame(a, q.toArray(a));
392         assertNull(a[0]);
393         for (int i = 1; i < a.length; i++)
394             assertEquals(42, (int) a[i]);
395     }
396 
397     /**
398      * toArray(null) throws NPE
399      */
400     public void testToArray_null()      { testToArray_null(false); }
401     public void testToArray_null_fair() { testToArray_null(true); }
402     public void testToArray_null(boolean fair) {
403         final SynchronousQueue q = new SynchronousQueue(fair);
404         try {
405             Object o[] = q.toArray(null);
406             shouldThrow();
407         } catch (NullPointerException success) {}
408     }
409 
410     /**
411      * iterator does not traverse any elements
412      */
413     public void testIterator()      { testIterator(false); }
414     public void testIterator_fair() { testIterator(true); }
415     public void testIterator(boolean fair) {
416         final SynchronousQueue q = new SynchronousQueue(fair);
417         Iterator it = q.iterator();
418         assertFalse(it.hasNext());
419         try {
420             Object x = it.next();
421             shouldThrow();
422         } catch (NoSuchElementException success) {}
423     }
424 
425     /**
426      * iterator remove throws ISE
427      */
428     public void testIteratorRemove()      { testIteratorRemove(false); }
429     public void testIteratorRemove_fair() { testIteratorRemove(true); }
430     public void testIteratorRemove(boolean fair) {
431         final SynchronousQueue q = new SynchronousQueue(fair);
432         Iterator it = q.iterator();
433         try {
434             it.remove();
435             shouldThrow();
436         } catch (IllegalStateException success) {}
437     }
438 
439     /**
440      * toString returns a non-null string
441      */
442     public void testToString()      { testToString(false); }
443     public void testToString_fair() { testToString(true); }
444     public void testToString(boolean fair) {
445         final SynchronousQueue q = new SynchronousQueue(fair);
446         String s = q.toString();
447         assertNotNull(s);
448     }
449 
450     /**
451      * offer transfers elements across Executor tasks
452      */
453     public void testOfferInExecutor()      { testOfferInExecutor(false); }
454     public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
455     public void testOfferInExecutor(boolean fair) {
456         final SynchronousQueue q = new SynchronousQueue(fair);
457         ExecutorService executor = Executors.newFixedThreadPool(2);
458         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
459 
460         executor.execute(new CheckedRunnable() {
461             public void realRun() throws InterruptedException {
462                 assertFalse(q.offer(one));
463                 threadsStarted.await();
464                 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
465                 assertEquals(0, q.remainingCapacity());
466             }});
467 
468         executor.execute(new CheckedRunnable() {
469             public void realRun() throws InterruptedException {
470                 threadsStarted.await();
471                 assertSame(one, q.take());
472             }});
473 
474         joinPool(executor);
475     }
476 
477     /**
478      * timed poll retrieves elements across Executor threads
479      */
480     public void testPollInExecutor()      { testPollInExecutor(false); }
481     public void testPollInExecutor_fair() { testPollInExecutor(true); }
482     public void testPollInExecutor(boolean fair) {
483         final SynchronousQueue q = new SynchronousQueue(fair);
484         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
485         ExecutorService executor = Executors.newFixedThreadPool(2);
486         executor.execute(new CheckedRunnable() {
487             public void realRun() throws InterruptedException {
488                 assertNull(q.poll());
489                 threadsStarted.await();
490                 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
491                 assertTrue(q.isEmpty());
492             }});
493 
494         executor.execute(new CheckedRunnable() {
495             public void realRun() throws InterruptedException {
496                 threadsStarted.await();
497                 q.put(one);
498             }});
499 
500         joinPool(executor);
501     }
502 
503     /**
504      * a deserialized serialized queue is usable
505      */
506     public void testSerialization() {
507         final SynchronousQueue x = new SynchronousQueue();
508         final SynchronousQueue y = new SynchronousQueue(false);
509         final SynchronousQueue z = new SynchronousQueue(true);
510         assertSerialEquals(x, y);
511         assertNotSerialEquals(x, z);
512         SynchronousQueue[] qs = { x, y, z };
513         for (SynchronousQueue q : qs) {
514             SynchronousQueue clone = serialClone(q);
515             assertNotSame(q, clone);
516             assertSerialEquals(q, clone);
517             assertTrue(clone.isEmpty());
518             assertEquals(0, clone.size());
519             assertEquals(0, clone.remainingCapacity());
520             assertFalse(clone.offer(zero));
521         }
522     }
523 
524     /**
525      * drainTo(c) of empty queue doesn't transfer elements
526      */
527     public void testDrainTo()      { testDrainTo(false); }
528     public void testDrainTo_fair() { testDrainTo(true); }
529     public void testDrainTo(boolean fair) {
530         final SynchronousQueue q = new SynchronousQueue(fair);
531         ArrayList l = new ArrayList();
532         q.drainTo(l);
533         assertEquals(0, q.size());
534         assertEquals(0, l.size());
535     }
536 
537     /**
538      * drainTo empties queue, unblocking a waiting put.
539      */
540     public void testDrainToWithActivePut()      { testDrainToWithActivePut(false); }
541     public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
542     public void testDrainToWithActivePut(boolean fair) {
543         final SynchronousQueue q = new SynchronousQueue(fair);
544         Thread t = newStartedThread(new CheckedRunnable() {
545             public void realRun() throws InterruptedException {
546                 q.put(one);
547             }});
548 
549         ArrayList l = new ArrayList();
550         long startTime = System.nanoTime();
551         while (l.isEmpty()) {
552             q.drainTo(l);
553             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
554                 fail("timed out");
555             Thread.yield();
556         }
557         assertTrue(l.size() == 1);
558         assertSame(one, l.get(0));
559         awaitTermination(t);
560     }
561 
562     /**
563      * drainTo(c, n) empties up to n elements of queue into c
564      */
565     public void testDrainToN() throws InterruptedException {
566         final SynchronousQueue q = new SynchronousQueue();
567         Thread t1 = newStartedThread(new CheckedRunnable() {
568             public void realRun() throws InterruptedException {
569                 q.put(one);
570             }});
571 
572         Thread t2 = newStartedThread(new CheckedRunnable() {
573             public void realRun() throws InterruptedException {
574                 q.put(two);
575             }});
576 
577         ArrayList l = new ArrayList();
578         delay(SHORT_DELAY_MS);
579         q.drainTo(l, 1);
580         assertEquals(1, l.size());
581         q.drainTo(l, 1);
582         assertEquals(2, l.size());
583         assertTrue(l.contains(one));
584         assertTrue(l.contains(two));
585         awaitTermination(t1);
586         awaitTermination(t2);
587     }
588 
589 }
590