/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired. The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past. If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero. Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} is <em>not</em>
 * guaranteed to traverse the elements of the DelayQueue in any
 * particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap; its head is the element ordered first by {@code compareTo}. */
    private final PriorityQueue<E> q = new PriorityQueue<>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue. This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting. When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely. The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim. Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled. So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                // The new element became the head, so any current leader is
                // timing its wait against a stale delay: drop leadership and
                // wake one waiter so it re-reads the head.
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    // Empty queue: wait until some element is offered.
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // Another thread is already timing the head; wait
                        // untimed until signalled.
                        available.await();
                    else {
                        // Become leader and wait at most for the head's
                        // remaining delay.
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Give up leadership only if it was not already
                            // reset or reassigned while this thread waited.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // On the way out, hand over to a waiter if nobody is leading and
            // elements remain.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        // Either the caller's budget ends before the head
                        // expires, or another thread is already leader:
                        // wait on the caller's budget without leading.
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Subtract only the time actually spent waiting
                            // from the caller's remaining budget.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Same hand-over as in take(): wake a waiter if nobody is
            // leading and elements remain.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty. Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Only elements with an expired delay are transferred.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Only elements with an expired delay are transferred; draining
     * stops at the first head element whose delay has not yet expired.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E first;
                 n < maxElements
                     && (first = q.peek()) != null
                     && first.getDelay(NANOSECONDS) <= 0;) {
                c.add(first);   // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs. Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if an element was removed as a result of
     *         this call
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     *
     * @param o the exact instance to remove; at most one occurrence,
     *          matched with {@code ==}, is removed
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            // The array is the toArray() snapshot passed in by iterator().
            return (E)array[lastRet = cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            // Remove the exact instance last returned from the live queue;
            // the snapshot array itself is left untouched.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}