1 /*
2  * Written by Doug Lea with assistance from members of JCP JSR-166
3  * Expert Group and released to the public domain, as explained at
4  * http://creativecommons.org/publicdomain/zero/1.0/
5  */
6 
7 package java.util.concurrent;
8 
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 
11 import java.util.AbstractQueue;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.PriorityQueue;
16 import java.util.concurrent.locks.Condition;
17 import java.util.concurrent.locks.ReentrantLock;
18 
19 // BEGIN android-note
20 // removed link to collections framework docs
21 // END android-note
22 
23 /**
24  * An unbounded {@linkplain BlockingQueue blocking queue} of
25  * {@code Delayed} elements, in which an element can only be taken
26  * when its delay has expired.  The <em>head</em> of the queue is that
27  * {@code Delayed} element whose delay expired furthest in the
28  * past.  If no delay has expired there is no head and {@code poll}
29  * will return {@code null}. Expiration occurs when an element's
30  * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
31  * than or equal to zero.  Even though unexpired elements cannot be
32  * removed using {@code take} or {@code poll}, they are otherwise
33  * treated as normal elements. For example, the {@code size} method
34  * returns the count of both expired and unexpired elements.
35  * This queue does not permit null elements.
36  *
37  * <p>This class and its iterator implement all of the
38  * <em>optional</em> methods of the {@link Collection} and {@link
39  * Iterator} interfaces.  The Iterator provided in method {@link
40  * #iterator()} is <em>not</em> guaranteed to traverse the elements of
41  * the DelayQueue in any particular order.
42  *
43  * @since 1.5
44  * @author Doug Lea
45  * @param <E> the type of elements held in this queue
46  */
47 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
48     implements BlockingQueue<E> {
49 
50     private final transient ReentrantLock lock = new ReentrantLock();
51     private final PriorityQueue<E> q = new PriorityQueue<E>();
52 
53     /**
54      * Thread designated to wait for the element at the head of
55      * the queue.  This variant of the Leader-Follower pattern
56      * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
57      * minimize unnecessary timed waiting.  When a thread becomes
58      * the leader, it waits only for the next delay to elapse, but
59      * other threads await indefinitely.  The leader thread must
60      * signal some other thread before returning from take() or
61      * poll(...), unless some other thread becomes leader in the
62      * interim.  Whenever the head of the queue is replaced with
63      * an element with an earlier expiration time, the leader
64      * field is invalidated by being reset to null, and some
65      * waiting thread, but not necessarily the current leader, is
66      * signalled.  So waiting threads must be prepared to acquire
67      * and lose leadership while waiting.
68      */
69     private Thread leader;
70 
71     /**
72      * Condition signalled when a newer element becomes available
73      * at the head of the queue or a new thread may need to
74      * become leader.
75      */
76     private final Condition available = lock.newCondition();
77 
78     /**
79      * Creates a new {@code DelayQueue} that is initially empty.
80      */
DelayQueue()81     public DelayQueue() {}
82 
83     /**
84      * Creates a {@code DelayQueue} initially containing the elements of the
85      * given collection of {@link Delayed} instances.
86      *
87      * @param c the collection of elements to initially contain
88      * @throws NullPointerException if the specified collection or any
89      *         of its elements are null
90      */
DelayQueue(Collection<? extends E> c)91     public DelayQueue(Collection<? extends E> c) {
92         this.addAll(c);
93     }
94 
95     /**
96      * Inserts the specified element into this delay queue.
97      *
98      * @param e the element to add
99      * @return {@code true} (as specified by {@link Collection#add})
100      * @throws NullPointerException if the specified element is null
101      */
add(E e)102     public boolean add(E e) {
103         return offer(e);
104     }
105 
106     /**
107      * Inserts the specified element into this delay queue.
108      *
109      * @param e the element to add
110      * @return {@code true}
111      * @throws NullPointerException if the specified element is null
112      */
offer(E e)113     public boolean offer(E e) {
114         final ReentrantLock lock = this.lock;
115         lock.lock();
116         try {
117             q.offer(e);
118             if (q.peek() == e) {
119                 leader = null;
120                 available.signal();
121             }
122             return true;
123         } finally {
124             lock.unlock();
125         }
126     }
127 
128     /**
129      * Inserts the specified element into this delay queue. As the queue is
130      * unbounded this method will never block.
131      *
132      * @param e the element to add
133      * @throws NullPointerException {@inheritDoc}
134      */
put(E e)135     public void put(E e) {
136         offer(e);
137     }
138 
139     /**
140      * Inserts the specified element into this delay queue. As the queue is
141      * unbounded this method will never block.
142      *
143      * @param e the element to add
144      * @param timeout This parameter is ignored as the method never blocks
145      * @param unit This parameter is ignored as the method never blocks
146      * @return {@code true}
147      * @throws NullPointerException {@inheritDoc}
148      */
offer(E e, long timeout, TimeUnit unit)149     public boolean offer(E e, long timeout, TimeUnit unit) {
150         return offer(e);
151     }
152 
153     /**
154      * Retrieves and removes the head of this queue, or returns {@code null}
155      * if this queue has no elements with an expired delay.
156      *
157      * @return the head of this queue, or {@code null} if this
158      *         queue has no elements with an expired delay
159      */
poll()160     public E poll() {
161         final ReentrantLock lock = this.lock;
162         lock.lock();
163         try {
164             E first = q.peek();
165             return (first == null || first.getDelay(NANOSECONDS) > 0)
166                 ? null
167                 : q.poll();
168         } finally {
169             lock.unlock();
170         }
171     }
172 
173     /**
174      * Retrieves and removes the head of this queue, waiting if necessary
175      * until an element with an expired delay is available on this queue.
176      *
177      * @return the head of this queue
178      * @throws InterruptedException {@inheritDoc}
179      */
take()180     public E take() throws InterruptedException {
181         final ReentrantLock lock = this.lock;
182         lock.lockInterruptibly();
183         try {
184             for (;;) {
185                 E first = q.peek();
186                 if (first == null)
187                     available.await();
188                 else {
189                     long delay = first.getDelay(NANOSECONDS);
190                     if (delay <= 0L)
191                         return q.poll();
192                     first = null; // don't retain ref while waiting
193                     if (leader != null)
194                         available.await();
195                     else {
196                         Thread thisThread = Thread.currentThread();
197                         leader = thisThread;
198                         try {
199                             available.awaitNanos(delay);
200                         } finally {
201                             if (leader == thisThread)
202                                 leader = null;
203                         }
204                     }
205                 }
206             }
207         } finally {
208             if (leader == null && q.peek() != null)
209                 available.signal();
210             lock.unlock();
211         }
212     }
213 
214     /**
215      * Retrieves and removes the head of this queue, waiting if necessary
216      * until an element with an expired delay is available on this queue,
217      * or the specified wait time expires.
218      *
219      * @return the head of this queue, or {@code null} if the
220      *         specified waiting time elapses before an element with
221      *         an expired delay becomes available
222      * @throws InterruptedException {@inheritDoc}
223      */
poll(long timeout, TimeUnit unit)224     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
225         long nanos = unit.toNanos(timeout);
226         final ReentrantLock lock = this.lock;
227         lock.lockInterruptibly();
228         try {
229             for (;;) {
230                 E first = q.peek();
231                 if (first == null) {
232                     if (nanos <= 0L)
233                         return null;
234                     else
235                         nanos = available.awaitNanos(nanos);
236                 } else {
237                     long delay = first.getDelay(NANOSECONDS);
238                     if (delay <= 0L)
239                         return q.poll();
240                     if (nanos <= 0L)
241                         return null;
242                     first = null; // don't retain ref while waiting
243                     if (nanos < delay || leader != null)
244                         nanos = available.awaitNanos(nanos);
245                     else {
246                         Thread thisThread = Thread.currentThread();
247                         leader = thisThread;
248                         try {
249                             long timeLeft = available.awaitNanos(delay);
250                             nanos -= delay - timeLeft;
251                         } finally {
252                             if (leader == thisThread)
253                                 leader = null;
254                         }
255                     }
256                 }
257             }
258         } finally {
259             if (leader == null && q.peek() != null)
260                 available.signal();
261             lock.unlock();
262         }
263     }
264 
265     /**
266      * Retrieves, but does not remove, the head of this queue, or
267      * returns {@code null} if this queue is empty.  Unlike
268      * {@code poll}, if no expired elements are available in the queue,
269      * this method returns the element that will expire next,
270      * if one exists.
271      *
272      * @return the head of this queue, or {@code null} if this
273      *         queue is empty
274      */
peek()275     public E peek() {
276         final ReentrantLock lock = this.lock;
277         lock.lock();
278         try {
279             return q.peek();
280         } finally {
281             lock.unlock();
282         }
283     }
284 
size()285     public int size() {
286         final ReentrantLock lock = this.lock;
287         lock.lock();
288         try {
289             return q.size();
290         } finally {
291             lock.unlock();
292         }
293     }
294 
295     /**
296      * Returns first element only if it is expired.
297      * Used only by drainTo.  Call only when holding lock.
298      */
peekExpired()299     private E peekExpired() {
300         // assert lock.isHeldByCurrentThread();
301         E first = q.peek();
302         return (first == null || first.getDelay(NANOSECONDS) > 0) ?
303             null : first;
304     }
305 
306     /**
307      * @throws UnsupportedOperationException {@inheritDoc}
308      * @throws ClassCastException            {@inheritDoc}
309      * @throws NullPointerException          {@inheritDoc}
310      * @throws IllegalArgumentException      {@inheritDoc}
311      */
drainTo(Collection<? super E> c)312     public int drainTo(Collection<? super E> c) {
313         if (c == null)
314             throw new NullPointerException();
315         if (c == this)
316             throw new IllegalArgumentException();
317         final ReentrantLock lock = this.lock;
318         lock.lock();
319         try {
320             int n = 0;
321             for (E e; (e = peekExpired()) != null;) {
322                 c.add(e);       // In this order, in case add() throws.
323                 q.poll();
324                 ++n;
325             }
326             return n;
327         } finally {
328             lock.unlock();
329         }
330     }
331 
332     /**
333      * @throws UnsupportedOperationException {@inheritDoc}
334      * @throws ClassCastException            {@inheritDoc}
335      * @throws NullPointerException          {@inheritDoc}
336      * @throws IllegalArgumentException      {@inheritDoc}
337      */
drainTo(Collection<? super E> c, int maxElements)338     public int drainTo(Collection<? super E> c, int maxElements) {
339         if (c == null)
340             throw new NullPointerException();
341         if (c == this)
342             throw new IllegalArgumentException();
343         if (maxElements <= 0)
344             return 0;
345         final ReentrantLock lock = this.lock;
346         lock.lock();
347         try {
348             int n = 0;
349             for (E e; n < maxElements && (e = peekExpired()) != null;) {
350                 c.add(e);       // In this order, in case add() throws.
351                 q.poll();
352                 ++n;
353             }
354             return n;
355         } finally {
356             lock.unlock();
357         }
358     }
359 
360     /**
361      * Atomically removes all of the elements from this delay queue.
362      * The queue will be empty after this call returns.
363      * Elements with an unexpired delay are not waited for; they are
364      * simply discarded from the queue.
365      */
clear()366     public void clear() {
367         final ReentrantLock lock = this.lock;
368         lock.lock();
369         try {
370             q.clear();
371         } finally {
372             lock.unlock();
373         }
374     }
375 
376     /**
377      * Always returns {@code Integer.MAX_VALUE} because
378      * a {@code DelayQueue} is not capacity constrained.
379      *
380      * @return {@code Integer.MAX_VALUE}
381      */
remainingCapacity()382     public int remainingCapacity() {
383         return Integer.MAX_VALUE;
384     }
385 
386     /**
387      * Returns an array containing all of the elements in this queue.
388      * The returned array elements are in no particular order.
389      *
390      * <p>The returned array will be "safe" in that no references to it are
391      * maintained by this queue.  (In other words, this method must allocate
392      * a new array).  The caller is thus free to modify the returned array.
393      *
394      * <p>This method acts as bridge between array-based and collection-based
395      * APIs.
396      *
397      * @return an array containing all of the elements in this queue
398      */
toArray()399     public Object[] toArray() {
400         final ReentrantLock lock = this.lock;
401         lock.lock();
402         try {
403             return q.toArray();
404         } finally {
405             lock.unlock();
406         }
407     }
408 
409     /**
410      * Returns an array containing all of the elements in this queue; the
411      * runtime type of the returned array is that of the specified array.
412      * The returned array elements are in no particular order.
413      * If the queue fits in the specified array, it is returned therein.
414      * Otherwise, a new array is allocated with the runtime type of the
415      * specified array and the size of this queue.
416      *
417      * <p>If this queue fits in the specified array with room to spare
418      * (i.e., the array has more elements than this queue), the element in
419      * the array immediately following the end of the queue is set to
420      * {@code null}.
421      *
422      * <p>Like the {@link #toArray()} method, this method acts as bridge between
423      * array-based and collection-based APIs.  Further, this method allows
424      * precise control over the runtime type of the output array, and may,
425      * under certain circumstances, be used to save allocation costs.
426      *
427      * <p>The following code can be used to dump a delay queue into a newly
428      * allocated array of {@code Delayed}:
429      *
430      * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
431      *
432      * Note that {@code toArray(new Object[0])} is identical in function to
433      * {@code toArray()}.
434      *
435      * @param a the array into which the elements of the queue are to
436      *          be stored, if it is big enough; otherwise, a new array of the
437      *          same runtime type is allocated for this purpose
438      * @return an array containing all of the elements in this queue
439      * @throws ArrayStoreException if the runtime type of the specified array
440      *         is not a supertype of the runtime type of every element in
441      *         this queue
442      * @throws NullPointerException if the specified array is null
443      */
toArray(T[] a)444     public <T> T[] toArray(T[] a) {
445         final ReentrantLock lock = this.lock;
446         lock.lock();
447         try {
448             return q.toArray(a);
449         } finally {
450             lock.unlock();
451         }
452     }
453 
454     /**
455      * Removes a single instance of the specified element from this
456      * queue, if it is present, whether or not it has expired.
457      */
remove(Object o)458     public boolean remove(Object o) {
459         final ReentrantLock lock = this.lock;
460         lock.lock();
461         try {
462             return q.remove(o);
463         } finally {
464             lock.unlock();
465         }
466     }
467 
468     /**
469      * Identity-based version for use in Itr.remove.
470      */
removeEQ(Object o)471     void removeEQ(Object o) {
472         final ReentrantLock lock = this.lock;
473         lock.lock();
474         try {
475             for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
476                 if (o == it.next()) {
477                     it.remove();
478                     break;
479                 }
480             }
481         } finally {
482             lock.unlock();
483         }
484     }
485 
486     /**
487      * Returns an iterator over all the elements (both expired and
488      * unexpired) in this queue. The iterator does not return the
489      * elements in any particular order.
490      *
491      * <p>The returned iterator is
492      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
493      *
494      * @return an iterator over the elements in this queue
495      */
iterator()496     public Iterator<E> iterator() {
497         return new Itr(toArray());
498     }
499 
500     /**
501      * Snapshot iterator that works off copy of underlying q array.
502      */
503     private class Itr implements Iterator<E> {
504         final Object[] array; // Array of all elements
505         int cursor;           // index of next element to return
506         int lastRet;          // index of last element, or -1 if no such
507 
Itr(Object[] array)508         Itr(Object[] array) {
509             lastRet = -1;
510             this.array = array;
511         }
512 
hasNext()513         public boolean hasNext() {
514             return cursor < array.length;
515         }
516 
517         @SuppressWarnings("unchecked")
next()518         public E next() {
519             if (cursor >= array.length)
520                 throw new NoSuchElementException();
521             lastRet = cursor;
522             return (E)array[cursor++];
523         }
524 
remove()525         public void remove() {
526             if (lastRet < 0)
527                 throw new IllegalStateException();
528             removeEQ(array[lastRet]);
529             lastRet = -1;
530         }
531     }
532 
533 }
534