1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include John Vint
6  */
7 
8 package jsr166;
9 
10 import static java.util.concurrent.TimeUnit.MILLISECONDS;
11 
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Queue;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.LinkedTransferQueue;
24 
25 import junit.framework.Test;
26 
27 @SuppressWarnings({"unchecked", "rawtypes"})
28 public class LinkedTransferQueueTest extends JSR166TestCase {
29     static class Implementation implements CollectionImplementation {
klazz()30         public Class<?> klazz() { return LinkedTransferQueue.class; }
emptyCollection()31         public Collection emptyCollection() { return new LinkedTransferQueue(); }
makeElement(int i)32         public Object makeElement(int i) { return i; }
isConcurrent()33         public boolean isConcurrent() { return true; }
permitsNulls()34         public boolean permitsNulls() { return false; }
35     }
36 
37     // android-note: These tests have been moved into their own separate
38     // classes to work around CTS issues:
39     // LinkedTransferQueueBlockingQueueTest.java
40     // LinkedTransferQueueCollectionTest.java
41     //
42     // public static class Generic extends BlockingQueueTest {
43     //     protected BlockingQueue emptyCollection() {
44     //         return new LinkedTransferQueue();
45     //     }
46     // }
47 
48     // android-note: Removed because the CTS runner does a bad job of
49     // retrying tests that have suite() declarations.
50     //
51     // public static void main(String[] args) {
52     //     main(suite(), args);
53     // }
54     // public static Test suite() {
55     //     return newTestSuite(LinkedTransferQueueTest.class,
56     //                         new Generic().testSuite(),
57     //                         CollectionTest.testSuite(new Implementation()));
58     // }
59 
60     /**
61      * Constructor builds new queue with size being zero and empty
62      * being true
63      */
testConstructor1()64     public void testConstructor1() {
65         assertEquals(0, new LinkedTransferQueue().size());
66         assertTrue(new LinkedTransferQueue().isEmpty());
67     }
68 
69     /**
70      * Initializing constructor with null collection throws
71      * NullPointerException
72      */
testConstructor2()73     public void testConstructor2() {
74         try {
75             new LinkedTransferQueue(null);
76             shouldThrow();
77         } catch (NullPointerException success) {}
78     }
79 
80     /**
81      * Initializing from Collection of null elements throws
82      * NullPointerException
83      */
testConstructor3()84     public void testConstructor3() {
85         Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
86         try {
87             new LinkedTransferQueue(elements);
88             shouldThrow();
89         } catch (NullPointerException success) {}
90     }
91 
92     /**
93      * Initializing constructor with a collection containing some null elements
94      * throws NullPointerException
95      */
testConstructor4()96     public void testConstructor4() {
97         Integer[] ints = new Integer[SIZE];
98         for (int i = 0; i < SIZE - 1; ++i)
99             ints[i] = i;
100         Collection<Integer> elements = Arrays.asList(ints);
101         try {
102             new LinkedTransferQueue(elements);
103             shouldThrow();
104         } catch (NullPointerException success) {}
105     }
106 
107     /**
108      * Queue contains all elements of the collection it is initialized by
109      */
testConstructor5()110     public void testConstructor5() {
111         Integer[] ints = new Integer[SIZE];
112         for (int i = 0; i < SIZE; ++i) {
113             ints[i] = i;
114         }
115         List intList = Arrays.asList(ints);
116         LinkedTransferQueue q
117             = new LinkedTransferQueue(intList);
118         assertEquals(q.size(), intList.size());
119         assertEquals(q.toString(), intList.toString());
120         assertTrue(Arrays.equals(q.toArray(),
121                                      intList.toArray()));
122         assertTrue(Arrays.equals(q.toArray(new Object[0]),
123                                  intList.toArray(new Object[0])));
124         assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
125                                  intList.toArray(new Object[SIZE])));
126         for (int i = 0; i < SIZE; ++i) {
127             assertEquals(ints[i], q.poll());
128         }
129     }
130 
131     /**
132      * remainingCapacity() always returns Integer.MAX_VALUE
133      */
testRemainingCapacity()134     public void testRemainingCapacity() {
135         BlockingQueue q = populatedQueue(SIZE);
136         for (int i = 0; i < SIZE; ++i) {
137             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
138             assertEquals(SIZE - i, q.size());
139             assertEquals(i, q.remove());
140         }
141         for (int i = 0; i < SIZE; ++i) {
142             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
143             assertEquals(i, q.size());
144             assertTrue(q.add(i));
145         }
146     }
147 
148     /**
149      * addAll(this) throws IllegalArgumentException
150      */
testAddAllSelf()151     public void testAddAllSelf() {
152         LinkedTransferQueue q = populatedQueue(SIZE);
153         try {
154             q.addAll(q);
155             shouldThrow();
156         } catch (IllegalArgumentException success) {}
157     }
158 
159     /**
160      * addAll of a collection with any null elements throws
161      * NullPointerException after possibly adding some elements
162      */
testAddAll3()163     public void testAddAll3() {
164         LinkedTransferQueue q = new LinkedTransferQueue();
165         Integer[] ints = new Integer[SIZE];
166         for (int i = 0; i < SIZE - 1; ++i)
167             ints[i] = i;
168         try {
169             q.addAll(Arrays.asList(ints));
170             shouldThrow();
171         } catch (NullPointerException success) {}
172     }
173 
174     /**
175      * Queue contains all elements, in traversal order, of successful addAll
176      */
testAddAll5()177     public void testAddAll5() {
178         Integer[] empty = new Integer[0];
179         Integer[] ints = new Integer[SIZE];
180         for (int i = 0; i < SIZE; ++i) {
181             ints[i] = i;
182         }
183         LinkedTransferQueue q = new LinkedTransferQueue();
184         assertFalse(q.addAll(Arrays.asList(empty)));
185         assertTrue(q.addAll(Arrays.asList(ints)));
186         for (int i = 0; i < SIZE; ++i) {
187             assertEquals(ints[i], q.poll());
188         }
189     }
190 
191     /**
192      * all elements successfully put are contained
193      */
testPut()194     public void testPut() {
195         LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
196         for (int i = 0; i < SIZE; ++i) {
197             assertEquals(i, q.size());
198             q.put(i);
199             assertTrue(q.contains(i));
200         }
201     }
202 
203     /**
204      * take retrieves elements in FIFO order
205      */
testTake()206     public void testTake() throws InterruptedException {
207         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
208         for (int i = 0; i < SIZE; ++i) {
209             assertEquals(i, (int) q.take());
210         }
211     }
212 
213     /**
214      * take removes existing elements until empty, then blocks interruptibly
215      */
testBlockingTake()216     public void testBlockingTake() throws InterruptedException {
217         final BlockingQueue q = populatedQueue(SIZE);
218         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
219         Thread t = newStartedThread(new CheckedRunnable() {
220             public void realRun() throws InterruptedException {
221                 for (int i = 0; i < SIZE; ++i) {
222                     assertEquals(i, q.take());
223                 }
224 
225                 Thread.currentThread().interrupt();
226                 try {
227                     q.take();
228                     shouldThrow();
229                 } catch (InterruptedException success) {}
230                 assertFalse(Thread.interrupted());
231 
232                 pleaseInterrupt.countDown();
233                 try {
234                     q.take();
235                     shouldThrow();
236                 } catch (InterruptedException success) {}
237                 assertFalse(Thread.interrupted());
238             }});
239 
240         await(pleaseInterrupt);
241         assertThreadStaysAlive(t);
242         t.interrupt();
243         awaitTermination(t);
244     }
245 
246     /**
247      * poll succeeds unless empty
248      */
testPoll()249     public void testPoll() throws InterruptedException {
250         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
251         for (int i = 0; i < SIZE; ++i) {
252             assertEquals(i, (int) q.poll());
253         }
254         assertNull(q.poll());
255         checkEmpty(q);
256     }
257 
258     /**
259      * timed poll with zero timeout succeeds when non-empty, else times out
260      */
testTimedPoll0()261     public void testTimedPoll0() throws InterruptedException {
262         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
263         for (int i = 0; i < SIZE; ++i) {
264             assertEquals(i, (int) q.poll(0, MILLISECONDS));
265         }
266         assertNull(q.poll(0, MILLISECONDS));
267         checkEmpty(q);
268     }
269 
270     /**
271      * timed poll with nonzero timeout succeeds when non-empty, else times out
272      */
testTimedPoll()273     public void testTimedPoll() throws InterruptedException {
274         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
275         long startTime = System.nanoTime();
276         for (int i = 0; i < SIZE; ++i)
277             assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
278         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
279 
280         startTime = System.nanoTime();
281         assertNull(q.poll(timeoutMillis(), MILLISECONDS));
282         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
283         checkEmpty(q);
284     }
285 
286     /**
287      * Interrupted timed poll throws InterruptedException instead of
288      * returning timeout status
289      */
testInterruptedTimedPoll()290     public void testInterruptedTimedPoll() throws InterruptedException {
291         final BlockingQueue<Integer> q = populatedQueue(SIZE);
292         final CountDownLatch aboutToWait = new CountDownLatch(1);
293         Thread t = newStartedThread(new CheckedRunnable() {
294             public void realRun() throws InterruptedException {
295                 long startTime = System.nanoTime();
296                 for (int i = 0; i < SIZE; ++i)
297                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
298                 aboutToWait.countDown();
299                 try {
300                     q.poll(LONG_DELAY_MS, MILLISECONDS);
301                     shouldThrow();
302                 } catch (InterruptedException success) {}
303                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
304             }});
305 
306         aboutToWait.await();
307         waitForThreadToEnterWaitState(t);
308         t.interrupt();
309         awaitTermination(t);
310         checkEmpty(q);
311     }
312 
313     /**
314      * timed poll after thread interrupted throws InterruptedException
315      * instead of returning timeout status
316      */
testTimedPollAfterInterrupt()317     public void testTimedPollAfterInterrupt() throws InterruptedException {
318         final BlockingQueue<Integer> q = populatedQueue(SIZE);
319         Thread t = newStartedThread(new CheckedRunnable() {
320             public void realRun() throws InterruptedException {
321                 long startTime = System.nanoTime();
322                 Thread.currentThread().interrupt();
323                 for (int i = 0; i < SIZE; ++i)
324                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
325                 try {
326                     q.poll(LONG_DELAY_MS, MILLISECONDS);
327                     shouldThrow();
328                 } catch (InterruptedException success) {}
329                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
330             }});
331 
332         awaitTermination(t);
333         checkEmpty(q);
334     }
335 
336     /**
337      * peek returns next element, or null if empty
338      */
testPeek()339     public void testPeek() throws InterruptedException {
340         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
341         for (int i = 0; i < SIZE; ++i) {
342             assertEquals(i, (int) q.peek());
343             assertEquals(i, (int) q.poll());
344             assertTrue(q.peek() == null ||
345                        i != (int) q.peek());
346         }
347         assertNull(q.peek());
348         checkEmpty(q);
349     }
350 
351     /**
352      * element returns next element, or throws NoSuchElementException if empty
353      */
testElement()354     public void testElement() throws InterruptedException {
355         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
356         for (int i = 0; i < SIZE; ++i) {
357             assertEquals(i, (int) q.element());
358             assertEquals(i, (int) q.poll());
359         }
360         try {
361             q.element();
362             shouldThrow();
363         } catch (NoSuchElementException success) {}
364         checkEmpty(q);
365     }
366 
367     /**
368      * remove removes next element, or throws NoSuchElementException if empty
369      */
testRemove()370     public void testRemove() throws InterruptedException {
371         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
372         for (int i = 0; i < SIZE; ++i) {
373             assertEquals(i, (int) q.remove());
374         }
375         try {
376             q.remove();
377             shouldThrow();
378         } catch (NoSuchElementException success) {}
379         checkEmpty(q);
380     }
381 
382     /**
383      * An add following remove(x) succeeds
384      */
testRemoveElementAndAdd()385     public void testRemoveElementAndAdd() throws InterruptedException {
386         LinkedTransferQueue q = new LinkedTransferQueue();
387         assertTrue(q.add(one));
388         assertTrue(q.add(two));
389         assertTrue(q.remove(one));
390         assertTrue(q.remove(two));
391         assertTrue(q.add(three));
392         assertSame(q.take(), three);
393     }
394 
395     /**
396      * contains(x) reports true when elements added but not yet removed
397      */
testContains()398     public void testContains() {
399         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
400         for (int i = 0; i < SIZE; ++i) {
401             assertTrue(q.contains(i));
402             assertEquals(i, (int) q.poll());
403             assertFalse(q.contains(i));
404         }
405     }
406 
407     /**
408      * clear removes all elements
409      */
testClear()410     public void testClear() throws InterruptedException {
411         LinkedTransferQueue q = populatedQueue(SIZE);
412         q.clear();
413         checkEmpty(q);
414         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
415         q.add(one);
416         assertFalse(q.isEmpty());
417         assertEquals(1, q.size());
418         assertTrue(q.contains(one));
419         q.clear();
420         checkEmpty(q);
421     }
422 
423     /**
424      * containsAll(c) is true when c contains a subset of elements
425      */
testContainsAll()426     public void testContainsAll() {
427         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
428         LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
429         for (int i = 0; i < SIZE; ++i) {
430             assertTrue(q.containsAll(p));
431             assertFalse(p.containsAll(q));
432             p.add(i);
433         }
434         assertTrue(p.containsAll(q));
435     }
436 
437     /**
438      * retainAll(c) retains only those elements of c and reports true
439      * if changed
440      */
testRetainAll()441     public void testRetainAll() {
442         LinkedTransferQueue q = populatedQueue(SIZE);
443         LinkedTransferQueue p = populatedQueue(SIZE);
444         for (int i = 0; i < SIZE; ++i) {
445             boolean changed = q.retainAll(p);
446             if (i == 0) {
447                 assertFalse(changed);
448             } else {
449                 assertTrue(changed);
450             }
451             assertTrue(q.containsAll(p));
452             assertEquals(SIZE - i, q.size());
453             p.remove();
454         }
455     }
456 
457     /**
458      * removeAll(c) removes only those elements of c and reports true
459      * if changed
460      */
testRemoveAll()461     public void testRemoveAll() {
462         for (int i = 1; i < SIZE; ++i) {
463             LinkedTransferQueue q = populatedQueue(SIZE);
464             LinkedTransferQueue p = populatedQueue(i);
465             assertTrue(q.removeAll(p));
466             assertEquals(SIZE - i, q.size());
467             for (int j = 0; j < i; ++j) {
468                 assertFalse(q.contains(p.remove()));
469             }
470         }
471     }
472 
473     /**
474      * toArray() contains all elements in FIFO order
475      */
testToArray()476     public void testToArray() {
477         LinkedTransferQueue q = populatedQueue(SIZE);
478         Object[] o = q.toArray();
479         for (int i = 0; i < o.length; i++) {
480             assertSame(o[i], q.poll());
481         }
482     }
483 
484     /**
485      * toArray(a) contains all elements in FIFO order
486      */
testToArray2()487     public void testToArray2() {
488         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
489         Integer[] ints = new Integer[SIZE];
490         Integer[] array = q.toArray(ints);
491         assertSame(ints, array);
492         for (int i = 0; i < ints.length; i++) {
493             assertSame(ints[i], q.poll());
494         }
495     }
496 
497     /**
498      * toArray(incompatible array type) throws ArrayStoreException
499      */
testToArray1_BadArg()500     public void testToArray1_BadArg() {
501         LinkedTransferQueue q = populatedQueue(SIZE);
502         try {
503             q.toArray(new String[10]);
504             shouldThrow();
505         } catch (ArrayStoreException success) {}
506     }
507 
508     /**
509      * iterator iterates through all elements
510      */
testIterator()511     public void testIterator() throws InterruptedException {
512         LinkedTransferQueue q = populatedQueue(SIZE);
513         Iterator it = q.iterator();
514         int i;
515         for (i = 0; it.hasNext(); i++)
516             assertTrue(q.contains(it.next()));
517         assertEquals(i, SIZE);
518         assertIteratorExhausted(it);
519 
520         it = q.iterator();
521         for (i = 0; it.hasNext(); i++)
522             assertEquals(it.next(), q.take());
523         assertEquals(i, SIZE);
524         assertIteratorExhausted(it);
525     }
526 
527     /**
528      * iterator of empty collection has no elements
529      */
testEmptyIterator()530     public void testEmptyIterator() {
531         assertIteratorExhausted(new LinkedTransferQueue().iterator());
532     }
533 
534     /**
535      * iterator.remove() removes current element
536      */
testIteratorRemove()537     public void testIteratorRemove() {
538         final LinkedTransferQueue q = new LinkedTransferQueue();
539         q.add(two);
540         q.add(one);
541         q.add(three);
542 
543         Iterator it = q.iterator();
544         it.next();
545         it.remove();
546 
547         it = q.iterator();
548         assertSame(it.next(), one);
549         assertSame(it.next(), three);
550         assertFalse(it.hasNext());
551     }
552 
553     /**
554      * iterator ordering is FIFO
555      */
testIteratorOrdering()556     public void testIteratorOrdering() {
557         final LinkedTransferQueue<Integer> q
558             = new LinkedTransferQueue<Integer>();
559         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
560         q.add(one);
561         q.add(two);
562         q.add(three);
563         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
564         int k = 0;
565         for (Integer n : q) {
566             assertEquals(++k, (int) n);
567         }
568         assertEquals(3, k);
569     }
570 
571     /**
572      * Modifications do not cause iterators to fail
573      */
testWeaklyConsistentIteration()574     public void testWeaklyConsistentIteration() {
575         final LinkedTransferQueue q = new LinkedTransferQueue();
576         q.add(one);
577         q.add(two);
578         q.add(three);
579         for (Iterator it = q.iterator(); it.hasNext();) {
580             q.remove();
581             it.next();
582         }
583         assertEquals(0, q.size());
584     }
585 
586     /**
587      * toString contains toStrings of elements
588      */
testToString()589     public void testToString() {
590         LinkedTransferQueue q = populatedQueue(SIZE);
591         String s = q.toString();
592         for (int i = 0; i < SIZE; ++i) {
593             assertTrue(s.contains(String.valueOf(i)));
594         }
595     }
596 
597     /**
598      * offer transfers elements across Executor tasks
599      */
testOfferInExecutor()600     public void testOfferInExecutor() {
601         final LinkedTransferQueue q = new LinkedTransferQueue();
602         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
603         final ExecutorService executor = Executors.newFixedThreadPool(2);
604         try (PoolCleaner cleaner = cleaner(executor)) {
605 
606             executor.execute(new CheckedRunnable() {
607                 public void realRun() throws InterruptedException {
608                     threadsStarted.await();
609                     long startTime = System.nanoTime();
610                     assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
611                     assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
612                 }});
613 
614             executor.execute(new CheckedRunnable() {
615                 public void realRun() throws InterruptedException {
616                     threadsStarted.await();
617                     assertSame(one, q.take());
618                     checkEmpty(q);
619                 }});
620         }
621     }
622 
623     /**
624      * timed poll retrieves elements across Executor threads
625      */
testPollInExecutor()626     public void testPollInExecutor() {
627         final LinkedTransferQueue q = new LinkedTransferQueue();
628         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
629         final ExecutorService executor = Executors.newFixedThreadPool(2);
630         try (PoolCleaner cleaner = cleaner(executor)) {
631 
632             executor.execute(new CheckedRunnable() {
633                 public void realRun() throws InterruptedException {
634                     assertNull(q.poll());
635                     threadsStarted.await();
636                     long startTime = System.nanoTime();
637                     assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
638                     assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
639                     checkEmpty(q);
640                 }});
641 
642             executor.execute(new CheckedRunnable() {
643                 public void realRun() throws InterruptedException {
644                     threadsStarted.await();
645                     q.put(one);
646                 }});
647         }
648     }
649 
650     /**
651      * A deserialized serialized queue has same elements in same order
652      */
testSerialization()653     public void testSerialization() throws Exception {
654         Queue x = populatedQueue(SIZE);
655         Queue y = serialClone(x);
656 
657         assertNotSame(y, x);
658         assertEquals(x.size(), y.size());
659         assertEquals(x.toString(), y.toString());
660         assertTrue(Arrays.equals(x.toArray(), y.toArray()));
661         while (!x.isEmpty()) {
662             assertFalse(y.isEmpty());
663             assertEquals(x.remove(), y.remove());
664         }
665         assertTrue(y.isEmpty());
666     }
667 
668     /**
669      * drainTo(c) empties queue into another collection c
670      */
testDrainTo()671     public void testDrainTo() {
672         LinkedTransferQueue q = populatedQueue(SIZE);
673         ArrayList l = new ArrayList();
674         q.drainTo(l);
675         assertEquals(0, q.size());
676         assertEquals(SIZE, l.size());
677         for (int i = 0; i < SIZE; ++i) {
678             assertEquals(i, l.get(i));
679         }
680         q.add(zero);
681         q.add(one);
682         assertFalse(q.isEmpty());
683         assertTrue(q.contains(zero));
684         assertTrue(q.contains(one));
685         l.clear();
686         q.drainTo(l);
687         assertEquals(0, q.size());
688         assertEquals(2, l.size());
689         for (int i = 0; i < 2; ++i) {
690             assertEquals(i, l.get(i));
691         }
692     }
693 
694     /**
     * drainTo(c) empties the queue while another thread concurrently puts
     * an element (the queue is unbounded, so the put itself never blocks).
696      */
testDrainToWithActivePut()697     public void testDrainToWithActivePut() throws InterruptedException {
698         final LinkedTransferQueue q = populatedQueue(SIZE);
699         Thread t = newStartedThread(new CheckedRunnable() {
700             public void realRun() {
701                 q.put(SIZE + 1);
702             }});
703         ArrayList l = new ArrayList();
704         q.drainTo(l);
705         assertTrue(l.size() >= SIZE);
706         for (int i = 0; i < SIZE; ++i)
707             assertEquals(i, l.get(i));
708         awaitTermination(t);
709         assertTrue(q.size() + l.size() >= SIZE);
710     }
711 
712     /**
713      * drainTo(c, n) empties first min(n, size) elements of queue into c
714      */
testDrainToN()715     public void testDrainToN() {
716         LinkedTransferQueue q = new LinkedTransferQueue();
717         for (int i = 0; i < SIZE + 2; ++i) {
718             for (int j = 0; j < SIZE; j++) {
719                 assertTrue(q.offer(j));
720             }
721             ArrayList l = new ArrayList();
722             q.drainTo(l, i);
723             int k = (i < SIZE) ? i : SIZE;
724             assertEquals(k, l.size());
725             assertEquals(SIZE - k, q.size());
726             for (int j = 0; j < k; ++j)
727                 assertEquals(j, l.get(j));
728             do {} while (q.poll() != null);
729         }
730     }
731 
732     /**
733      * timed poll() or take() increments the waiting consumer count;
734      * offer(e) decrements the waiting consumer count
735      */
testWaitingConsumer()736     public void testWaitingConsumer() throws InterruptedException {
737         final LinkedTransferQueue q = new LinkedTransferQueue();
738         assertEquals(0, q.getWaitingConsumerCount());
739         assertFalse(q.hasWaitingConsumer());
740         final CountDownLatch threadStarted = new CountDownLatch(1);
741 
742         Thread t = newStartedThread(new CheckedRunnable() {
743             public void realRun() throws InterruptedException {
744                 threadStarted.countDown();
745                 long startTime = System.nanoTime();
746                 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
747                 assertEquals(0, q.getWaitingConsumerCount());
748                 assertFalse(q.hasWaitingConsumer());
749                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
750             }});
751 
752         threadStarted.await();
753         waitForThreadToEnterWaitState(t);
754         assertEquals(1, q.getWaitingConsumerCount());
755         assertTrue(q.hasWaitingConsumer());
756 
757         assertTrue(q.offer(one));
758         assertEquals(0, q.getWaitingConsumerCount());
759         assertFalse(q.hasWaitingConsumer());
760 
761         awaitTermination(t);
762     }
763 
764     /**
765      * transfer(null) throws NullPointerException
766      */
testTransfer1()767     public void testTransfer1() throws InterruptedException {
768         try {
769             LinkedTransferQueue q = new LinkedTransferQueue();
770             q.transfer(null);
771             shouldThrow();
772         } catch (NullPointerException success) {}
773     }
774 
775     /**
     * transfer waits until a poll occurs. The transferred element
777      * is returned by this associated poll.
778      */
testTransfer2()779     public void testTransfer2() throws InterruptedException {
780         final LinkedTransferQueue<Integer> q
781             = new LinkedTransferQueue<Integer>();
782         final CountDownLatch threadStarted = new CountDownLatch(1);
783 
784         Thread t = newStartedThread(new CheckedRunnable() {
785             public void realRun() throws InterruptedException {
786                 threadStarted.countDown();
787                 q.transfer(five);
788                 checkEmpty(q);
789             }});
790 
791         threadStarted.await();
792         waitForThreadToEnterWaitState(t);
793         assertEquals(1, q.size());
794         assertSame(five, q.poll());
795         checkEmpty(q);
796         awaitTermination(t);
797     }
798 
799     /**
800      * transfer waits until a poll occurs, and then transfers in fifo order
801      */
testTransfer3()802     public void testTransfer3() throws InterruptedException {
803         final LinkedTransferQueue<Integer> q
804             = new LinkedTransferQueue<Integer>();
805 
806         Thread first = newStartedThread(new CheckedRunnable() {
807             public void realRun() throws InterruptedException {
808                 q.transfer(four);
809                 assertTrue(!q.contains(four));
810                 assertEquals(1, q.size());
811             }});
812 
813         Thread interruptedThread = newStartedThread(
814             new CheckedInterruptedRunnable() {
815                 public void realRun() throws InterruptedException {
816                     while (q.isEmpty())
817                         Thread.yield();
818                     q.transfer(five);
819                 }});
820 
821         while (q.size() < 2)
822             Thread.yield();
823         assertEquals(2, q.size());
824         assertSame(four, q.poll());
825         first.join();
826         assertEquals(1, q.size());
827         interruptedThread.interrupt();
828         interruptedThread.join();
829         checkEmpty(q);
830     }
831 
832     /**
833      * transfer waits until a poll occurs, at which point the polling
834      * thread returns the element
835      */
testTransfer4()836     public void testTransfer4() throws InterruptedException {
837         final LinkedTransferQueue q = new LinkedTransferQueue();
838 
839         Thread t = newStartedThread(new CheckedRunnable() {
840             public void realRun() throws InterruptedException {
841                 q.transfer(four);
842                 assertFalse(q.contains(four));
843                 assertSame(three, q.poll());
844             }});
845 
846         while (q.isEmpty())
847             Thread.yield();
848         assertFalse(q.isEmpty());
849         assertEquals(1, q.size());
850         assertTrue(q.offer(three));
851         assertSame(four, q.poll());
852         awaitTermination(t);
853     }
854 
855     /**
     * transfer waits until a take occurs. The transferred element
857      * is returned by this associated take.
858      */
testTransfer5()859     public void testTransfer5() throws InterruptedException {
860         final LinkedTransferQueue<Integer> q
861             = new LinkedTransferQueue<Integer>();
862 
863         Thread t = newStartedThread(new CheckedRunnable() {
864             public void realRun() throws InterruptedException {
865                 q.transfer(four);
866                 checkEmpty(q);
867             }});
868 
869         while (q.isEmpty())
870             Thread.yield();
871         assertFalse(q.isEmpty());
872         assertEquals(1, q.size());
873         assertSame(four, q.take());
874         checkEmpty(q);
875         awaitTermination(t);
876     }
877 
878     /**
879      * tryTransfer(null) throws NullPointerException
880      */
testTryTransfer1()881     public void testTryTransfer1() {
882         final LinkedTransferQueue q = new LinkedTransferQueue();
883         try {
884             q.tryTransfer(null);
885             shouldThrow();
886         } catch (NullPointerException success) {}
887     }
888 
889     /**
890      * tryTransfer returns false and does not enqueue if there are no
891      * consumers waiting to poll or take.
892      */
testTryTransfer2()893     public void testTryTransfer2() throws InterruptedException {
894         final LinkedTransferQueue q = new LinkedTransferQueue();
895         assertFalse(q.tryTransfer(new Object()));
896         assertFalse(q.hasWaitingConsumer());
897         checkEmpty(q);
898     }
899 
900     /**
901      * If there is a consumer waiting in timed poll, tryTransfer
     * returns true while successfully transferring object.
903      */
testTryTransfer3()904     public void testTryTransfer3() throws InterruptedException {
905         final Object hotPotato = new Object();
906         final LinkedTransferQueue q = new LinkedTransferQueue();
907 
908         Thread t = newStartedThread(new CheckedRunnable() {
909             public void realRun() {
910                 while (! q.hasWaitingConsumer())
911                     Thread.yield();
912                 assertTrue(q.hasWaitingConsumer());
913                 checkEmpty(q);
914                 assertTrue(q.tryTransfer(hotPotato));
915             }});
916 
917         long startTime = System.nanoTime();
918         assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
919         assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
920         checkEmpty(q);
921         awaitTermination(t);
922     }
923 
924     /**
925      * If there is a consumer waiting in take, tryTransfer returns
     * true while successfully transferring object.
927      */
928     public void testTryTransfer4() throws InterruptedException {
929         final Object hotPotato = new Object();
930         final LinkedTransferQueue q = new LinkedTransferQueue();
931 
932         Thread t = newStartedThread(new CheckedRunnable() {
933             public void realRun() {
934                 while (! q.hasWaitingConsumer())
935                     Thread.yield();
936                 assertTrue(q.hasWaitingConsumer());
937                 checkEmpty(q);
938                 assertTrue(q.tryTransfer(hotPotato));
939             }});
940 
941         assertSame(q.take(), hotPotato);
942         checkEmpty(q);
943         awaitTermination(t);
944     }
945 
946     /**
947      * tryTransfer blocks interruptibly if no takers
948      */
949     public void testTryTransfer5() throws InterruptedException {
950         final LinkedTransferQueue q = new LinkedTransferQueue();
951         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
952         assertTrue(q.isEmpty());
953 
954         Thread t = newStartedThread(new CheckedRunnable() {
955             public void realRun() throws InterruptedException {
956                 long startTime = System.nanoTime();
957                 Thread.currentThread().interrupt();
958                 try {
959                     q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
960                     shouldThrow();
961                 } catch (InterruptedException success) {}
962                 assertFalse(Thread.interrupted());
963 
964                 pleaseInterrupt.countDown();
965                 try {
966                     q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
967                     shouldThrow();
968                 } catch (InterruptedException success) {}
969                 assertFalse(Thread.interrupted());
970                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
971             }});
972 
973         await(pleaseInterrupt);
974         assertThreadStaysAlive(t);
975         t.interrupt();
976         awaitTermination(t);
977         checkEmpty(q);
978     }
979 
980     /**
981      * tryTransfer gives up after the timeout and returns false
982      */
983     public void testTryTransfer6() throws InterruptedException {
984         final LinkedTransferQueue q = new LinkedTransferQueue();
985 
986         Thread t = newStartedThread(new CheckedRunnable() {
987             public void realRun() throws InterruptedException {
988                 long startTime = System.nanoTime();
989                 assertFalse(q.tryTransfer(new Object(),
990                                           timeoutMillis(), MILLISECONDS));
991                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
992                 checkEmpty(q);
993             }});
994 
995         awaitTermination(t);
996         checkEmpty(q);
997     }
998 
999     /**
     * tryTransfer waits for any elements previously in the queue to be
     * removed before transferring to a poll or take
1002      */
1003     public void testTryTransfer7() throws InterruptedException {
1004         final LinkedTransferQueue q = new LinkedTransferQueue();
1005         assertTrue(q.offer(four));
1006 
1007         Thread t = newStartedThread(new CheckedRunnable() {
1008             public void realRun() throws InterruptedException {
1009                 long startTime = System.nanoTime();
1010                 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1011                 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1012                 checkEmpty(q);
1013             }});
1014 
1015         while (q.size() != 2)
1016             Thread.yield();
1017         assertEquals(2, q.size());
1018         assertSame(four, q.poll());
1019         assertSame(five, q.poll());
1020         checkEmpty(q);
1021         awaitTermination(t);
1022     }
1023 
1024     /**
     * timed tryTransfer with no waiting consumer times out and returns
     * false without enqueueing; after the existing element is polled,
     * the next poll returns null
1027      */
1028     public void testTryTransfer8() throws InterruptedException {
1029         final LinkedTransferQueue q = new LinkedTransferQueue();
1030         assertTrue(q.offer(four));
1031         assertEquals(1, q.size());
1032         long startTime = System.nanoTime();
1033         assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1034         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1035         assertEquals(1, q.size());
1036         assertSame(four, q.poll());
1037         assertNull(q.poll());
1038         checkEmpty(q);
1039     }
1040 
1041     private LinkedTransferQueue<Integer> populatedQueue(int n) {
1042         LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1043         checkEmpty(q);
1044         for (int i = 0; i < n; i++) {
1045             assertEquals(i, q.size());
1046             assertTrue(q.offer(i));
1047             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1048         }
1049         assertFalse(q.isEmpty());
1050         return q;
1051     }
1052 
1053     /**
1054      * remove(null), contains(null) always return false
1055      */
1056     public void testNeverContainsNull() {
1057         Collection<?>[] qs = {
1058             new LinkedTransferQueue<Object>(),
1059             populatedQueue(2),
1060         };
1061 
1062         for (Collection<?> q : qs) {
1063             assertFalse(q.contains(null));
1064             assertFalse(q.remove(null));
1065         }
1066     }
1067 }
1068