1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  * Other contributors include Andrew Wright, Jeffrey Hayes,
6  * Pat Fisher, Mike Judd.
7  */
8 
9 package jsr166;
10 
11 import static java.util.concurrent.TimeUnit.MILLISECONDS;
12 
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collection;
16 import java.util.Comparator;
17 import java.util.Iterator;
18 import java.util.NoSuchElementException;
19 import java.util.Queue;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.PriorityBlockingQueue;
25 
26 import junit.framework.Test;
27 
28 public class PriorityBlockingQueueTest extends JSR166TestCase {
29 
30     // android-note: These tests have been moved into their own separate
31     // classes to work around CTS issues.
32     //
33     // public static class Generic extends BlockingQueueTest {
34     //     protected BlockingQueue emptyCollection() {
35     //         return new PriorityBlockingQueue();
36     //     }
37     // }
38 
39     // public static class InitialCapacity extends BlockingQueueTest {
40     //     protected BlockingQueue emptyCollection() {
41     //         return new PriorityBlockingQueue(SIZE);
42     //     }
43     // }
44 
45     // android-note: Removed because the CTS runner does a bad job of
46     // retrying tests that have suite() declarations.
47     //
48     // public static void main(String[] args) {
49     //     main(suite(), args);
50     // }
51     // public static Test suite() {
52     //     return newTestSuite(PriorityBlockingQueueTest.class,
53     //                         new Generic().testSuite(),
54     //                         new InitialCapacity().testSuite());
55     // }
56 
57     /** Sample Comparator */
58     static class MyReverseComparator implements Comparator {
59         public int compare(Object x, Object y) {
60             return ((Comparable)y).compareTo(x);
61         }
62     }
63 
64     /**
65      * Returns a new queue of given size containing consecutive
66      * Integers 0 ... n.
67      */
68     private PriorityBlockingQueue<Integer> populatedQueue(int n) {
69         PriorityBlockingQueue<Integer> q =
70             new PriorityBlockingQueue<Integer>(n);
71         assertTrue(q.isEmpty());
72         for (int i = n - 1; i >= 0; i -= 2)
73             assertTrue(q.offer(new Integer(i)));
74         for (int i = (n & 1); i < n; i += 2)
75             assertTrue(q.offer(new Integer(i)));
76         assertFalse(q.isEmpty());
77         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
78         assertEquals(n, q.size());
79         return q;
80     }
81 
82     /**
83      * A new queue has unbounded capacity
84      */
85     public void testConstructor1() {
86         assertEquals(Integer.MAX_VALUE,
87                      new PriorityBlockingQueue(SIZE).remainingCapacity());
88     }
89 
90     /**
91      * Constructor throws IAE if capacity argument nonpositive
92      */
93     public void testConstructor2() {
94         try {
95             new PriorityBlockingQueue(0);
96             shouldThrow();
97         } catch (IllegalArgumentException success) {}
98     }
99 
100     /**
101      * Initializing from null Collection throws NPE
102      */
103     public void testConstructor3() {
104         try {
105             new PriorityBlockingQueue(null);
106             shouldThrow();
107         } catch (NullPointerException success) {}
108     }
109 
110     /**
111      * Initializing from Collection of null elements throws NPE
112      */
113     public void testConstructor4() {
114         Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
115         try {
116             new PriorityBlockingQueue(elements);
117             shouldThrow();
118         } catch (NullPointerException success) {}
119     }
120 
121     /**
122      * Initializing from Collection with some null elements throws NPE
123      */
124     public void testConstructor5() {
125         Integer[] ints = new Integer[SIZE];
126         for (int i = 0; i < SIZE - 1; ++i)
127             ints[i] = i;
128         Collection<Integer> elements = Arrays.asList(ints);
129         try {
130             new PriorityBlockingQueue(elements);
131             shouldThrow();
132         } catch (NullPointerException success) {}
133     }
134 
135     /**
136      * Queue contains all elements of collection used to initialize
137      */
138     public void testConstructor6() {
139         Integer[] ints = new Integer[SIZE];
140         for (int i = 0; i < SIZE; ++i)
141             ints[i] = i;
142         PriorityBlockingQueue q = new PriorityBlockingQueue(Arrays.asList(ints));
143         for (int i = 0; i < SIZE; ++i)
144             assertEquals(ints[i], q.poll());
145     }
146 
147     /**
148      * The comparator used in constructor is used
149      */
150     public void testConstructor7() {
151         MyReverseComparator cmp = new MyReverseComparator();
152         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE, cmp);
153         assertEquals(cmp, q.comparator());
154         Integer[] ints = new Integer[SIZE];
155         for (int i = 0; i < SIZE; ++i)
156             ints[i] = new Integer(i);
157         q.addAll(Arrays.asList(ints));
158         for (int i = SIZE - 1; i >= 0; --i)
159             assertEquals(ints[i], q.poll());
160     }
161 
162     /**
163      * isEmpty is true before add, false after
164      */
165     public void testEmpty() {
166         PriorityBlockingQueue q = new PriorityBlockingQueue(2);
167         assertTrue(q.isEmpty());
168         assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
169         q.add(one);
170         assertFalse(q.isEmpty());
171         q.add(two);
172         q.remove();
173         q.remove();
174         assertTrue(q.isEmpty());
175     }
176 
177     /**
178      * remainingCapacity() always returns Integer.MAX_VALUE
179      */
180     public void testRemainingCapacity() {
181         BlockingQueue q = populatedQueue(SIZE);
182         for (int i = 0; i < SIZE; ++i) {
183             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
184             assertEquals(SIZE - i, q.size());
185             assertEquals(i, q.remove());
186         }
187         for (int i = 0; i < SIZE; ++i) {
188             assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
189             assertEquals(i, q.size());
190             assertTrue(q.add(i));
191         }
192     }
193 
194     /**
195      * Offer of comparable element succeeds
196      */
197     public void testOffer() {
198         PriorityBlockingQueue q = new PriorityBlockingQueue(1);
199         assertTrue(q.offer(zero));
200         assertTrue(q.offer(one));
201     }
202 
203     /**
204      * Offer of non-Comparable throws CCE
205      */
206     public void testOfferNonComparable() {
207         PriorityBlockingQueue q = new PriorityBlockingQueue(1);
208         try {
209             q.offer(new Object());
210             q.offer(new Object());
211             shouldThrow();
212         } catch (ClassCastException success) {}
213     }
214 
215     /**
216      * add of comparable succeeds
217      */
218     public void testAdd() {
219         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
220         for (int i = 0; i < SIZE; ++i) {
221             assertEquals(i, q.size());
222             assertTrue(q.add(new Integer(i)));
223         }
224     }
225 
226     /**
227      * addAll(this) throws IAE
228      */
229     public void testAddAllSelf() {
230         PriorityBlockingQueue q = populatedQueue(SIZE);
231         try {
232             q.addAll(q);
233             shouldThrow();
234         } catch (IllegalArgumentException success) {}
235     }
236 
237     /**
238      * addAll of a collection with any null elements throws NPE after
239      * possibly adding some elements
240      */
241     public void testAddAll3() {
242         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
243         Integer[] ints = new Integer[SIZE];
244         for (int i = 0; i < SIZE - 1; ++i)
245             ints[i] = new Integer(i);
246         try {
247             q.addAll(Arrays.asList(ints));
248             shouldThrow();
249         } catch (NullPointerException success) {}
250     }
251 
252     /**
253      * Queue contains all elements of successful addAll
254      */
255     public void testAddAll5() {
256         Integer[] empty = new Integer[0];
257         Integer[] ints = new Integer[SIZE];
258         for (int i = SIZE - 1; i >= 0; --i)
259             ints[i] = new Integer(i);
260         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
261         assertFalse(q.addAll(Arrays.asList(empty)));
262         assertTrue(q.addAll(Arrays.asList(ints)));
263         for (int i = 0; i < SIZE; ++i)
264             assertEquals(ints[i], q.poll());
265     }
266 
267     /**
268      * all elements successfully put are contained
269      */
270     public void testPut() {
271         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE);
272         for (int i = 0; i < SIZE; ++i) {
273             Integer x = new Integer(i);
274             q.put(x);
275             assertTrue(q.contains(x));
276         }
277         assertEquals(SIZE, q.size());
278     }
279 
280     /**
281      * put doesn't block waiting for take
282      */
283     public void testPutWithTake() throws InterruptedException {
284         final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
285         final int size = 4;
286         Thread t = newStartedThread(new CheckedRunnable() {
287             public void realRun() {
288                 for (int i = 0; i < size; i++)
289                     q.put(new Integer(0));
290             }});
291 
292         awaitTermination(t);
293         assertEquals(size, q.size());
294         q.take();
295     }
296 
297     /**
298      * timed offer does not time out
299      */
300     public void testTimedOffer() throws InterruptedException {
301         final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
302         Thread t = newStartedThread(new CheckedRunnable() {
303             public void realRun() {
304                 q.put(new Integer(0));
305                 q.put(new Integer(0));
306                 assertTrue(q.offer(new Integer(0), SHORT_DELAY_MS, MILLISECONDS));
307                 assertTrue(q.offer(new Integer(0), LONG_DELAY_MS, MILLISECONDS));
308             }});
309 
310         awaitTermination(t);
311     }
312 
313     /**
314      * take retrieves elements in priority order
315      */
316     public void testTake() throws InterruptedException {
317         PriorityBlockingQueue q = populatedQueue(SIZE);
318         for (int i = 0; i < SIZE; ++i) {
319             assertEquals(i, q.take());
320         }
321     }
322 
    /**
     * Take removes existing elements until empty, then blocks interruptibly.
     *
     * The worker thread first takes the SIZE pre-populated elements, then
     * exercises two interruption paths: take() invoked with the thread's
     * interrupt status already set, and take() invoked after releasing a
     * latch that lets the main thread interrupt the worker.
     */
    public void testBlockingTake() throws InterruptedException {
        final PriorityBlockingQueue q = populatedQueue(SIZE);
        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
        Thread t = newStartedThread(new CheckedRunnable() {
            public void realRun() throws InterruptedException {
                // Drain the queue; elements are expected in ascending order.
                for (int i = 0; i < SIZE; ++i) {
                    assertEquals(i, q.take());
                }

                // Path 1: the interrupt status is set before take() is called.
                Thread.currentThread().interrupt();
                try {
                    q.take();
                    shouldThrow();
                } catch (InterruptedException success) {}
                // The interrupt status is expected to be clear after the exception.
                assertFalse(Thread.interrupted());

                // Path 2: signal the main thread, then call take() again on the
                // empty queue and expect the main thread's interrupt to end it.
                pleaseInterrupt.countDown();
                try {
                    q.take();
                    shouldThrow();
                } catch (InterruptedException success) {}
                assertFalse(Thread.interrupted());
            }});

        // Main thread: wait for the signal, check the worker has not finished,
        // then interrupt it and wait for it to terminate.
        await(pleaseInterrupt);
        assertThreadStaysAlive(t);
        t.interrupt();
        awaitTermination(t);
    }
355 
356     /**
357      * poll succeeds unless empty
358      */
359     public void testPoll() {
360         PriorityBlockingQueue q = populatedQueue(SIZE);
361         for (int i = 0; i < SIZE; ++i) {
362             assertEquals(i, q.poll());
363         }
364         assertNull(q.poll());
365     }
366 
367     /**
368      * timed poll with zero timeout succeeds when non-empty, else times out
369      */
370     public void testTimedPoll0() throws InterruptedException {
371         PriorityBlockingQueue q = populatedQueue(SIZE);
372         for (int i = 0; i < SIZE; ++i) {
373             assertEquals(i, q.poll(0, MILLISECONDS));
374         }
375         assertNull(q.poll(0, MILLISECONDS));
376     }
377 
378     /**
379      * timed poll with nonzero timeout succeeds when non-empty, else times out
380      */
381     public void testTimedPoll() throws InterruptedException {
382         PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
383         for (int i = 0; i < SIZE; ++i) {
384             long startTime = System.nanoTime();
385             assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
386             assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
387         }
388         long startTime = System.nanoTime();
389         assertNull(q.poll(timeoutMillis(), MILLISECONDS));
390         assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
391         checkEmpty(q);
392     }
393 
394     /**
395      * Interrupted timed poll throws InterruptedException instead of
396      * returning timeout status
397      */
398     public void testInterruptedTimedPoll() throws InterruptedException {
399         final BlockingQueue<Integer> q = populatedQueue(SIZE);
400         final CountDownLatch aboutToWait = new CountDownLatch(1);
401         Thread t = newStartedThread(new CheckedRunnable() {
402             public void realRun() throws InterruptedException {
403                 long startTime = System.nanoTime();
404                 for (int i = 0; i < SIZE; ++i) {
405                     assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
406                 }
407                 aboutToWait.countDown();
408                 try {
409                     q.poll(LONG_DELAY_MS, MILLISECONDS);
410                     shouldThrow();
411                 } catch (InterruptedException success) {
412                     assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
413                 }
414             }});
415 
416         aboutToWait.await();
417         waitForThreadToEnterWaitState(t, LONG_DELAY_MS);
418         t.interrupt();
419         awaitTermination(t);
420     }
421 
422     /**
423      * peek returns next element, or null if empty
424      */
425     public void testPeek() {
426         PriorityBlockingQueue q = populatedQueue(SIZE);
427         for (int i = 0; i < SIZE; ++i) {
428             assertEquals(i, q.peek());
429             assertEquals(i, q.poll());
430             assertTrue(q.peek() == null ||
431                        !q.peek().equals(i));
432         }
433         assertNull(q.peek());
434     }
435 
436     /**
437      * element returns next element, or throws NSEE if empty
438      */
439     public void testElement() {
440         PriorityBlockingQueue q = populatedQueue(SIZE);
441         for (int i = 0; i < SIZE; ++i) {
442             assertEquals(i, q.element());
443             assertEquals(i, q.poll());
444         }
445         try {
446             q.element();
447             shouldThrow();
448         } catch (NoSuchElementException success) {}
449     }
450 
451     /**
452      * remove removes next element, or throws NSEE if empty
453      */
454     public void testRemove() {
455         PriorityBlockingQueue q = populatedQueue(SIZE);
456         for (int i = 0; i < SIZE; ++i) {
457             assertEquals(i, q.remove());
458         }
459         try {
460             q.remove();
461             shouldThrow();
462         } catch (NoSuchElementException success) {}
463     }
464 
465     /**
466      * contains(x) reports true when elements added but not yet removed
467      */
468     public void testContains() {
469         PriorityBlockingQueue q = populatedQueue(SIZE);
470         for (int i = 0; i < SIZE; ++i) {
471             assertTrue(q.contains(new Integer(i)));
472             q.poll();
473             assertFalse(q.contains(new Integer(i)));
474         }
475     }
476 
477     /**
478      * clear removes all elements
479      */
480     public void testClear() {
481         PriorityBlockingQueue q = populatedQueue(SIZE);
482         q.clear();
483         assertTrue(q.isEmpty());
484         assertEquals(0, q.size());
485         q.add(one);
486         assertFalse(q.isEmpty());
487         assertTrue(q.contains(one));
488         q.clear();
489         assertTrue(q.isEmpty());
490     }
491 
492     /**
493      * containsAll(c) is true when c contains a subset of elements
494      */
495     public void testContainsAll() {
496         PriorityBlockingQueue q = populatedQueue(SIZE);
497         PriorityBlockingQueue p = new PriorityBlockingQueue(SIZE);
498         for (int i = 0; i < SIZE; ++i) {
499             assertTrue(q.containsAll(p));
500             assertFalse(p.containsAll(q));
501             p.add(new Integer(i));
502         }
503         assertTrue(p.containsAll(q));
504     }
505 
506     /**
507      * retainAll(c) retains only those elements of c and reports true if changed
508      */
509     public void testRetainAll() {
510         PriorityBlockingQueue q = populatedQueue(SIZE);
511         PriorityBlockingQueue p = populatedQueue(SIZE);
512         for (int i = 0; i < SIZE; ++i) {
513             boolean changed = q.retainAll(p);
514             if (i == 0)
515                 assertFalse(changed);
516             else
517                 assertTrue(changed);
518 
519             assertTrue(q.containsAll(p));
520             assertEquals(SIZE - i, q.size());
521             p.remove();
522         }
523     }
524 
525     /**
526      * removeAll(c) removes only those elements of c and reports true if changed
527      */
528     public void testRemoveAll() {
529         for (int i = 1; i < SIZE; ++i) {
530             PriorityBlockingQueue q = populatedQueue(SIZE);
531             PriorityBlockingQueue p = populatedQueue(i);
532             assertTrue(q.removeAll(p));
533             assertEquals(SIZE - i, q.size());
534             for (int j = 0; j < i; ++j) {
535                 Integer x = (Integer)(p.remove());
536                 assertFalse(q.contains(x));
537             }
538         }
539     }
540 
541     /**
542      * toArray contains all elements
543      */
544     public void testToArray() throws InterruptedException {
545         PriorityBlockingQueue q = populatedQueue(SIZE);
546         Object[] o = q.toArray();
547         Arrays.sort(o);
548         for (int i = 0; i < o.length; i++)
549             assertSame(o[i], q.take());
550     }
551 
552     /**
553      * toArray(a) contains all elements
554      */
555     public void testToArray2() throws InterruptedException {
556         PriorityBlockingQueue<Integer> q = populatedQueue(SIZE);
557         Integer[] ints = new Integer[SIZE];
558         Integer[] array = q.toArray(ints);
559         assertSame(ints, array);
560         Arrays.sort(ints);
561         for (int i = 0; i < ints.length; i++)
562             assertSame(ints[i], q.take());
563     }
564 
565     /**
566      * toArray(incompatible array type) throws ArrayStoreException
567      */
568     public void testToArray1_BadArg() {
569         PriorityBlockingQueue q = populatedQueue(SIZE);
570         try {
571             q.toArray(new String[10]);
572             shouldThrow();
573         } catch (ArrayStoreException success) {}
574     }
575 
576     /**
577      * iterator iterates through all elements
578      */
579     public void testIterator() {
580         PriorityBlockingQueue q = populatedQueue(SIZE);
581         Iterator it = q.iterator();
582         int i;
583         for (i = 0; it.hasNext(); i++)
584             assertTrue(q.contains(it.next()));
585         assertEquals(i, SIZE);
586         assertIteratorExhausted(it);
587     }
588 
589     /**
590      * iterator of empty collection has no elements
591      */
592     public void testEmptyIterator() {
593         assertIteratorExhausted(new PriorityBlockingQueue().iterator());
594     }
595 
596     /**
597      * iterator.remove removes current element
598      */
599     public void testIteratorRemove() {
600         final PriorityBlockingQueue q = new PriorityBlockingQueue(3);
601         q.add(new Integer(2));
602         q.add(new Integer(1));
603         q.add(new Integer(3));
604 
605         Iterator it = q.iterator();
606         it.next();
607         it.remove();
608 
609         it = q.iterator();
610         assertEquals(it.next(), new Integer(2));
611         assertEquals(it.next(), new Integer(3));
612         assertFalse(it.hasNext());
613     }
614 
615     /**
616      * toString contains toStrings of elements
617      */
618     public void testToString() {
619         PriorityBlockingQueue q = populatedQueue(SIZE);
620         String s = q.toString();
621         for (int i = 0; i < SIZE; ++i) {
622             assertTrue(s.contains(String.valueOf(i)));
623         }
624     }
625 
    /**
     * timed poll transfers elements across Executor tasks.
     *
     * Two tasks run on a two-thread pool and meet at a two-party barrier:
     * one then polls with a timeout, the other puts a single element.
     */
    public void testPollInExecutor() {
        final PriorityBlockingQueue q = new PriorityBlockingQueue(2);
        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        // NOTE(review): PoolCleaner presumably shuts the pool down and waits
        // for the tasks when the try block exits - confirm in JSR166TestCase.
        try (PoolCleaner cleaner = cleaner(executor)) {
            // Consumer: the queue starts empty; after the barrier, the timed
            // poll is expected to receive the element put by the other task.
            executor.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    assertNull(q.poll());
                    threadsStarted.await();
                    assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
                    checkEmpty(q);
                }});

            // Producer: waits at the same barrier, then puts the element.
            executor.execute(new CheckedRunnable() {
                public void realRun() throws InterruptedException {
                    threadsStarted.await();
                    q.put(one);
                }});
        }
    }
649 
650     /**
651      * A deserialized serialized queue has same elements
652      */
653     public void testSerialization() throws Exception {
654         Queue x = populatedQueue(SIZE);
655         Queue y = serialClone(x);
656 
657         assertNotSame(x, y);
658         assertEquals(x.size(), y.size());
659         while (!x.isEmpty()) {
660             assertFalse(y.isEmpty());
661             assertEquals(x.remove(), y.remove());
662         }
663         assertTrue(y.isEmpty());
664     }
665 
666     /**
667      * drainTo(c) empties queue into another collection c
668      */
669     public void testDrainTo() {
670         PriorityBlockingQueue q = populatedQueue(SIZE);
671         ArrayList l = new ArrayList();
672         q.drainTo(l);
673         assertEquals(0, q.size());
674         assertEquals(SIZE, l.size());
675         for (int i = 0; i < SIZE; ++i)
676             assertEquals(l.get(i), new Integer(i));
677         q.add(zero);
678         q.add(one);
679         assertFalse(q.isEmpty());
680         assertTrue(q.contains(zero));
681         assertTrue(q.contains(one));
682         l.clear();
683         q.drainTo(l);
684         assertEquals(0, q.size());
685         assertEquals(2, l.size());
686         for (int i = 0; i < 2; ++i)
687             assertEquals(l.get(i), new Integer(i));
688     }
689 
690     /**
691      * drainTo empties queue
692      */
693     public void testDrainToWithActivePut() throws InterruptedException {
694         final PriorityBlockingQueue q = populatedQueue(SIZE);
695         Thread t = new Thread(new CheckedRunnable() {
696             public void realRun() {
697                 q.put(new Integer(SIZE + 1));
698             }});
699 
700         t.start();
701         ArrayList l = new ArrayList();
702         q.drainTo(l);
703         assertTrue(l.size() >= SIZE);
704         for (int i = 0; i < SIZE; ++i)
705             assertEquals(l.get(i), new Integer(i));
706         t.join();
707         assertTrue(q.size() + l.size() >= SIZE);
708     }
709 
710     /**
711      * drainTo(c, n) empties first min(n, size) elements of queue into c
712      */
713     public void testDrainToN() {
714         PriorityBlockingQueue q = new PriorityBlockingQueue(SIZE * 2);
715         for (int i = 0; i < SIZE + 2; ++i) {
716             for (int j = 0; j < SIZE; j++)
717                 assertTrue(q.offer(new Integer(j)));
718             ArrayList l = new ArrayList();
719             q.drainTo(l, i);
720             int k = (i < SIZE) ? i : SIZE;
721             assertEquals(k, l.size());
722             assertEquals(SIZE - k, q.size());
723             for (int j = 0; j < k; ++j)
724                 assertEquals(l.get(j), new Integer(j));
725             do {} while (q.poll() != null);
726         }
727     }
728 
729     /**
730      * remove(null), contains(null) always return false
731      */
732     public void testNeverContainsNull() {
733         Collection<?>[] qs = {
734             new PriorityBlockingQueue<Object>(),
735             populatedQueue(2),
736         };
737 
738         for (Collection<?> q : qs) {
739             assertFalse(q.contains(null));
740             assertFalse(q.remove(null));
741         }
742     }
743 
744 }
745