1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include Andrew Wright, Jeffrey Hayes,
6  * Pat Fisher, Mike Judd.
7  */
8 
9 package jsr166;
10 
11 import junit.framework.*;
12 import java.util.Arrays;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.Comparator;
16 import java.util.Iterator;
17 import java.util.NoSuchElementException;
18 import java.util.Queue;
19 import java.util.concurrent.PriorityBlockingQueue;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ExecutorService;
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 
26 public class PriorityBlockingQueueTest extends JSR166TestCase {
27 
28     private static final int NOCAP = Integer.MAX_VALUE;
29 
30     /** Sample Comparator */
31     static class MyReverseComparator implements Comparator {
compare(Object x, Object y)32         public int compare(Object x, Object y) {
33             return ((Comparable)y).compareTo(x);
34         }
35     }
36 
37     /**
38      * Returns a new queue of given size containing consecutive
39      * Integers 0 ... n.
40      */
populatedQueue(int n)41     private PriorityBlockingQueue<Integer> populatedQueue(int n) {
42         PriorityBlockingQueue<Integer> q =
43             new PriorityBlockingQueue<Integer>(n);
44         assertTrue(q.isEmpty());
45         for (int i = n-1; i >= 0; i-=2)
46             assertTrue(q.offer(new Integer(i)));
47         for (int i = (n & 1); i < n; i+=2)
48             assertTrue(q.offer(new Integer(i)));
49         assertFalse(q.isEmpty());
50         assertEquals(NOCAP, q.remainingCapacity());
51         assertEquals(n, q.size());
52         return q;
53     }
54 
55     /**
56      * A new queue has unbounded capacity
57      */
testConstructor1()58     public void testConstructor1() {
59         assertEquals(NOCAP, new PriorityBlockingQueue(SIZE).remainingCapacity());
60     }
61 
62     /**
63      * Constructor throws IAE if capacity argument nonpositive
64      */
testConstructor2()65     public void testConstructor2() {
66         try {
67             new PriorityBlockingQueue(0);
68             shouldThrow();
69         } catch (IllegalArgumentException success) {}
70     }
71 
72     /**
73      * Initializing from null Collection throws NPE
74      */
testConstructor3()75     public void testConstructor3() {
76         try {
77             new PriorityBlockingQueue(null);
78             shouldThrow();
79         } catch (NullPointerException success) {}
80     }
81 
82     /**
83      * Initializing from Collection of null elements throws NPE
84      */
testConstructor4()85     public void testConstructor4() {
86         Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
87         try {
88             new PriorityBlockingQueue(elements);
89             shouldThrow();
90         } catch (NullPointerException success) {}
91     }
92 
93     /**
94      * Initializing from Collection with some null elements throws NPE
95      */
testConstructor5()96     public void testConstructor5() {
97         Integer[] ints = new Integer[SIZE];
98         for (int i = 0; i < SIZE-1; ++i)
99             ints[i] = i;
100         Collection<Integer> elements = Arrays.asList(ints);
101         try {
102             new PriorityBlockingQueue(elements);
103             shouldThrow();
104         } catch (NullPointerException success) {}
105     }
106 
107     /**
108      * Queue contains all elements of collection used to initialize
109      */
testConstructor6()110     public void testConstructor6() {
111         Integer[] ints = new Integer[SIZE];
112         for (int i = 0; i < SIZE; ++i)
113             ints[i] = i;
114         PriorityBlockingQueue q = new PriorityBlockingQueue(Arrays.asList(ints));
115         for (int i = 0; i < SIZE; ++i)
116             assertEquals(ints[i], q.poll());
117     }
118 
119     /**
120      * The comparator used in constructor is used
121      */
testConstructor7()122     public void testConstructor7() {
123         MyReverseComparator cmp = new MyReverseComparator();
124         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE, cmp);
125         assertEquals(cmp, q.comparator());
126         Integer[] ints = new Integer[SIZE];
127         for (int i = 0; i < SIZE; ++i)
128             ints[i] = new Integer(i);
129         q.addAll(Arrays.asList(ints));
130         for (int i = SIZE-1; i >= 0; --i)
131             assertEquals(ints[i], q.poll());
132     }
133 
134     /**
135      * isEmpty is true before add, false after
136      */
testEmpty()137     public void testEmpty() {
138         PriorityBlockingQueue q = new PriorityBlockingQueue(2);
139         assertTrue(q.isEmpty());
140         assertEquals(NOCAP, q.remainingCapacity());
141         q.add(one);
142         assertFalse(q.isEmpty());
143         q.add(two);
144         q.remove();
145         q.remove();
146         assertTrue(q.isEmpty());
147     }
148 
149     /**
150      * remainingCapacity does not change when elements added or removed,
151      * but size does
152      */
testRemainingCapacity()153     public void testRemainingCapacity() {
154         PriorityBlockingQueue q = populatedQueue(SIZE);
155         for (int i = 0; i < SIZE; ++i) {
156             assertEquals(NOCAP, q.remainingCapacity());
157             assertEquals(SIZE-i, q.size());
158             q.remove();
159         }
160         for (int i = 0; i < SIZE; ++i) {
161             assertEquals(NOCAP, q.remainingCapacity());
162             assertEquals(i, q.size());
163             q.add(new Integer(i));
164         }
165     }
166 
167     /**
168      * Offer of comparable element succeeds
169      */
testOffer()170     public void testOffer() {
171         PriorityBlockingQueue q = new PriorityBlockingQueue(1);
172         assertTrue(q.offer(zero));
173         assertTrue(q.offer(one));
174     }
175 
176     /**
177      * Offer of non-Comparable throws CCE
178      */
testOfferNonComparable()179     public void testOfferNonComparable() {
180         try {
181             PriorityBlockingQueue q = new PriorityBlockingQueue(1);
182             q.offer(new Object());
183             q.offer(new Object());
184             q.offer(new Object());
185             shouldThrow();
186         } catch (ClassCastException success) {}
187     }
188 
189     /**
190      * add of comparable succeeds
191      */
testAdd()192     public void testAdd() {
193         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
194         for (int i = 0; i < SIZE; ++i) {
195             assertEquals(i, q.size());
196             assertTrue(q.add(new Integer(i)));
197         }
198     }
199 
200     /**
201      * addAll(this) throws IAE
202      */
testAddAllSelf()203     public void testAddAllSelf() {
204         try {
205             PriorityBlockingQueue q = populatedQueue(SIZE);
206             q.addAll(q);
207             shouldThrow();
208         } catch (IllegalArgumentException success) {}
209     }
210 
211     /**
212      * addAll of a collection with any null elements throws NPE after
213      * possibly adding some elements
214      */
testAddAll3()215     public void testAddAll3() {
216         try {
217             PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
218             Integer[] ints = new Integer[SIZE];
219             for (int i = 0; i < SIZE-1; ++i)
220                 ints[i] = new Integer(i);
221             q.addAll(Arrays.asList(ints));
222             shouldThrow();
223         } catch (NullPointerException success) {}
224     }
225 
226     /**
227      * Queue contains all elements of successful addAll
228      */
testAddAll5()229     public void testAddAll5() {
230         Integer[] empty = new Integer[0];
231         Integer[] ints = new Integer[SIZE];
232         for (int i = SIZE-1; i >= 0; --i)
233             ints[i] = new Integer(i);
234         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
235         assertFalse(q.addAll(Arrays.asList(empty)));
236         assertTrue(q.addAll(Arrays.asList(ints)));
237         for (int i = 0; i < SIZE; ++i)
238             assertEquals(ints[i], q.poll());
239     }
240 
241     /**
242      * all elements successfully put are contained
243      */
testPut()244     public void testPut() {
245         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
246         for (int i = 0; i < SIZE; ++i) {
247             Integer I = new Integer(i);
248             q.put(I);
249             assertTrue(q.contains(I));
250         }
251         assertEquals(SIZE, q.size());
252     }
253 
254     /**
255      * put doesn't block waiting for take
256      */
testPutWithTake()257     public void testPutWithTake() throws InterruptedException {
258         final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
259         final int size = 4;
260         Thread t = newStartedThread(new CheckedRunnable() {
261             public void realRun() {
262                 for (int i = 0; i < size; i++)
263                     q.put(new Integer(0));
264             }});
265 
266         awaitTermination(t);
267         assertEquals(size, q.size());
268         q.take();
269     }
270 
271     /**
272      * timed offer does not time out
273      */
testTimedOffer()274     public void testTimedOffer() throws InterruptedException {
275         final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
276         Thread t = newStartedThread(new CheckedRunnable() {
277             public void realRun() {
278                 q.put(new Integer(0));
279                 q.put(new Integer(0));
280                 assertTrue(q.offer(new Integer(0), SHORT_DELAY_MS, MILLISECONDS));
281                 assertTrue(q.offer(new Integer(0), LONG_DELAY_MS, MILLISECONDS));
282             }});
283 
284         awaitTermination(t);
285     }
286 
287     /**
288      * take retrieves elements in priority order
289      */
testTake()290     public void testTake() throws InterruptedException {
291         PriorityBlockingQueue q = populatedQueue(SIZE);
292         for (int i = 0; i < SIZE; ++i) {
293             assertEquals(i, q.take());
294         }
295     }
296 
297     /**
298      * Take removes existing elements until empty, then blocks interruptibly
299      */
testBlockingTake()300     public void testBlockingTake() throws InterruptedException {
301         final PriorityBlockingQueue q = populatedQueue(SIZE);
302         final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
303         Thread t = newStartedThread(new CheckedRunnable() {
304             public void realRun() throws InterruptedException {
305                 for (int i = 0; i < SIZE; ++i) {
306                     assertEquals(i, q.take());
307                 }
308 
309                 Thread.currentThread().interrupt();
310                 try {
311                     q.take();
312                     shouldThrow();
313                 } catch (InterruptedException success) {}
314                 assertFalse(Thread.interrupted());
315 
316                 pleaseInterrupt.countDown();
317                 try {
318                     q.take();
319                     shouldThrow();
320                 } catch (InterruptedException success) {}
321                 assertFalse(Thread.interrupted());
322             }});
323 
324         await(pleaseInterrupt);
325         assertThreadStaysAlive(t);
326         t.interrupt();
327         awaitTermination(t);
328     }
329 
330     /**
331      * poll succeeds unless empty
332      */
testPoll()333     public void testPoll() {
334         PriorityBlockingQueue q = populatedQueue(SIZE);
335         for (int i = 0; i < SIZE; ++i) {
336             assertEquals(i, q.poll());
337         }
338         assertNull(q.poll());
339     }
340 
341     /**
342      * timed poll with zero timeout succeeds when non-empty, else times out
343      */
testTimedPoll0()344     public void testTimedPoll0() throws InterruptedException {
345         PriorityBlockingQueue q = populatedQueue(SIZE);
346         for (int i = 0; i < SIZE; ++i) {
347             assertEquals(i, q.poll(0, MILLISECONDS));
348         }
349         assertNull(q.poll(0, MILLISECONDS));
350     }
351 
352     /**
353      * timed poll with nonzero timeout succeeds when non-empty, else times out
354      */
testTimedPoll()355     public void testTimedPoll() throws InterruptedException {
356         PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
357         for (int i = 0; i < SIZE; ++i) {
358             long startTime = System.nanoTime();
359             assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
360             assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
361         }
362         long startTime = System.nanoTime();
363         assertNull(q.poll(timeoutMillis(), MILLISECONDS));
364         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
365         checkEmpty(q);
366     }
367 
368     /**
369      * Interrupted timed poll throws InterruptedException instead of
370      * returning timeout status
371      */
testInterruptedTimedPoll()372     public void testInterruptedTimedPoll() throws InterruptedException {
373         final BlockingQueue<Integer> q = populatedQueue(SIZE);
374         final CountDownLatch aboutToWait = new CountDownLatch(1);
375         Thread t = newStartedThread(new CheckedRunnable() {
376             public void realRun() throws InterruptedException {
377                 for (int i = 0; i < SIZE; ++i) {
378                     long t0 = System.nanoTime();
379                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
380                     assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
381                 }
382                 long t0 = System.nanoTime();
383                 aboutToWait.countDown();
384                 try {
385                     q.poll(LONG_DELAY_MS, MILLISECONDS);
386                     shouldThrow();
387                 } catch (InterruptedException success) {
388                     assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
389                 }
390             }});
391 
392         aboutToWait.await();
393         waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
394         t.interrupt();
395         awaitTermination(t, MEDIUM_DELAY_MS);
396     }
397 
398     /**
399      * peek returns next element, or null if empty
400      */
testPeek()401     public void testPeek() {
402         PriorityBlockingQueue q = populatedQueue(SIZE);
403         for (int i = 0; i < SIZE; ++i) {
404             assertEquals(i, q.peek());
405             assertEquals(i, q.poll());
406             assertTrue(q.peek() == null ||
407                        !q.peek().equals(i));
408         }
409         assertNull(q.peek());
410     }
411 
412     /**
413      * element returns next element, or throws NSEE if empty
414      */
testElement()415     public void testElement() {
416         PriorityBlockingQueue q = populatedQueue(SIZE);
417         for (int i = 0; i < SIZE; ++i) {
418             assertEquals(i, q.element());
419             assertEquals(i, q.poll());
420         }
421         try {
422             q.element();
423             shouldThrow();
424         } catch (NoSuchElementException success) {}
425     }
426 
427     /**
428      * remove removes next element, or throws NSEE if empty
429      */
testRemove()430     public void testRemove() {
431         PriorityBlockingQueue q = populatedQueue(SIZE);
432         for (int i = 0; i < SIZE; ++i) {
433             assertEquals(i, q.remove());
434         }
435         try {
436             q.remove();
437             shouldThrow();
438         } catch (NoSuchElementException success) {}
439     }
440 
441     /**
442      * contains(x) reports true when elements added but not yet removed
443      */
testContains()444     public void testContains() {
445         PriorityBlockingQueue q = populatedQueue(SIZE);
446         for (int i = 0; i < SIZE; ++i) {
447             assertTrue(q.contains(new Integer(i)));
448             q.poll();
449             assertFalse(q.contains(new Integer(i)));
450         }
451     }
452 
453     /**
454      * clear removes all elements
455      */
testClear()456     public void testClear() {
457         PriorityBlockingQueue q = populatedQueue(SIZE);
458         q.clear();
459         assertTrue(q.isEmpty());
460         assertEquals(0, q.size());
461         q.add(one);
462         assertFalse(q.isEmpty());
463         assertTrue(q.contains(one));
464         q.clear();
465         assertTrue(q.isEmpty());
466     }
467 
468     /**
469      * containsAll(c) is true when c contains a subset of elements
470      */
testContainsAll()471     public void testContainsAll() {
472         PriorityBlockingQueue q = populatedQueue(SIZE);
473         PriorityBlockingQueue p = new PriorityBlockingQueue(SIZE);
474         for (int i = 0; i < SIZE; ++i) {
475             assertTrue(q.containsAll(p));
476             assertFalse(p.containsAll(q));
477             p.add(new Integer(i));
478         }
479         assertTrue(p.containsAll(q));
480     }
481 
482     /**
483      * retainAll(c) retains only those elements of c and reports true if changed
484      */
testRetainAll()485     public void testRetainAll() {
486         PriorityBlockingQueue q = populatedQueue(SIZE);
487         PriorityBlockingQueue p = populatedQueue(SIZE);
488         for (int i = 0; i < SIZE; ++i) {
489             boolean changed = q.retainAll(p);
490             if (i == 0)
491                 assertFalse(changed);
492             else
493                 assertTrue(changed);
494 
495             assertTrue(q.containsAll(p));
496             assertEquals(SIZE-i, q.size());
497             p.remove();
498         }
499     }
500 
501     /**
502      * removeAll(c) removes only those elements of c and reports true if changed
503      */
testRemoveAll()504     public void testRemoveAll() {
505         for (int i = 1; i < SIZE; ++i) {
506             PriorityBlockingQueue q = populatedQueue(SIZE);
507             PriorityBlockingQueue p = populatedQueue(i);
508             assertTrue(q.removeAll(p));
509             assertEquals(SIZE-i, q.size());
510             for (int j = 0; j < i; ++j) {
511                 Integer I = (Integer)(p.remove());
512                 assertFalse(q.contains(I));
513             }
514         }
515     }
516 
517     /**
518      * toArray contains all elements
519      */
testToArray()520     public void testToArray() throws InterruptedException {
521         PriorityBlockingQueue q = populatedQueue(SIZE);
522         Object[] o = q.toArray();
523         Arrays.sort(o);
524         for (int i = 0; i < o.length; i++)
525             assertSame(o[i], q.take());
526     }
527 
528     /**
529      * toArray(a) contains all elements
530      */
testToArray2()531     public void testToArray2() throws InterruptedException {
532         PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
533         Integer[] ints = new Integer[SIZE];
534         Integer[] array = q.toArray(ints);
535         assertSame(ints, array);
536         Arrays.sort(ints);
537         for (int i = 0; i < ints.length; i++)
538             assertSame(ints[i], q.take());
539     }
540 
541     /**
542      * toArray(incompatible array type) throws ArrayStoreException
543      */
testToArray1_BadArg()544     public void testToArray1_BadArg() {
545         PriorityBlockingQueue q = populatedQueue(SIZE);
546         try {
547             q.toArray(new String[10]);
548             shouldThrow();
549         } catch (ArrayStoreException success) {}
550     }
551 
552     /**
553      * iterator iterates through all elements
554      */
testIterator()555     public void testIterator() {
556         PriorityBlockingQueue q = populatedQueue(SIZE);
557         int i = 0;
558         Iterator it = q.iterator();
559         while (it.hasNext()) {
560             assertTrue(q.contains(it.next()));
561             ++i;
562         }
563         assertEquals(i, SIZE);
564     }
565 
566     /**
567      * iterator.remove removes current element
568      */
testIteratorRemove()569     public void testIteratorRemove() {
570         final PriorityBlockingQueue q = new PriorityBlockingQueue(3);
571         q.add(new Integer(2));
572         q.add(new Integer(1));
573         q.add(new Integer(3));
574 
575         Iterator it = q.iterator();
576         it.next();
577         it.remove();
578 
579         it = q.iterator();
580         assertEquals(it.next(), new Integer(2));
581         assertEquals(it.next(), new Integer(3));
582         assertFalse(it.hasNext());
583     }
584 
585     /**
586      * toString contains toStrings of elements
587      */
testToString()588     public void testToString() {
589         PriorityBlockingQueue q = populatedQueue(SIZE);
590         String s = q.toString();
591         for (int i = 0; i < SIZE; ++i) {
592             assertTrue(s.contains(String.valueOf(i)));
593         }
594     }
595 
596     /**
597      * timed poll transfers elements across Executor tasks
598      */
testPollInExecutor()599     public void testPollInExecutor() {
600         final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
601         final CheckedBarrier threadsStarted = new CheckedBarrier(2);
602         ExecutorService executor = Executors.newFixedThreadPool(2);
603         executor.execute(new CheckedRunnable() {
604             public void realRun() throws InterruptedException {
605                 assertNull(q.poll());
606                 threadsStarted.await();
607                 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
608                 checkEmpty(q);
609             }});
610 
611         executor.execute(new CheckedRunnable() {
612             public void realRun() throws InterruptedException {
613                 threadsStarted.await();
614                 q.put(one);
615             }});
616 
617         joinPool(executor);
618     }
619 
620     /**
621      * A deserialized serialized queue has same elements
622      */
testSerialization()623     public void testSerialization() throws Exception {
624         Queue x = populatedQueue(SIZE);
625         Queue y = serialClone(x);
626 
627         assertNotSame(x, y);
628         assertEquals(x.size(), y.size());
629         while (!x.isEmpty()) {
630             assertFalse(y.isEmpty());
631             assertEquals(x.remove(), y.remove());
632         }
633         assertTrue(y.isEmpty());
634     }
635 
636     /**
637      * drainTo(c) empties queue into another collection c
638      */
testDrainTo()639     public void testDrainTo() {
640         PriorityBlockingQueue q = populatedQueue(SIZE);
641         ArrayList l = new ArrayList();
642         q.drainTo(l);
643         assertEquals(0, q.size());
644         assertEquals(SIZE, l.size());
645         for (int i = 0; i < SIZE; ++i)
646             assertEquals(l.get(i), new Integer(i));
647         q.add(zero);
648         q.add(one);
649         assertFalse(q.isEmpty());
650         assertTrue(q.contains(zero));
651         assertTrue(q.contains(one));
652         l.clear();
653         q.drainTo(l);
654         assertEquals(0, q.size());
655         assertEquals(2, l.size());
656         for (int i = 0; i < 2; ++i)
657             assertEquals(l.get(i), new Integer(i));
658     }
659 
660     /**
661      * drainTo empties queue
662      */
testDrainToWithActivePut()663     public void testDrainToWithActivePut() throws InterruptedException {
664         final PriorityBlockingQueue q = populatedQueue(SIZE);
665         Thread t = new Thread(new CheckedRunnable() {
666             public void realRun() {
667                 q.put(new Integer(SIZE+1));
668             }});
669 
670         t.start();
671         ArrayList l = new ArrayList();
672         q.drainTo(l);
673         assertTrue(l.size() >= SIZE);
674         for (int i = 0; i < SIZE; ++i)
675             assertEquals(l.get(i), new Integer(i));
676         t.join();
677         assertTrue(q.size() + l.size() >= SIZE);
678     }
679 
680     /**
681      * drainTo(c, n) empties first min(n, size) elements of queue into c
682      */
testDrainToN()683     public void testDrainToN() {
684         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE*2);
685         for (int i = 0; i < SIZE + 2; ++i) {
686             for (int j = 0; j < SIZE; j++)
687                 assertTrue(q.offer(new Integer(j)));
688             ArrayList l = new ArrayList();
689             q.drainTo(l, i);
690             int k = (i < SIZE) ? i : SIZE;
691             assertEquals(k, l.size());
692             assertEquals(SIZE-k, q.size());
693             for (int j = 0; j < k; ++j)
694                 assertEquals(l.get(j), new Integer(j));
695             while (q.poll() != null) ;
696         }
697     }
698 
699 }
700