/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
24 
/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */
35 
36 package java.util.concurrent;
37 
38 import static java.util.concurrent.TimeUnit.NANOSECONDS;
39 
40 import java.util.AbstractQueue;
41 import java.util.Collection;
42 import java.util.Iterator;
43 import java.util.NoSuchElementException;
44 import java.util.PriorityQueue;
45 import java.util.concurrent.locks.Condition;
46 import java.util.concurrent.locks.ReentrantLock;
47 
// BEGIN android-note
// removed link to collections framework docs
// END android-note
51 
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to {@code q} and {@code leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Backing priority queue holding both expired and unexpired
     * elements.  Accessed only while holding {@code lock}.
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // The new element became the head: the current leader (if any)
            // is timing the wrong delay, so invalidate it and wake a waiter
            // to re-evaluate the head.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // Another thread is already timing the head;
                        // wait until signalled.
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Give up leadership only if it was not already
                            // taken away (offer() may have reset it).
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // On the way out, hand off to another waiter if nobody is
            // leading and elements remain.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        // Either the caller's budget runs out before the
                        // head expires, or someone else is leading: just
                        // wait out the remaining budget.
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.  Equivalent to
     * {@code drainTo(c, Integer.MAX_VALUE)}.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        // Single implementation of the drain loop lives in the bounded
        // overload; it performs the same null and self-drain checks.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} expired elements from this
     * queue and adds them to the given collection.  Unexpired elements
     * are left in place.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     * Removes the first element found that is the very same object
     * ({@code ==}) as {@code o}; does nothing if there is none.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     * Traversal never touches the live queue; only {@link #remove()}
     * reaches back into it, via {@code removeEQ}.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            // Identity-based removal, so exactly the object this iterator
            // handed out is removed rather than some other equal element.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}
563