1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include John Vint
6  */
7 
8 package jsr166;
9 
10 import static java.util.concurrent.TimeUnit.MILLISECONDS;
11 
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Queue;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.LinkedTransferQueue;
25 
26 import junit.framework.Test;
27 
28 @SuppressWarnings({"unchecked", "rawtypes"})
29 public class LinkedTransferQueueTest extends JSR166TestCase {
30     static class Implementation implements CollectionImplementation {
31         public Class<?> klazz() { return LinkedTransferQueue.class; }
32         public Collection emptyCollection() { return new LinkedTransferQueue(); }
33         public Object makeElement(int i) { return i; }
34         public boolean isConcurrent() { return true; }
35         public boolean permitsNulls() { return false; }
36     }
37 
38     // android-note: These tests have been moved into their own separate
39     // classes to work around CTS issues:
40     // LinkedTransferQueueBlockingQueueTest.java
41     // LinkedTransferQueueCollectionTest.java
42     //
43     // public static class Generic extends BlockingQueueTest {
44     //     protected BlockingQueue emptyCollection() {
45     //         return new LinkedTransferQueue();
46     //     }
47     // }
48 
49     // android-note: Removed because the CTS runner does a bad job of
50     // retrying tests that have suite() declarations.
51     //
52     // public static void main(String[] args) {
53     //     main(suite(), args);
54     // }
55     // public static Test suite() {
56     //     return newTestSuite(LinkedTransferQueueTest.class,
57     //                         new Generic().testSuite(),
58     //                         CollectionTest.testSuite(new Implementation()));
59     // }
60 
61     /**
62      * Constructor builds new queue with size being zero and empty
63      * being true
64      */
65     public void testConstructor1() {
66         assertEquals(0, new LinkedTransferQueue().size());
67         assertTrue(new LinkedTransferQueue().isEmpty());
68     }
69 
70     /**
71      * Initializing constructor with null collection throws
72      * NullPointerException
73      */
74     public void testConstructor2() {
75         try {
76             new LinkedTransferQueue(null);
77             shouldThrow();
78         } catch (NullPointerException success) {}
79     }
80 
81     /**
82      * Initializing from Collection of null elements throws
83      * NullPointerException
84      */
85     public void testConstructor3() {
86         Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
87         try {
88             new LinkedTransferQueue(elements);
89             shouldThrow();
90         } catch (NullPointerException success) {}
91     }
92 
93     /**
94      * Initializing constructor with a collection containing some null elements
95      * throws NullPointerException
96      */
97     public void testConstructor4() {
98         Integer[] ints = new Integer[SIZE];
99         for (int i = 0; i < SIZE - 1; ++i)
100             ints[i] = i;
101         Collection<Integer> elements = Arrays.asList(ints);
102         try {
103             new LinkedTransferQueue(elements);
104             shouldThrow();
105         } catch (NullPointerException success) {}
106     }
107 
108     /**
109      * Queue contains all elements of the collection it is initialized by
110      */
111     public void testConstructor5() {
112         Integer[] ints = new Integer[SIZE];
113         for (int i = 0; i < SIZE; ++i) {
114             ints[i] = i;
115         }
116         List intList = Arrays.asList(ints);
117         LinkedTransferQueue q
118             = new LinkedTransferQueue(intList);
119         assertEquals(q.size(), intList.size());
120         assertEquals(q.toString(), intList.toString());
121         assertTrue(Arrays.equals(q.toArray(),
122                                      intList.toArray()));
123         assertTrue(Arrays.equals(q.toArray(new Object[0]),
124                                  intList.toArray(new Object[0])));
125         assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
126                                  intList.toArray(new Object[SIZE])));
127         for (int i = 0; i < SIZE; ++i) {
128             assertEquals(ints[i], q.poll());
129         }
130     }
131 
132     /**
133      * remainingCapacity() always returns Integer.MAX_VALUE
134      */
135     public void testRemainingCapacity() {
136         BlockingQueue q = populatedQueue(SIZE);
137         for (int i = 0; i < SIZE; ++i) {
138             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
139             assertEquals(SIZE - i, q.size());
140             assertEquals(i, q.remove());
141         }
142         for (int i = 0; i < SIZE; ++i) {
143             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
144             assertEquals(i, q.size());
145             assertTrue(q.add(i));
146         }
147     }
148 
149     /**
150      * addAll(this) throws IllegalArgumentException
151      */
152     public void testAddAllSelf() {
153         LinkedTransferQueue q = populatedQueue(SIZE);
154         try {
155             q.addAll(q);
156             shouldThrow();
157         } catch (IllegalArgumentException success) {}
158     }
159 
160     /**
161      * addAll of a collection with any null elements throws
162      * NullPointerException after possibly adding some elements
163      */
164     public void testAddAll3() {
165         LinkedTransferQueue q = new LinkedTransferQueue();
166         Integer[] ints = new Integer[SIZE];
167         for (int i = 0; i < SIZE - 1; ++i)
168             ints[i] = i;
169         try {
170             q.addAll(Arrays.asList(ints));
171             shouldThrow();
172         } catch (NullPointerException success) {}
173     }
174 
175     /**
176      * Queue contains all elements, in traversal order, of successful addAll
177      */
178     public void testAddAll5() {
179         Integer[] empty = new Integer[0];
180         Integer[] ints = new Integer[SIZE];
181         for (int i = 0; i < SIZE; ++i) {
182             ints[i] = i;
183         }
184         LinkedTransferQueue q = new LinkedTransferQueue();
185         assertFalse(q.addAll(Arrays.asList(empty)));
186         assertTrue(q.addAll(Arrays.asList(ints)));
187         for (int i = 0; i < SIZE; ++i) {
188             assertEquals(ints[i], q.poll());
189         }
190     }
191 
192     /**
193      * all elements successfully put are contained
194      */
195     public void testPut() {
196         LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
197         for (int i = 0; i < SIZE; ++i) {
198             assertEquals(i, q.size());
199             q.put(i);
200             assertTrue(q.contains(i));
201         }
202     }
203 
204     /**
205      * take retrieves elements in FIFO order
206      */
207     public void testTake() throws InterruptedException {
208         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
209         for (int i = 0; i < SIZE; ++i) {
210             assertEquals(i, (int) q.take());
211         }
212     }
213 
214     /**
215      * take removes existing elements until empty, then blocks interruptibly
216      */
217     public void testBlockingTake() throws InterruptedException {
218         final BlockingQueue q = populatedQueue(SIZE);
219         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
220         Thread t = newStartedThread(new CheckedRunnable() {
221             public void realRun() throws InterruptedException {
222                 for (int i = 0; i < SIZE; ++i) {
223                     assertEquals(i, q.take());
224                 }
225 
226                 Thread.currentThread().interrupt();
227                 try {
228                     q.take();
229                     shouldThrow();
230                 } catch (InterruptedException success) {}
231                 assertFalse(Thread.interrupted());
232 
233                 pleaseInterrupt.countDown();
234                 try {
235                     q.take();
236                     shouldThrow();
237                 } catch (InterruptedException success) {}
238                 assertFalse(Thread.interrupted());
239             }});
240 
241         await(pleaseInterrupt);
242         assertThreadStaysAlive(t);
243         t.interrupt();
244         awaitTermination(t);
245     }
246 
247     /**
248      * poll succeeds unless empty
249      */
250     public void testPoll() throws InterruptedException {
251         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252         for (int i = 0; i < SIZE; ++i) {
253             assertEquals(i, (int) q.poll());
254         }
255         assertNull(q.poll());
256         checkEmpty(q);
257     }
258 
259     /**
260      * timed poll with zero timeout succeeds when non-empty, else times out
261      */
262     public void testTimedPoll0() throws InterruptedException {
263         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264         for (int i = 0; i < SIZE; ++i) {
265             assertEquals(i, (int) q.poll(0, MILLISECONDS));
266         }
267         assertNull(q.poll(0, MILLISECONDS));
268         checkEmpty(q);
269     }
270 
271     /**
272      * timed poll with nonzero timeout succeeds when non-empty, else times out
273      */
274     public void testTimedPoll() throws InterruptedException {
275         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
276         long startTime = System.nanoTime();
277         for (int i = 0; i < SIZE; ++i)
278             assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
279         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
280 
281         startTime = System.nanoTime();
282         assertNull(q.poll(timeoutMillis(), MILLISECONDS));
283         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
284         checkEmpty(q);
285     }
286 
287     /**
288      * Interrupted timed poll throws InterruptedException instead of
289      * returning timeout status
290      */
291     public void testInterruptedTimedPoll() throws InterruptedException {
292         final BlockingQueue<Integer> q = populatedQueue(SIZE);
293         final CountDownLatch aboutToWait = new CountDownLatch(1);
294         Thread t = newStartedThread(new CheckedRunnable() {
295             public void realRun() throws InterruptedException {
296                 long startTime = System.nanoTime();
297                 for (int i = 0; i < SIZE; ++i)
298                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
299                 aboutToWait.countDown();
300                 try {
301                     q.poll(LONG_DELAY_MS, MILLISECONDS);
302                     shouldThrow();
303                 } catch (InterruptedException success) {}
304                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
305             }});
306 
307         aboutToWait.await();
308         waitForThreadToEnterWaitState(t);
309         t.interrupt();
310         awaitTermination(t);
311         checkEmpty(q);
312     }
313 
314     /**
315      * timed poll after thread interrupted throws InterruptedException
316      * instead of returning timeout status
317      */
318     public void testTimedPollAfterInterrupt() throws InterruptedException {
319         final BlockingQueue<Integer> q = populatedQueue(SIZE);
320         Thread t = newStartedThread(new CheckedRunnable() {
321             public void realRun() throws InterruptedException {
322                 long startTime = System.nanoTime();
323                 Thread.currentThread().interrupt();
324                 for (int i = 0; i < SIZE; ++i)
325                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
326                 try {
327                     q.poll(LONG_DELAY_MS, MILLISECONDS);
328                     shouldThrow();
329                 } catch (InterruptedException success) {}
330                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
331             }});
332 
333         awaitTermination(t);
334         checkEmpty(q);
335     }
336 
337     /**
338      * peek returns next element, or null if empty
339      */
340     public void testPeek() throws InterruptedException {
341         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
342         for (int i = 0; i < SIZE; ++i) {
343             assertEquals(i, (int) q.peek());
344             assertEquals(i, (int) q.poll());
345             assertTrue(q.peek() == null ||
346                        i != (int) q.peek());
347         }
348         assertNull(q.peek());
349         checkEmpty(q);
350     }
351 
352     /**
353      * element returns next element, or throws NoSuchElementException if empty
354      */
355     public void testElement() throws InterruptedException {
356         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
357         for (int i = 0; i < SIZE; ++i) {
358             assertEquals(i, (int) q.element());
359             assertEquals(i, (int) q.poll());
360         }
361         try {
362             q.element();
363             shouldThrow();
364         } catch (NoSuchElementException success) {}
365         checkEmpty(q);
366     }
367 
368     /**
369      * remove removes next element, or throws NoSuchElementException if empty
370      */
371     public void testRemove() throws InterruptedException {
372         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
373         for (int i = 0; i < SIZE; ++i) {
374             assertEquals(i, (int) q.remove());
375         }
376         try {
377             q.remove();
378             shouldThrow();
379         } catch (NoSuchElementException success) {}
380         checkEmpty(q);
381     }
382 
383     /**
384      * An add following remove(x) succeeds
385      */
386     public void testRemoveElementAndAdd() throws InterruptedException {
387         LinkedTransferQueue q = new LinkedTransferQueue();
388         assertTrue(q.add(one));
389         assertTrue(q.add(two));
390         assertTrue(q.remove(one));
391         assertTrue(q.remove(two));
392         assertTrue(q.add(three));
393         assertSame(q.take(), three);
394     }
395 
396     /**
397      * contains(x) reports true when elements added but not yet removed
398      */
399     public void testContains() {
400         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
401         for (int i = 0; i < SIZE; ++i) {
402             assertTrue(q.contains(i));
403             assertEquals(i, (int) q.poll());
404             assertFalse(q.contains(i));
405         }
406     }
407 
408     /**
409      * clear removes all elements
410      */
411     public void testClear() throws InterruptedException {
412         LinkedTransferQueue q = populatedQueue(SIZE);
413         q.clear();
414         checkEmpty(q);
415         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
416         q.add(one);
417         assertFalse(q.isEmpty());
418         assertEquals(1, q.size());
419         assertTrue(q.contains(one));
420         q.clear();
421         checkEmpty(q);
422     }
423 
424     /**
425      * containsAll(c) is true when c contains a subset of elements
426      */
427     public void testContainsAll() {
428         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
429         LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
430         for (int i = 0; i < SIZE; ++i) {
431             assertTrue(q.containsAll(p));
432             assertFalse(p.containsAll(q));
433             p.add(i);
434         }
435         assertTrue(p.containsAll(q));
436     }
437 
438     /**
439      * retainAll(c) retains only those elements of c and reports true
440      * if changed
441      */
442     public void testRetainAll() {
443         LinkedTransferQueue q = populatedQueue(SIZE);
444         LinkedTransferQueue p = populatedQueue(SIZE);
445         for (int i = 0; i < SIZE; ++i) {
446             boolean changed = q.retainAll(p);
447             if (i == 0) {
448                 assertFalse(changed);
449             } else {
450                 assertTrue(changed);
451             }
452             assertTrue(q.containsAll(p));
453             assertEquals(SIZE - i, q.size());
454             p.remove();
455         }
456     }
457 
458     /**
459      * removeAll(c) removes only those elements of c and reports true
460      * if changed
461      */
462     public void testRemoveAll() {
463         for (int i = 1; i < SIZE; ++i) {
464             LinkedTransferQueue q = populatedQueue(SIZE);
465             LinkedTransferQueue p = populatedQueue(i);
466             assertTrue(q.removeAll(p));
467             assertEquals(SIZE - i, q.size());
468             for (int j = 0; j < i; ++j) {
469                 assertFalse(q.contains(p.remove()));
470             }
471         }
472     }
473 
474     /**
475      * toArray() contains all elements in FIFO order
476      */
477     public void testToArray() {
478         LinkedTransferQueue q = populatedQueue(SIZE);
479         Object[] o = q.toArray();
480         for (int i = 0; i < o.length; i++) {
481             assertSame(o[i], q.poll());
482         }
483     }
484 
485     /**
486      * toArray(a) contains all elements in FIFO order
487      */
488     public void testToArray2() {
489         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
490         Integer[] ints = new Integer[SIZE];
491         Integer[] array = q.toArray(ints);
492         assertSame(ints, array);
493         for (int i = 0; i < ints.length; i++) {
494             assertSame(ints[i], q.poll());
495         }
496     }
497 
498     /**
499      * toArray(incompatible array type) throws ArrayStoreException
500      */
501     public void testToArray1_BadArg() {
502         LinkedTransferQueue q = populatedQueue(SIZE);
503         try {
504             q.toArray(new String[10]);
505             shouldThrow();
506         } catch (ArrayStoreException success) {}
507     }
508 
509     /**
510      * iterator iterates through all elements
511      */
512     public void testIterator() throws InterruptedException {
513         LinkedTransferQueue q = populatedQueue(SIZE);
514         Iterator it = q.iterator();
515         int i;
516         for (i = 0; it.hasNext(); i++)
517             assertTrue(q.contains(it.next()));
518         assertEquals(i, SIZE);
519         assertIteratorExhausted(it);
520 
521         it = q.iterator();
522         for (i = 0; it.hasNext(); i++)
523             assertEquals(it.next(), q.take());
524         assertEquals(i, SIZE);
525         assertIteratorExhausted(it);
526     }
527 
528     /**
529      * iterator of empty collection has no elements
530      */
531     public void testEmptyIterator() {
532         assertIteratorExhausted(new LinkedTransferQueue().iterator());
533     }
534 
535     /**
536      * iterator.remove() removes current element
537      */
538     public void testIteratorRemove() {
539         final LinkedTransferQueue q = new LinkedTransferQueue();
540         q.add(two);
541         q.add(one);
542         q.add(three);
543 
544         Iterator it = q.iterator();
545         it.next();
546         it.remove();
547 
548         it = q.iterator();
549         assertSame(it.next(), one);
550         assertSame(it.next(), three);
551         assertFalse(it.hasNext());
552     }
553 
554     /**
555      * iterator ordering is FIFO
556      */
557     public void testIteratorOrdering() {
558         final LinkedTransferQueue<Integer> q
559             = new LinkedTransferQueue<Integer>();
560         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
561         q.add(one);
562         q.add(two);
563         q.add(three);
564         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
565         int k = 0;
566         for (Integer n : q) {
567             assertEquals(++k, (int) n);
568         }
569         assertEquals(3, k);
570     }
571 
572     /**
573      * Modifications do not cause iterators to fail
574      */
575     public void testWeaklyConsistentIteration() {
576         final LinkedTransferQueue q = new LinkedTransferQueue();
577         q.add(one);
578         q.add(two);
579         q.add(three);
580         for (Iterator it = q.iterator(); it.hasNext();) {
581             q.remove();
582             it.next();
583         }
584         assertEquals(0, q.size());
585     }
586 
587     /**
588      * toString contains toStrings of elements
589      */
590     public void testToString() {
591         LinkedTransferQueue q = populatedQueue(SIZE);
592         String s = q.toString();
593         for (int i = 0; i < SIZE; ++i) {
594             assertTrue(s.contains(String.valueOf(i)));
595         }
596     }
597 
598     /**
599      * offer transfers elements across Executor tasks
600      */
601     public void testOfferInExecutor() {
602         final LinkedTransferQueue q = new LinkedTransferQueue();
603         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
604         final ExecutorService executor = Executors.newFixedThreadPool(2);
605         try (PoolCleaner cleaner = cleaner(executor)) {
606 
607             executor.execute(new CheckedRunnable() {
608                 public void realRun() throws InterruptedException {
609                     threadsStarted.await();
610                     long startTime = System.nanoTime();
611                     assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
612                     assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
613                 }});
614 
615             executor.execute(new CheckedRunnable() {
616                 public void realRun() throws InterruptedException {
617                     threadsStarted.await();
618                     assertSame(one, q.take());
619                     checkEmpty(q);
620                 }});
621         }
622     }
623 
624     /**
625      * timed poll retrieves elements across Executor threads
626      */
627     public void testPollInExecutor() {
628         final LinkedTransferQueue q = new LinkedTransferQueue();
629         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
630         final ExecutorService executor = Executors.newFixedThreadPool(2);
631         try (PoolCleaner cleaner = cleaner(executor)) {
632 
633             executor.execute(new CheckedRunnable() {
634                 public void realRun() throws InterruptedException {
635                     assertNull(q.poll());
636                     threadsStarted.await();
637                     long startTime = System.nanoTime();
638                     assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
639                     assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
640                     checkEmpty(q);
641                 }});
642 
643             executor.execute(new CheckedRunnable() {
644                 public void realRun() throws InterruptedException {
645                     threadsStarted.await();
646                     q.put(one);
647                 }});
648         }
649     }
650 
651     /**
652      * A deserialized serialized queue has same elements in same order
653      */
654     public void testSerialization() throws Exception {
655         Queue x = populatedQueue(SIZE);
656         Queue y = serialClone(x);
657 
658         assertNotSame(y, x);
659         assertEquals(x.size(), y.size());
660         assertEquals(x.toString(), y.toString());
661         assertTrue(Arrays.equals(x.toArray(), y.toArray()));
662         while (!x.isEmpty()) {
663             assertFalse(y.isEmpty());
664             assertEquals(x.remove(), y.remove());
665         }
666         assertTrue(y.isEmpty());
667     }
668 
669     /**
670      * drainTo(c) empties queue into another collection c
671      */
672     public void testDrainTo() {
673         LinkedTransferQueue q = populatedQueue(SIZE);
674         ArrayList l = new ArrayList();
675         q.drainTo(l);
676         assertEquals(0, q.size());
677         assertEquals(SIZE, l.size());
678         for (int i = 0; i < SIZE; ++i) {
679             assertEquals(i, l.get(i));
680         }
681         q.add(zero);
682         q.add(one);
683         assertFalse(q.isEmpty());
684         assertTrue(q.contains(zero));
685         assertTrue(q.contains(one));
686         l.clear();
687         q.drainTo(l);
688         assertEquals(0, q.size());
689         assertEquals(2, l.size());
690         for (int i = 0; i < 2; ++i) {
691             assertEquals(i, l.get(i));
692         }
693     }
694 
695     /**
696      * drainTo(c) empties full queue, unblocking a waiting put.
697      */
698     public void testDrainToWithActivePut() throws InterruptedException {
699         final LinkedTransferQueue q = populatedQueue(SIZE);
700         Thread t = newStartedThread(new CheckedRunnable() {
701             public void realRun() {
702                 q.put(SIZE + 1);
703             }});
704         ArrayList l = new ArrayList();
705         q.drainTo(l);
706         assertTrue(l.size() >= SIZE);
707         for (int i = 0; i < SIZE; ++i)
708             assertEquals(i, l.get(i));
709         awaitTermination(t);
710         assertTrue(q.size() + l.size() >= SIZE);
711     }
712 
713     /**
714      * drainTo(c, n) empties first min(n, size) elements of queue into c
715      */
716     public void testDrainToN() {
717         LinkedTransferQueue q = new LinkedTransferQueue();
718         for (int i = 0; i < SIZE + 2; ++i) {
719             for (int j = 0; j < SIZE; j++) {
720                 assertTrue(q.offer(j));
721             }
722             ArrayList l = new ArrayList();
723             q.drainTo(l, i);
724             int k = (i < SIZE) ? i : SIZE;
725             assertEquals(k, l.size());
726             assertEquals(SIZE - k, q.size());
727             for (int j = 0; j < k; ++j)
728                 assertEquals(j, l.get(j));
729             do {} while (q.poll() != null);
730         }
731     }
732 
733     /**
734      * timed poll() or take() increments the waiting consumer count;
735      * offer(e) decrements the waiting consumer count
736      */
737     public void testWaitingConsumer() throws InterruptedException {
738         final LinkedTransferQueue q = new LinkedTransferQueue();
739         assertEquals(0, q.getWaitingConsumerCount());
740         assertFalse(q.hasWaitingConsumer());
741         final CountDownLatch threadStarted = new CountDownLatch(1);
742 
743         Thread t = newStartedThread(new CheckedRunnable() {
744             public void realRun() throws InterruptedException {
745                 threadStarted.countDown();
746                 long startTime = System.nanoTime();
747                 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
748                 assertEquals(0, q.getWaitingConsumerCount());
749                 assertFalse(q.hasWaitingConsumer());
750                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
751             }});
752 
753         threadStarted.await();
754         Callable<Boolean> oneConsumer
755             = new Callable<Boolean>() { public Boolean call() {
756                 return q.hasWaitingConsumer()
757                 && q.getWaitingConsumerCount() == 1; }};
758         waitForThreadToEnterWaitState(t, oneConsumer);
759 
760         assertTrue(q.offer(one));
761         assertEquals(0, q.getWaitingConsumerCount());
762         assertFalse(q.hasWaitingConsumer());
763 
764         awaitTermination(t);
765     }
766 
767     /**
768      * transfer(null) throws NullPointerException
769      */
770     public void testTransfer1() throws InterruptedException {
771         try {
772             LinkedTransferQueue q = new LinkedTransferQueue();
773             q.transfer(null);
774             shouldThrow();
775         } catch (NullPointerException success) {}
776     }
777 
778     /**
779      * transfer waits until a poll occurs. The transfered element
780      * is returned by this associated poll.
781      */
782     public void testTransfer2() throws InterruptedException {
783         final LinkedTransferQueue<Integer> q
784             = new LinkedTransferQueue<Integer>();
785         final CountDownLatch threadStarted = new CountDownLatch(1);
786 
787         Thread t = newStartedThread(new CheckedRunnable() {
788             public void realRun() throws InterruptedException {
789                 threadStarted.countDown();
790                 q.transfer(five);
791                 checkEmpty(q);
792             }});
793 
794         threadStarted.await();
795         Callable<Boolean> oneElement
796             = new Callable<Boolean>() { public Boolean call() {
797                 return !q.isEmpty() && q.size() == 1; }};
798         waitForThreadToEnterWaitState(t, oneElement);
799 
800         assertSame(five, q.poll());
801         checkEmpty(q);
802         awaitTermination(t);
803     }
804 
805     /**
806      * transfer waits until a poll occurs, and then transfers in fifo order
807      */
808     public void testTransfer3() throws InterruptedException {
809         final LinkedTransferQueue<Integer> q
810             = new LinkedTransferQueue<Integer>();
811 
812         Thread first = newStartedThread(new CheckedRunnable() {
813             public void realRun() throws InterruptedException {
814                 q.transfer(four);
815                 assertTrue(!q.contains(four));
816                 assertEquals(1, q.size());
817             }});
818 
819         Thread interruptedThread = newStartedThread(
820             new CheckedInterruptedRunnable() {
821                 public void realRun() throws InterruptedException {
822                     while (q.isEmpty())
823                         Thread.yield();
824                     q.transfer(five);
825                 }});
826 
827         while (q.size() < 2)
828             Thread.yield();
829         assertEquals(2, q.size());
830         assertSame(four, q.poll());
831         first.join();
832         assertEquals(1, q.size());
833         interruptedThread.interrupt();
834         interruptedThread.join();
835         checkEmpty(q);
836     }
837 
838     /**
839      * transfer waits until a poll occurs, at which point the polling
840      * thread returns the element
841      */
842     public void testTransfer4() throws InterruptedException {
843         final LinkedTransferQueue q = new LinkedTransferQueue();
844 
845         Thread t = newStartedThread(new CheckedRunnable() {
846             public void realRun() throws InterruptedException {
847                 q.transfer(four);
848                 assertFalse(q.contains(four));
849                 assertSame(three, q.poll());
850             }});
851 
852         while (q.isEmpty())
853             Thread.yield();
854         assertFalse(q.isEmpty());
855         assertEquals(1, q.size());
856         assertTrue(q.offer(three));
857         assertSame(four, q.poll());
858         awaitTermination(t);
859     }
860 
861     /**
862      * transfer waits until a take occurs. The transfered element
863      * is returned by this associated take.
864      */
865     public void testTransfer5() throws InterruptedException {
866         final LinkedTransferQueue<Integer> q
867             = new LinkedTransferQueue<Integer>();
868 
869         Thread t = newStartedThread(new CheckedRunnable() {
870             public void realRun() throws InterruptedException {
871                 q.transfer(four);
872                 checkEmpty(q);
873             }});
874 
875         while (q.isEmpty())
876             Thread.yield();
877         assertFalse(q.isEmpty());
878         assertEquals(1, q.size());
879         assertSame(four, q.take());
880         checkEmpty(q);
881         awaitTermination(t);
882     }
883 
884     /**
885      * tryTransfer(null) throws NullPointerException
886      */
887     public void testTryTransfer1() {
888         final LinkedTransferQueue q = new LinkedTransferQueue();
889         try {
890             q.tryTransfer(null);
891             shouldThrow();
892         } catch (NullPointerException success) {}
893     }
894 
895     /**
896      * tryTransfer returns false and does not enqueue if there are no
897      * consumers waiting to poll or take.
898      */
899     public void testTryTransfer2() throws InterruptedException {
900         final LinkedTransferQueue q = new LinkedTransferQueue();
901         assertFalse(q.tryTransfer(new Object()));
902         assertFalse(q.hasWaitingConsumer());
903         checkEmpty(q);
904     }
905 
906     /**
907      * If there is a consumer waiting in timed poll, tryTransfer
908      * returns true while successfully transfering object.
909      */
910     public void testTryTransfer3() throws InterruptedException {
911         final Object hotPotato = new Object();
912         final LinkedTransferQueue q = new LinkedTransferQueue();
913 
914         Thread t = newStartedThread(new CheckedRunnable() {
915             public void realRun() {
916                 while (! q.hasWaitingConsumer())
917                     Thread.yield();
918                 assertTrue(q.hasWaitingConsumer());
919                 checkEmpty(q);
920                 assertTrue(q.tryTransfer(hotPotato));
921             }});
922 
923         long startTime = System.nanoTime();
924         assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
925         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
926         checkEmpty(q);
927         awaitTermination(t);
928     }
929 
930     /**
931      * If there is a consumer waiting in take, tryTransfer returns
932      * true while successfully transfering object.
933      */
934     public void testTryTransfer4() throws InterruptedException {
935         final Object hotPotato = new Object();
936         final LinkedTransferQueue q = new LinkedTransferQueue();
937 
938         Thread t = newStartedThread(new CheckedRunnable() {
939             public void realRun() {
940                 while (! q.hasWaitingConsumer())
941                     Thread.yield();
942                 assertTrue(q.hasWaitingConsumer());
943                 checkEmpty(q);
944                 assertTrue(q.tryTransfer(hotPotato));
945             }});
946 
947         assertSame(q.take(), hotPotato);
948         checkEmpty(q);
949         awaitTermination(t);
950     }
951 
952     /**
953      * tryTransfer blocks interruptibly if no takers
954      */
955     public void testTryTransfer5() throws InterruptedException {
956         final LinkedTransferQueue q = new LinkedTransferQueue();
957         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
958         assertTrue(q.isEmpty());
959 
960         Thread t = newStartedThread(new CheckedRunnable() {
961             public void realRun() throws InterruptedException {
962                 long startTime = System.nanoTime();
963                 Thread.currentThread().interrupt();
964                 try {
965                     q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
966                     shouldThrow();
967                 } catch (InterruptedException success) {}
968                 assertFalse(Thread.interrupted());
969 
970                 pleaseInterrupt.countDown();
971                 try {
972                     q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
973                     shouldThrow();
974                 } catch (InterruptedException success) {}
975                 assertFalse(Thread.interrupted());
976                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
977             }});
978 
979         await(pleaseInterrupt);
980         assertThreadStaysAlive(t);
981         t.interrupt();
982         awaitTermination(t);
983         checkEmpty(q);
984     }
985 
986     /**
987      * tryTransfer gives up after the timeout and returns false
988      */
989     public void testTryTransfer6() throws InterruptedException {
990         final LinkedTransferQueue q = new LinkedTransferQueue();
991 
992         Thread t = newStartedThread(new CheckedRunnable() {
993             public void realRun() throws InterruptedException {
994                 long startTime = System.nanoTime();
995                 assertFalse(q.tryTransfer(new Object(),
996                                           timeoutMillis(), MILLISECONDS));
997                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
998                 checkEmpty(q);
999             }});
1000 
1001         awaitTermination(t);
1002         checkEmpty(q);
1003     }
1004 
1005     /**
1006      * tryTransfer waits for any elements previously in to be removed
1007      * before transfering to a poll or take
1008      */
1009     public void testTryTransfer7() throws InterruptedException {
1010         final LinkedTransferQueue q = new LinkedTransferQueue();
1011         assertTrue(q.offer(four));
1012 
1013         Thread t = newStartedThread(new CheckedRunnable() {
1014             public void realRun() throws InterruptedException {
1015                 long startTime = System.nanoTime();
1016                 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1017                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1018                 checkEmpty(q);
1019             }});
1020 
1021         while (q.size() != 2)
1022             Thread.yield();
1023         assertEquals(2, q.size());
1024         assertSame(four, q.poll());
1025         assertSame(five, q.poll());
1026         checkEmpty(q);
1027         awaitTermination(t);
1028     }
1029 
1030     /**
1031      * tryTransfer attempts to enqueue into the queue and fails
1032      * returning false not enqueueing and the successive poll is null
1033      */
1034     public void testTryTransfer8() throws InterruptedException {
1035         final LinkedTransferQueue q = new LinkedTransferQueue();
1036         assertTrue(q.offer(four));
1037         assertEquals(1, q.size());
1038         long startTime = System.nanoTime();
1039         assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1040         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1041         assertEquals(1, q.size());
1042         assertSame(four, q.poll());
1043         assertNull(q.poll());
1044         checkEmpty(q);
1045     }
1046 
1047     private LinkedTransferQueue<Integer> populatedQueue(int n) {
1048         LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1049         checkEmpty(q);
1050         for (int i = 0; i < n; i++) {
1051             assertEquals(i, q.size());
1052             assertTrue(q.offer(i));
1053             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1054         }
1055         assertFalse(q.isEmpty());
1056         return q;
1057     }
1058 
1059     /**
1060      * remove(null), contains(null) always return false
1061      */
1062     public void testNeverContainsNull() {
1063         Collection<?>[] qs = {
1064             new LinkedTransferQueue<Object>(),
1065             populatedQueue(2),
1066         };
1067 
1068         for (Collection<?> q : qs) {
1069             assertFalse(q.contains(null));
1070             assertFalse(q.remove(null));
1071         }
1072     }
1073 }
1074