1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include John Vint
6  */
7 
8 package jsr166;
9 
10 import junit.framework.*;
11 import java.util.Arrays;
12 import java.util.ArrayList;
13 import java.util.Collection;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.NoSuchElementException;
17 import java.util.Queue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23 import static java.util.concurrent.TimeUnit.MILLISECONDS;
24 import static java.util.concurrent.TimeUnit.NANOSECONDS;
25 
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends BlockingQueueTest {
28 
emptyCollection()29     protected BlockingQueue emptyCollection() {
30         return new LinkedTransferQueue();
31     }
32 
33     /**
34      * Constructor builds new queue with size being zero and empty
35      * being true
36      */
testConstructor1()37     public void testConstructor1() {
38         assertEquals(0, new LinkedTransferQueue().size());
39         assertTrue(new LinkedTransferQueue().isEmpty());
40     }
41 
42     /**
43      * Initializing constructor with null collection throws
44      * NullPointerException
45      */
testConstructor2()46     public void testConstructor2() {
47         try {
48             new LinkedTransferQueue(null);
49             shouldThrow();
50         } catch (NullPointerException success) {}
51     }
52 
53     /**
54      * Initializing from Collection of null elements throws
55      * NullPointerException
56      */
testConstructor3()57     public void testConstructor3() {
58         Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
59         try {
60             new LinkedTransferQueue(elements);
61             shouldThrow();
62         } catch (NullPointerException success) {}
63     }
64 
65     /**
66      * Initializing constructor with a collection containing some null elements
67      * throws NullPointerException
68      */
testConstructor4()69     public void testConstructor4() {
70         Integer[] ints = new Integer[SIZE];
71         for (int i = 0; i < SIZE-1; ++i)
72             ints[i] = i;
73         Collection<Integer> elements = Arrays.asList(ints);
74         try {
75             new LinkedTransferQueue(elements);
76             shouldThrow();
77         } catch (NullPointerException success) {}
78     }
79 
80     /**
81      * Queue contains all elements of the collection it is initialized by
82      */
testConstructor5()83     public void testConstructor5() {
84         Integer[] ints = new Integer[SIZE];
85         for (int i = 0; i < SIZE; ++i) {
86             ints[i] = i;
87         }
88         List intList = Arrays.asList(ints);
89         LinkedTransferQueue q
90             = new LinkedTransferQueue(intList);
91         assertEquals(q.size(), intList.size());
92         assertEquals(q.toString(), intList.toString());
93         assertTrue(Arrays.equals(q.toArray(),
94                                      intList.toArray()));
95         assertTrue(Arrays.equals(q.toArray(new Object[0]),
96                                  intList.toArray(new Object[0])));
97         assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
98                                  intList.toArray(new Object[SIZE])));
99         for (int i = 0; i < SIZE; ++i) {
100             assertEquals(ints[i], q.poll());
101         }
102     }
103 
104     /**
105      * remainingCapacity() always returns Integer.MAX_VALUE
106      */
testRemainingCapacity()107     public void testRemainingCapacity() {
108         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
109         for (int i = 0; i < SIZE; ++i) {
110             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
111             assertEquals(SIZE - i, q.size());
112             q.remove();
113         }
114         for (int i = 0; i < SIZE; ++i) {
115             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
116             assertEquals(i, q.size());
117             q.add(i);
118         }
119     }
120 
121     /**
122      * addAll(this) throws IllegalArgumentException
123      */
testAddAllSelf()124     public void testAddAllSelf() {
125         try {
126             LinkedTransferQueue q = populatedQueue(SIZE);
127             q.addAll(q);
128             shouldThrow();
129         } catch (IllegalArgumentException success) {}
130     }
131 
132     /**
133      * addAll of a collection with any null elements throws
134      * NullPointerException after possibly adding some elements
135      */
testAddAll3()136     public void testAddAll3() {
137         try {
138             LinkedTransferQueue q = new LinkedTransferQueue();
139             Integer[] ints = new Integer[SIZE];
140             for (int i = 0; i < SIZE - 1; ++i) {
141                 ints[i] = i;
142             }
143             q.addAll(Arrays.asList(ints));
144             shouldThrow();
145         } catch (NullPointerException success) {}
146     }
147 
148     /**
149      * Queue contains all elements, in traversal order, of successful addAll
150      */
testAddAll5()151     public void testAddAll5() {
152         Integer[] empty = new Integer[0];
153         Integer[] ints = new Integer[SIZE];
154         for (int i = 0; i < SIZE; ++i) {
155             ints[i] = i;
156         }
157         LinkedTransferQueue q = new LinkedTransferQueue();
158         assertFalse(q.addAll(Arrays.asList(empty)));
159         assertTrue(q.addAll(Arrays.asList(ints)));
160         for (int i = 0; i < SIZE; ++i) {
161             assertEquals(ints[i], q.poll());
162         }
163     }
164 
165     /**
166      * all elements successfully put are contained
167      */
testPut()168     public void testPut() {
169         LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
170         for (int i = 0; i < SIZE; ++i) {
171             assertEquals(i, q.size());
172             q.put(i);
173             assertTrue(q.contains(i));
174         }
175     }
176 
177     /**
178      * take retrieves elements in FIFO order
179      */
testTake()180     public void testTake() throws InterruptedException {
181         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
182         for (int i = 0; i < SIZE; ++i) {
183             assertEquals(i, (int) q.take());
184         }
185     }
186 
187     /**
188      * take removes existing elements until empty, then blocks interruptibly
189      */
testBlockingTake()190     public void testBlockingTake() throws InterruptedException {
191         final BlockingQueue q = populatedQueue(SIZE);
192         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
193         Thread t = newStartedThread(new CheckedRunnable() {
194             public void realRun() throws InterruptedException {
195                 for (int i = 0; i < SIZE; ++i) {
196                     assertEquals(i, q.take());
197                 }
198 
199                 Thread.currentThread().interrupt();
200                 try {
201                     q.take();
202                     shouldThrow();
203                 } catch (InterruptedException success) {}
204                 assertFalse(Thread.interrupted());
205 
206                 pleaseInterrupt.countDown();
207                 try {
208                     q.take();
209                     shouldThrow();
210                 } catch (InterruptedException success) {}
211                 assertFalse(Thread.interrupted());
212             }});
213 
214         await(pleaseInterrupt);
215         assertThreadStaysAlive(t);
216         t.interrupt();
217         awaitTermination(t);
218     }
219 
220     /**
221      * poll succeeds unless empty
222      */
testPoll()223     public void testPoll() throws InterruptedException {
224         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
225         for (int i = 0; i < SIZE; ++i) {
226             assertEquals(i, (int) q.poll());
227         }
228         assertNull(q.poll());
229         checkEmpty(q);
230     }
231 
232     /**
233      * timed poll with zero timeout succeeds when non-empty, else times out
234      */
testTimedPoll0()235     public void testTimedPoll0() throws InterruptedException {
236         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
237         for (int i = 0; i < SIZE; ++i) {
238             assertEquals(i, (int) q.poll(0, MILLISECONDS));
239         }
240         assertNull(q.poll(0, MILLISECONDS));
241         checkEmpty(q);
242     }
243 
244     /**
245      * timed poll with nonzero timeout succeeds when non-empty, else times out
246      */
testTimedPoll()247     public void testTimedPoll() throws InterruptedException {
248         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
249         for (int i = 0; i < SIZE; ++i) {
250             long startTime = System.nanoTime();
251             assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
252             assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
253         }
254         long startTime = System.nanoTime();
255         assertNull(q.poll(timeoutMillis(), MILLISECONDS));
256         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
257         checkEmpty(q);
258     }
259 
260     /**
261      * Interrupted timed poll throws InterruptedException instead of
262      * returning timeout status
263      */
testInterruptedTimedPoll()264     public void testInterruptedTimedPoll() throws InterruptedException {
265         final BlockingQueue<Integer> q = populatedQueue(SIZE);
266         final CountDownLatch aboutToWait = new CountDownLatch(1);
267         Thread t = newStartedThread(new CheckedRunnable() {
268             public void realRun() throws InterruptedException {
269                 for (int i = 0; i < SIZE; ++i) {
270                     long t0 = System.nanoTime();
271                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
272                     assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
273                 }
274                 long t0 = System.nanoTime();
275                 aboutToWait.countDown();
276                 try {
277                     q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
278                     shouldThrow();
279                 } catch (InterruptedException success) {
280                     assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
281                 }
282             }});
283 
284         aboutToWait.await();
285         waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
286         t.interrupt();
287         awaitTermination(t, MEDIUM_DELAY_MS);
288         checkEmpty(q);
289     }
290 
291     /**
292      * timed poll after thread interrupted throws InterruptedException
293      * instead of returning timeout status
294      */
testTimedPollAfterInterrupt()295     public void testTimedPollAfterInterrupt() throws InterruptedException {
296         final BlockingQueue<Integer> q = populatedQueue(SIZE);
297         Thread t = newStartedThread(new CheckedRunnable() {
298             public void realRun() throws InterruptedException {
299                 Thread.currentThread().interrupt();
300                 for (int i = 0; i < SIZE; ++i) {
301                     long t0 = System.nanoTime();
302                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
303                     assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
304                 }
305                 try {
306                     q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
307                     shouldThrow();
308                 } catch (InterruptedException success) {}
309             }});
310 
311         awaitTermination(t, MEDIUM_DELAY_MS);
312         checkEmpty(q);
313     }
314 
315     /**
316      * peek returns next element, or null if empty
317      */
testPeek()318     public void testPeek() throws InterruptedException {
319         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
320         for (int i = 0; i < SIZE; ++i) {
321             assertEquals(i, (int) q.peek());
322             assertEquals(i, (int) q.poll());
323             assertTrue(q.peek() == null ||
324                        i != (int) q.peek());
325         }
326         assertNull(q.peek());
327         checkEmpty(q);
328     }
329 
330     /**
331      * element returns next element, or throws NoSuchElementException if empty
332      */
testElement()333     public void testElement() throws InterruptedException {
334         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
335         for (int i = 0; i < SIZE; ++i) {
336             assertEquals(i, (int) q.element());
337             assertEquals(i, (int) q.poll());
338         }
339         try {
340             q.element();
341             shouldThrow();
342         } catch (NoSuchElementException success) {}
343         checkEmpty(q);
344     }
345 
346     /**
347      * remove removes next element, or throws NoSuchElementException if empty
348      */
testRemove()349     public void testRemove() throws InterruptedException {
350         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
351         for (int i = 0; i < SIZE; ++i) {
352             assertEquals(i, (int) q.remove());
353         }
354         try {
355             q.remove();
356             shouldThrow();
357         } catch (NoSuchElementException success) {}
358         checkEmpty(q);
359     }
360 
361     /**
362      * An add following remove(x) succeeds
363      */
testRemoveElementAndAdd()364     public void testRemoveElementAndAdd() throws InterruptedException {
365         LinkedTransferQueue q = new LinkedTransferQueue();
366         assertTrue(q.add(one));
367         assertTrue(q.add(two));
368         assertTrue(q.remove(one));
369         assertTrue(q.remove(two));
370         assertTrue(q.add(three));
371         assertSame(q.take(), three);
372     }
373 
374     /**
375      * contains(x) reports true when elements added but not yet removed
376      */
testContains()377     public void testContains() {
378         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
379         for (int i = 0; i < SIZE; ++i) {
380             assertTrue(q.contains(i));
381             assertEquals(i, (int) q.poll());
382             assertFalse(q.contains(i));
383         }
384     }
385 
386     /**
387      * clear removes all elements
388      */
testClear()389     public void testClear() throws InterruptedException {
390         LinkedTransferQueue q = populatedQueue(SIZE);
391         q.clear();
392         checkEmpty(q);
393         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
394         q.add(one);
395         assertFalse(q.isEmpty());
396         assertEquals(1, q.size());
397         assertTrue(q.contains(one));
398         q.clear();
399         checkEmpty(q);
400     }
401 
402     /**
403      * containsAll(c) is true when c contains a subset of elements
404      */
testContainsAll()405     public void testContainsAll() {
406         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
407         LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
408         for (int i = 0; i < SIZE; ++i) {
409             assertTrue(q.containsAll(p));
410             assertFalse(p.containsAll(q));
411             p.add(i);
412         }
413         assertTrue(p.containsAll(q));
414     }
415 
416     /**
417      * retainAll(c) retains only those elements of c and reports true
418      * if changed
419      */
testRetainAll()420     public void testRetainAll() {
421         LinkedTransferQueue q = populatedQueue(SIZE);
422         LinkedTransferQueue p = populatedQueue(SIZE);
423         for (int i = 0; i < SIZE; ++i) {
424             boolean changed = q.retainAll(p);
425             if (i == 0) {
426                 assertFalse(changed);
427             } else {
428                 assertTrue(changed);
429             }
430             assertTrue(q.containsAll(p));
431             assertEquals(SIZE - i, q.size());
432             p.remove();
433         }
434     }
435 
436     /**
437      * removeAll(c) removes only those elements of c and reports true
438      * if changed
439      */
testRemoveAll()440     public void testRemoveAll() {
441         for (int i = 1; i < SIZE; ++i) {
442             LinkedTransferQueue q = populatedQueue(SIZE);
443             LinkedTransferQueue p = populatedQueue(i);
444             assertTrue(q.removeAll(p));
445             assertEquals(SIZE - i, q.size());
446             for (int j = 0; j < i; ++j) {
447                 assertFalse(q.contains(p.remove()));
448             }
449         }
450     }
451 
452     /**
453      * toArray() contains all elements in FIFO order
454      */
testToArray()455     public void testToArray() {
456         LinkedTransferQueue q = populatedQueue(SIZE);
457         Object[] o = q.toArray();
458         for (int i = 0; i < o.length; i++) {
459             assertSame(o[i], q.poll());
460         }
461     }
462 
463     /**
464      * toArray(a) contains all elements in FIFO order
465      */
testToArray2()466     public void testToArray2() {
467         LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
468         Integer[] ints = new Integer[SIZE];
469         Integer[] array = q.toArray(ints);
470         assertSame(ints, array);
471         for (int i = 0; i < ints.length; i++) {
472             assertSame(ints[i], q.poll());
473         }
474     }
475 
476     /**
477      * toArray(incompatible array type) throws ArrayStoreException
478      */
testToArray1_BadArg()479     public void testToArray1_BadArg() {
480         LinkedTransferQueue q = populatedQueue(SIZE);
481         try {
482             q.toArray(new String[10]);
483             shouldThrow();
484         } catch (ArrayStoreException success) {}
485     }
486 
487     /**
488      * iterator iterates through all elements
489      */
testIterator()490     public void testIterator() throws InterruptedException {
491         LinkedTransferQueue q = populatedQueue(SIZE);
492         Iterator it = q.iterator();
493         int i = 0;
494         while (it.hasNext()) {
495             assertEquals(it.next(), i++);
496         }
497         assertEquals(i, SIZE);
498     }
499 
500     /**
501      * iterator.remove() removes current element
502      */
testIteratorRemove()503     public void testIteratorRemove() {
504         final LinkedTransferQueue q = new LinkedTransferQueue();
505         q.add(two);
506         q.add(one);
507         q.add(three);
508 
509         Iterator it = q.iterator();
510         it.next();
511         it.remove();
512 
513         it = q.iterator();
514         assertSame(it.next(), one);
515         assertSame(it.next(), three);
516         assertFalse(it.hasNext());
517     }
518 
519     /**
520      * iterator ordering is FIFO
521      */
testIteratorOrdering()522     public void testIteratorOrdering() {
523         final LinkedTransferQueue<Integer> q
524             = new LinkedTransferQueue<Integer>();
525         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
526         q.add(one);
527         q.add(two);
528         q.add(three);
529         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
530         int k = 0;
531         for (Integer n : q) {
532             assertEquals(++k, (int) n);
533         }
534         assertEquals(3, k);
535     }
536 
537     /**
538      * Modifications do not cause iterators to fail
539      */
testWeaklyConsistentIteration()540     public void testWeaklyConsistentIteration() {
541         final LinkedTransferQueue q = new LinkedTransferQueue();
542         q.add(one);
543         q.add(two);
544         q.add(three);
545         for (Iterator it = q.iterator(); it.hasNext();) {
546             q.remove();
547             it.next();
548         }
549         assertEquals(0, q.size());
550     }
551 
552     /**
553      * toString contains toStrings of elements
554      */
testToString()555     public void testToString() {
556         LinkedTransferQueue q = populatedQueue(SIZE);
557         String s = q.toString();
558         for (int i = 0; i < SIZE; ++i) {
559             assertTrue(s.contains(String.valueOf(i)));
560         }
561     }
562 
563     /**
564      * offer transfers elements across Executor tasks
565      */
testOfferInExecutor()566     public void testOfferInExecutor() {
567         final LinkedTransferQueue q = new LinkedTransferQueue();
568         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
569         ExecutorService executor = Executors.newFixedThreadPool(2);
570 
571         executor.execute(new CheckedRunnable() {
572             public void realRun() throws InterruptedException {
573                 threadsStarted.await();
574                 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
575             }});
576 
577         executor.execute(new CheckedRunnable() {
578             public void realRun() throws InterruptedException {
579                 threadsStarted.await();
580                 assertSame(one, q.take());
581                 checkEmpty(q);
582             }});
583 
584         joinPool(executor);
585     }
586 
587     /**
588      * timed poll retrieves elements across Executor threads
589      */
testPollInExecutor()590     public void testPollInExecutor() {
591         final LinkedTransferQueue q = new LinkedTransferQueue();
592         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
593         ExecutorService executor = Executors.newFixedThreadPool(2);
594 
595         executor.execute(new CheckedRunnable() {
596             public void realRun() throws InterruptedException {
597                 assertNull(q.poll());
598                 threadsStarted.await();
599                 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
600                 checkEmpty(q);
601             }});
602 
603         executor.execute(new CheckedRunnable() {
604             public void realRun() throws InterruptedException {
605                 threadsStarted.await();
606                 q.put(one);
607             }});
608 
609         joinPool(executor);
610     }
611 
612     /**
613      * A deserialized serialized queue has same elements in same order
614      */
testSerialization()615     public void testSerialization() throws Exception {
616         Queue x = populatedQueue(SIZE);
617         Queue y = serialClone(x);
618 
619         assertNotSame(y, x);
620         assertEquals(x.size(), y.size());
621         assertEquals(x.toString(), y.toString());
622         assertTrue(Arrays.equals(x.toArray(), y.toArray()));
623         while (!x.isEmpty()) {
624             assertFalse(y.isEmpty());
625             assertEquals(x.remove(), y.remove());
626         }
627         assertTrue(y.isEmpty());
628     }
629 
630     /**
631      * drainTo(c) empties queue into another collection c
632      */
testDrainTo()633     public void testDrainTo() {
634         LinkedTransferQueue q = populatedQueue(SIZE);
635         ArrayList l = new ArrayList();
636         q.drainTo(l);
637         assertEquals(0, q.size());
638         assertEquals(SIZE, l.size());
639         for (int i = 0; i < SIZE; ++i) {
640             assertEquals(i, l.get(i));
641         }
642         q.add(zero);
643         q.add(one);
644         assertFalse(q.isEmpty());
645         assertTrue(q.contains(zero));
646         assertTrue(q.contains(one));
647         l.clear();
648         q.drainTo(l);
649         assertEquals(0, q.size());
650         assertEquals(2, l.size());
651         for (int i = 0; i < 2; ++i) {
652             assertEquals(i, l.get(i));
653         }
654     }
655 
656     /**
657      * drainTo(c) empties full queue, unblocking a waiting put.
658      */
testDrainToWithActivePut()659     public void testDrainToWithActivePut() throws InterruptedException {
660         final LinkedTransferQueue q = populatedQueue(SIZE);
661         Thread t = newStartedThread(new CheckedRunnable() {
662             public void realRun() {
663                 q.put(SIZE + 1);
664             }});
665         ArrayList l = new ArrayList();
666         q.drainTo(l);
667         assertTrue(l.size() >= SIZE);
668         for (int i = 0; i < SIZE; ++i)
669             assertEquals(i, l.get(i));
670         awaitTermination(t, MEDIUM_DELAY_MS);
671         assertTrue(q.size() + l.size() >= SIZE);
672     }
673 
674     /**
675      * drainTo(c, n) empties first min(n, size) elements of queue into c
676      */
testDrainToN()677     public void testDrainToN() {
678         LinkedTransferQueue q = new LinkedTransferQueue();
679         for (int i = 0; i < SIZE + 2; ++i) {
680             for (int j = 0; j < SIZE; j++) {
681                 assertTrue(q.offer(j));
682             }
683             ArrayList l = new ArrayList();
684             q.drainTo(l, i);
685             int k = (i < SIZE) ? i : SIZE;
686             assertEquals(k, l.size());
687             assertEquals(SIZE - k, q.size());
688             for (int j = 0; j < k; ++j)
689                 assertEquals(j, l.get(j));
690             while (q.poll() != null)
691                 ;
692         }
693     }
694 
695     /**
696      * timed poll() or take() increments the waiting consumer count;
697      * offer(e) decrements the waiting consumer count
698      */
testWaitingConsumer()699     public void testWaitingConsumer() throws InterruptedException {
700         final LinkedTransferQueue q = new LinkedTransferQueue();
701         assertEquals(0, q.getWaitingConsumerCount());
702         assertFalse(q.hasWaitingConsumer());
703         final CountDownLatch threadStarted = new CountDownLatch(1);
704 
705         Thread t = newStartedThread(new CheckedRunnable() {
706             public void realRun() throws InterruptedException {
707                 threadStarted.countDown();
708                 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
709                 assertEquals(0, q.getWaitingConsumerCount());
710                 assertFalse(q.hasWaitingConsumer());
711             }});
712 
713         threadStarted.await();
714         waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
715         assertEquals(1, q.getWaitingConsumerCount());
716         assertTrue(q.hasWaitingConsumer());
717 
718         assertTrue(q.offer(one));
719         assertEquals(0, q.getWaitingConsumerCount());
720         assertFalse(q.hasWaitingConsumer());
721 
722         awaitTermination(t, MEDIUM_DELAY_MS);
723     }
724 
725     /**
726      * transfer(null) throws NullPointerException
727      */
testTransfer1()728     public void testTransfer1() throws InterruptedException {
729         try {
730             LinkedTransferQueue q = new LinkedTransferQueue();
731             q.transfer(null);
732             shouldThrow();
733         } catch (NullPointerException success) {}
734     }
735 
736     /**
737      * transfer waits until a poll occurs. The transfered element
738      * is returned by this associated poll.
739      */
testTransfer2()740     public void testTransfer2() throws InterruptedException {
741         final LinkedTransferQueue<Integer> q
742             = new LinkedTransferQueue<Integer>();
743         final CountDownLatch threadStarted = new CountDownLatch(1);
744 
745         Thread t = newStartedThread(new CheckedRunnable() {
746             public void realRun() throws InterruptedException {
747                 threadStarted.countDown();
748                 q.transfer(five);
749                 checkEmpty(q);
750             }});
751 
752         threadStarted.await();
753         waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
754         assertEquals(1, q.size());
755         assertSame(five, q.poll());
756         checkEmpty(q);
757         awaitTermination(t, MEDIUM_DELAY_MS);
758     }
759 
760     /**
761      * transfer waits until a poll occurs, and then transfers in fifo order
762      */
testTransfer3()763     public void testTransfer3() throws InterruptedException {
764         final LinkedTransferQueue<Integer> q
765             = new LinkedTransferQueue<Integer>();
766 
767         Thread first = newStartedThread(new CheckedRunnable() {
768             public void realRun() throws InterruptedException {
769                 q.transfer(four);
770                 assertTrue(!q.contains(four));
771                 assertEquals(1, q.size());
772             }});
773 
774         Thread interruptedThread = newStartedThread(
775             new CheckedInterruptedRunnable() {
776                 public void realRun() throws InterruptedException {
777                     while (q.isEmpty())
778                         Thread.yield();
779                     q.transfer(five);
780                 }});
781 
782         while (q.size() < 2)
783             Thread.yield();
784         assertEquals(2, q.size());
785         assertSame(four, q.poll());
786         first.join();
787         assertEquals(1, q.size());
788         interruptedThread.interrupt();
789         interruptedThread.join();
790         checkEmpty(q);
791     }
792 
793     /**
794      * transfer waits until a poll occurs, at which point the polling
795      * thread returns the element
796      */
testTransfer4()797     public void testTransfer4() throws InterruptedException {
798         final LinkedTransferQueue q = new LinkedTransferQueue();
799 
800         Thread t = newStartedThread(new CheckedRunnable() {
801             public void realRun() throws InterruptedException {
802                 q.transfer(four);
803                 assertFalse(q.contains(four));
804                 assertSame(three, q.poll());
805             }});
806 
807         while (q.isEmpty())
808             Thread.yield();
809         assertFalse(q.isEmpty());
810         assertEquals(1, q.size());
811         assertTrue(q.offer(three));
812         assertSame(four, q.poll());
813         awaitTermination(t, MEDIUM_DELAY_MS);
814     }
815 
816     /**
817      * transfer waits until a take occurs. The transfered element
818      * is returned by this associated take.
819      */
testTransfer5()820     public void testTransfer5() throws InterruptedException {
821         final LinkedTransferQueue<Integer> q
822             = new LinkedTransferQueue<Integer>();
823 
824         Thread t = newStartedThread(new CheckedRunnable() {
825             public void realRun() throws InterruptedException {
826                 q.transfer(four);
827                 checkEmpty(q);
828             }});
829 
830         while (q.isEmpty())
831             Thread.yield();
832         assertFalse(q.isEmpty());
833         assertEquals(1, q.size());
834         assertSame(four, q.take());
835         checkEmpty(q);
836         awaitTermination(t, MEDIUM_DELAY_MS);
837     }
838 
839     /**
840      * tryTransfer(null) throws NullPointerException
841      */
testTryTransfer1()842     public void testTryTransfer1() {
843         try {
844             final LinkedTransferQueue q = new LinkedTransferQueue();
845             q.tryTransfer(null);
846             shouldThrow();
847         } catch (NullPointerException success) {}
848     }
849 
850     /**
851      * tryTransfer returns false and does not enqueue if there are no
852      * consumers waiting to poll or take.
853      */
testTryTransfer2()854     public void testTryTransfer2() throws InterruptedException {
855         final LinkedTransferQueue q = new LinkedTransferQueue();
856         assertFalse(q.tryTransfer(new Object()));
857         assertFalse(q.hasWaitingConsumer());
858         checkEmpty(q);
859     }
860 
861     /**
862      * If there is a consumer waiting in timed poll, tryTransfer
863      * returns true while successfully transfering object.
864      */
testTryTransfer3()865     public void testTryTransfer3() throws InterruptedException {
866         final Object hotPotato = new Object();
867         final LinkedTransferQueue q = new LinkedTransferQueue();
868 
869         Thread t = newStartedThread(new CheckedRunnable() {
870             public void realRun() {
871                 while (! q.hasWaitingConsumer())
872                     Thread.yield();
873                 assertTrue(q.hasWaitingConsumer());
874                 checkEmpty(q);
875                 assertTrue(q.tryTransfer(hotPotato));
876             }});
877 
878         assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
879         checkEmpty(q);
880         awaitTermination(t, MEDIUM_DELAY_MS);
881     }
882 
883     /**
884      * If there is a consumer waiting in take, tryTransfer returns
885      * true while successfully transfering object.
886      */
testTryTransfer4()887     public void testTryTransfer4() throws InterruptedException {
888         final Object hotPotato = new Object();
889         final LinkedTransferQueue q = new LinkedTransferQueue();
890 
891         Thread t = newStartedThread(new CheckedRunnable() {
892             public void realRun() {
893                 while (! q.hasWaitingConsumer())
894                     Thread.yield();
895                 assertTrue(q.hasWaitingConsumer());
896                 checkEmpty(q);
897                 assertTrue(q.tryTransfer(hotPotato));
898             }});
899 
900         assertSame(q.take(), hotPotato);
901         checkEmpty(q);
902         awaitTermination(t, MEDIUM_DELAY_MS);
903     }
904 
905     /**
906      * tryTransfer blocks interruptibly if no takers
907      */
testTryTransfer5()908     public void testTryTransfer5() throws InterruptedException {
909         final LinkedTransferQueue q = new LinkedTransferQueue();
910         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
911         assertTrue(q.isEmpty());
912 
913         Thread t = newStartedThread(new CheckedRunnable() {
914             public void realRun() throws InterruptedException {
915                 Thread.currentThread().interrupt();
916                 try {
917                     q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
918                     shouldThrow();
919                 } catch (InterruptedException success) {}
920                 assertFalse(Thread.interrupted());
921 
922                 pleaseInterrupt.countDown();
923                 try {
924                     q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
925                     shouldThrow();
926                 } catch (InterruptedException success) {}
927                 assertFalse(Thread.interrupted());
928             }});
929 
930         await(pleaseInterrupt);
931         assertThreadStaysAlive(t);
932         t.interrupt();
933         awaitTermination(t);
934         checkEmpty(q);
935     }
936 
937     /**
938      * tryTransfer gives up after the timeout and returns false
939      */
testTryTransfer6()940     public void testTryTransfer6() throws InterruptedException {
941         final LinkedTransferQueue q = new LinkedTransferQueue();
942 
943         Thread t = newStartedThread(new CheckedRunnable() {
944             public void realRun() throws InterruptedException {
945                 long t0 = System.nanoTime();
946                 assertFalse(q.tryTransfer(new Object(),
947                                           timeoutMillis(), MILLISECONDS));
948                 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
949                 checkEmpty(q);
950             }});
951 
952         awaitTermination(t);
953         checkEmpty(q);
954     }
955 
956     /**
957      * tryTransfer waits for any elements previously in to be removed
958      * before transfering to a poll or take
959      */
testTryTransfer7()960     public void testTryTransfer7() throws InterruptedException {
961         final LinkedTransferQueue q = new LinkedTransferQueue();
962         assertTrue(q.offer(four));
963 
964         Thread t = newStartedThread(new CheckedRunnable() {
965             public void realRun() throws InterruptedException {
966                 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
967                 checkEmpty(q);
968             }});
969 
970         while (q.size() != 2)
971             Thread.yield();
972         assertEquals(2, q.size());
973         assertSame(four, q.poll());
974         assertSame(five, q.poll());
975         checkEmpty(q);
976         awaitTermination(t, MEDIUM_DELAY_MS);
977     }
978 
979     /**
980      * tryTransfer attempts to enqueue into the queue and fails
981      * returning false not enqueueing and the successive poll is null
982      */
testTryTransfer8()983     public void testTryTransfer8() throws InterruptedException {
984         final LinkedTransferQueue q = new LinkedTransferQueue();
985         assertTrue(q.offer(four));
986         assertEquals(1, q.size());
987         long t0 = System.nanoTime();
988         assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
989         assertTrue(millisElapsedSince(t0) >= timeoutMillis());
990         assertEquals(1, q.size());
991         assertSame(four, q.poll());
992         assertNull(q.poll());
993         checkEmpty(q);
994     }
995 
populatedQueue(int n)996     private LinkedTransferQueue<Integer> populatedQueue(int n) {
997         LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
998         checkEmpty(q);
999         for (int i = 0; i < n; i++) {
1000             assertEquals(i, q.size());
1001             assertTrue(q.offer(i));
1002             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1003         }
1004         assertFalse(q.isEmpty());
1005         return q;
1006     }
1007 }
1008